tmf: Move TmfEventRequest.ALL_DATA to the interface
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfEventProvider.java
CommitLineData
8c8bf09f 1/*******************************************************************************
61759503 2 * Copyright (c) 2009, 2013 Ericsson
0283f7ff 3 *
8c8bf09f
ASL
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
0283f7ff 8 *
8c8bf09f 9 * Contributors:
fd3f1eff
AM
10 * Francois Chouinard - Initial API and implementation, replace background
11 * requests by preemptable requests
12 * Alexandre Montplaisir - Merge with TmfDataProvider
8c8bf09f
ASL
13 *******************************************************************************/
14
6c13869b 15package org.eclipse.linuxtools.tmf.core.component;
8c8bf09f 16
fd3f1eff
AM
17import java.util.ArrayList;
18import java.util.List;
19import java.util.concurrent.BlockingQueue;
20import java.util.concurrent.LinkedBlockingQueue;
21import java.util.concurrent.SynchronousQueue;
22
5419a136 23import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer;
fd3f1eff
AM
24import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread;
25import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager;
5419a136 26import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedEventRequest;
fd3f1eff 27import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
72f1e62a 28import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
5419a136 29import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
fd3f1eff
AM
30import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest.ExecutionType;
31import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
32import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
33import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
3bd46eef 34import org.eclipse.linuxtools.tmf.core.timestamp.ITmfTimestamp;
fd3f1eff 35import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
8c8bf09f
ASL
36
37/**
fd3f1eff
AM
38 * An abstract base class that implements ITmfEventProvider.
39 * <p>
40 * This abstract class implements the housekeeping methods to register/
41 * de-register the event provider and to handle generically the event requests.
42 * </p>
0283f7ff 43 *
8fd82db5 44 * @author Francois Chouinard
8c8bf09f 45 */
fd3f1eff
AM
46public abstract class TmfEventProvider extends TmfComponent implements ITmfEventProvider {
47
48 // ------------------------------------------------------------------------
49 // Constants
50 // ------------------------------------------------------------------------
51
52 /** Default amount of events per request "chunk" */
53 public static final int DEFAULT_BLOCK_SIZE = 50000;
54
55 /** Default size of the queue */
56 public static final int DEFAULT_QUEUE_SIZE = 1000;
57
58 // ------------------------------------------------------------------------
59 // Attributes
60 // ------------------------------------------------------------------------
61
62 /** List of coalesced requests */
63 protected final List<TmfCoalescedEventRequest> fPendingCoalescedRequests =
64 new ArrayList<TmfCoalescedEventRequest>();
65
66 /** The type of event handled by this provider */
67 protected Class<? extends ITmfEvent> fType;
68
69 /** Queue of events */
70 protected BlockingQueue<ITmfEvent> fDataQueue;
71
72 /** Size of the fDataQueue */
73 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
74
75 private final TmfRequestExecutor fExecutor;
76
77 private final Object fLock = new Object();
78
79 private int fSignalDepth = 0;
80
81 private int fRequestPendingCounter = 0;
8c8bf09f 82
00641a97
FC
83 // ------------------------------------------------------------------------
84 // Constructors
85 // ------------------------------------------------------------------------
86
063f0d27
AM
87 /**
88 * Default constructor
89 */
12c155f5 90 public TmfEventProvider() {
00641a97 91 super();
fd3f1eff
AM
92 fQueueSize = DEFAULT_QUEUE_SIZE;
93 fDataQueue = new LinkedBlockingQueue<ITmfEvent>(fQueueSize);
94 fExecutor = new TmfRequestExecutor();
12c155f5 95 }
8c8bf09f 96
063f0d27 97 /**
fd3f1eff 98 * Initialize this data provider
063f0d27
AM
99 *
100 * @param name
fd3f1eff 101 * Name of the provider
063f0d27 102 * @param type
fd3f1eff 103 * The type of events that will be handled
063f0d27 104 */
fd3f1eff
AM
105 public void init(String name, Class<? extends ITmfEvent> type) {
106 super.init(name);
107 fType = type;
108 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<ITmfEvent>(fQueueSize) : new SynchronousQueue<ITmfEvent>();
109
110 fExecutor.init();
111 fSignalDepth = 0;
112
113 TmfProviderManager.register(fType, this);
12c155f5
FC
114 }
115
063f0d27 116 /**
fd3f1eff 117 * Constructor specifying the event type and the queue size.
063f0d27
AM
118 *
119 * @param name
fd3f1eff 120 * Name of the provider
063f0d27 121 * @param type
fd3f1eff 122 * Type of event that will be handled
063f0d27 123 * @param queueSize
fd3f1eff 124 * Size of the event queue
063f0d27 125 */
fd3f1eff
AM
126 protected TmfEventProvider(String name, Class<? extends ITmfEvent> type, int queueSize) {
127 this();
128 fQueueSize = queueSize;
129 init(name, type);
12c155f5
FC
130 }
131
063f0d27
AM
132 /**
133 * Copy constructor
134 *
135 * @param other
fd3f1eff 136 * The other object to copy
063f0d27 137 */
6256d8ad 138 public TmfEventProvider(TmfEventProvider other) {
fd3f1eff
AM
139 this();
140 init(other.getName(), other.fType);
141 }
142
143 /**
144 * Standard constructor. Instantiate and initialize at the same time.
145 *
146 * @param name
147 * Name of the provider
148 * @param type
149 * The type of events that will be handled
150 */
151 public TmfEventProvider(String name, Class<? extends ITmfEvent> type) {
152 this(name, type, DEFAULT_QUEUE_SIZE);
153 }
154
155 @Override
156 public void dispose() {
157 TmfProviderManager.deregister(fType, this);
158 fExecutor.stop();
159 super.dispose();
12c155f5
FC
160 }
161
00641a97 162 // ------------------------------------------------------------------------
fd3f1eff
AM
163 // Accessors
164 // ------------------------------------------------------------------------
165
166 /**
167 * Get the queue size of this provider
168 *
169 * @return The size of the queue
170 */
171 public int getQueueSize() {
172 return fQueueSize;
173 }
174
175 /**
176 * Get the event type this provider handles
177 *
178 * @return The type of ITmfEvent
179 */
180 public Class<?> getType() {
181 return fType;
182 }
183
184 // ------------------------------------------------------------------------
185 // ITmfRequestHandler
00641a97
FC
186 // ------------------------------------------------------------------------
187
5419a136 188 @Override
fd3f1eff
AM
189 public void sendRequest(final ITmfEventRequest request) {
190 synchronized (fLock) {
191 if (fSignalDepth > 0) {
192 coalesceEventRequest(request);
193 } else {
194 dispatchRequest(request);
195 }
196 }
197 }
198
199 @Override
200 public void fireRequest() {
201 synchronized (fLock) {
202 if (fRequestPendingCounter > 0) {
203 return;
204 }
205 if (fPendingCoalescedRequests.size() > 0) {
206 for (ITmfEventRequest request : fPendingCoalescedRequests) {
207 dispatchRequest(request);
208 }
209 fPendingCoalescedRequests.clear();
210 }
5419a136 211 }
5419a136
AM
212 }
213
fd3f1eff
AM
214 /**
215 * Increments/decrements the pending requests counters and fires the request
216 * if necessary (counter == 0). Used for coalescing requests across multiple
217 * TmfDataProvider's.
218 *
219 * @param isIncrement
220 * Should we increment (true) or decrement (false) the pending
221 * counter
222 */
5419a136 223 @Override
fd3f1eff
AM
224 public void notifyPendingRequest(boolean isIncrement) {
225 synchronized (fLock) {
226 if (isIncrement) {
227 if (fSignalDepth > 0) {
228 fRequestPendingCounter++;
229 }
230 } else {
231 if (fRequestPendingCounter > 0) {
232 fRequestPendingCounter--;
233 }
234
235 // fire request if all pending requests are received
236 if (fRequestPendingCounter == 0) {
237 fireRequest();
238 }
239 }
240 }
241 }
242
243 // ------------------------------------------------------------------------
244 // Coalescing
245 // ------------------------------------------------------------------------
246
247 /**
248 * Create a new request from an existing one, and add it to the coalesced
249 * requests
250 *
251 * @param request
252 * The request to copy
253 */
254 protected synchronized void newCoalescedEventRequest(ITmfEventRequest request) {
672a642a 255 TmfCoalescedEventRequest coalescedRequest = new TmfCoalescedEventRequest(
fd3f1eff
AM
256 request.getDataType(),
257 request.getRange(),
258 request.getIndex(),
259 request.getNbRequested(),
260 request.getExecType());
261 coalescedRequest.addRequest(request);
5419a136
AM
262 if (TmfCoreTracer.isRequestTraced()) {
263 TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
264 TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
265 }
266 fPendingCoalescedRequests.add(coalescedRequest);
fd3f1eff
AM
267 }
268
269 /**
270 * Add an existing requests to the list of coalesced ones
271 *
272 * @param request
273 * The request to add to the list
274 */
275 protected void coalesceEventRequest(ITmfEventRequest request) {
276 synchronized (fLock) {
277 for (TmfCoalescedEventRequest coalescedRequest : fPendingCoalescedRequests) {
278 if (coalescedRequest.isCompatible(request)) {
279 coalescedRequest.addRequest(request);
280 if (TmfCoreTracer.isRequestTraced()) {
281 TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
282 TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
283 }
284 return;
285 }
286 }
287 newCoalescedEventRequest(request);
288 }
289 }
290
291 // ------------------------------------------------------------------------
292 // Request processing
293 // ------------------------------------------------------------------------
294
295 private void dispatchRequest(final ITmfEventRequest request) {
296 if (request.getExecType() == ExecutionType.FOREGROUND) {
297 queueRequest(request);
5419a136 298 } else {
fd3f1eff
AM
299 queueBackgroundRequest(request, true);
300 }
301 }
302
303 /**
304 * Queue a request.
305 *
306 * @param request
307 * The data request
308 */
309 protected void queueRequest(final ITmfEventRequest request) {
310
311 if (fExecutor.isShutdown()) {
312 request.cancel();
313 return;
314 }
315
316 TmfEventThread thread = new TmfEventThread(this, request);
317
318 if (TmfCoreTracer.isRequestTraced()) {
319 TmfCoreTracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
320 }
321
322 fExecutor.execute(thread);
323 }
324
325 /**
326 * Queue a background request
327 *
328 * @param request
329 * The request
330 * @param indexing
331 * Should we index the chunks
332 * @since 3.0
333 */
334 protected void queueBackgroundRequest(final ITmfEventRequest request, final boolean indexing) {
335 queueRequest(request);
336 }
337
338 /**
339 * Initialize the provider based on the request. The context is provider
340 * specific and will be updated by getNext().
341 *
342 * @param request
343 * The request
344 * @return An application specific context; null if request can't be
345 * serviced
346 * @since 2.0
347 */
348 public abstract ITmfContext armRequest(ITmfEventRequest request);
349
350 /**
351 * Checks if the data meets the request completion criteria.
352 *
353 * @param request
354 * The request
355 * @param event
356 * The data to verify
357 * @param nbRead
358 * The number of events read so far
359 * @return true if completion criteria is met
360 */
361 public boolean isCompleted(ITmfEventRequest request, ITmfEvent event, int nbRead) {
362 boolean requestCompleted = isCompleted2(request, nbRead);
363 if (!requestCompleted) {
364 ITmfTimestamp endTime = request.getRange().getEndTime();
365 return event.getTimestamp().compareTo(endTime, false) > 0;
366 }
367 return requestCompleted;
368 }
369
370 private static boolean isCompleted2(ITmfEventRequest request,int nbRead) {
371 return request.isCompleted() || nbRead >= request.getNbRequested();
372 }
373
374 // ------------------------------------------------------------------------
375 // Pass-through's to the request executor
376 // ------------------------------------------------------------------------
377
378 /**
379 * @return the shutdown state (i.e. if it is accepting new requests)
380 * @since 2.0
381 */
382 protected boolean executorIsShutdown() {
383 return fExecutor.isShutdown();
384 }
385
386 /**
387 * @return the termination state
388 * @since 2.0
389 */
390 protected boolean executorIsTerminated() {
391 return fExecutor.isTerminated();
392 }
393
394 // ------------------------------------------------------------------------
395 // Signal handlers
396 // ------------------------------------------------------------------------
397
398 /**
399 * Handler for the start synch signal
400 *
401 * @param signal
402 * Incoming signal
403 */
404 @TmfSignalHandler
405 public void startSynch(TmfStartSynchSignal signal) {
406 synchronized (fLock) {
407 fSignalDepth++;
408 }
409 }
410
411 /**
412 * Handler for the end synch signal
413 *
414 * @param signal
415 * Incoming signal
416 */
417 @TmfSignalHandler
418 public void endSynch(TmfEndSynchSignal signal) {
419 synchronized (fLock) {
420 fSignalDepth--;
421 if (fSignalDepth == 0) {
422 fireRequest();
423 }
5419a136
AM
424 }
425 }
951d134a 426
8c8bf09f 427}
This page took 0.063921 seconds and 5 git commands to generate.