tmf/lttng: Fix newly-introduced Javadoc warnings
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
0283f7ff 3 *
8c8bf09f
ASL
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
0283f7ff 8 *
8c8bf09f
ASL
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
6c13869b 13package org.eclipse.linuxtools.tmf.core.component;
8c8bf09f 14
8c8bf09f
ASL
15import java.util.Vector;
16import java.util.concurrent.BlockingQueue;
17import java.util.concurrent.LinkedBlockingQueue;
9b635e61 18import java.util.concurrent.SynchronousQueue;
8c8bf09f 19
5500a7f0 20import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer;
8fd82db5
FC
21import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager;
22import org.eclipse.linuxtools.internal.tmf.core.component.TmfThread;
23import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedDataRequest;
24import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
72f1e62a 25import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
6c13869b 26import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
4c564a2d 27import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
6c13869b 28import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
6c13869b
FC
29import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
30import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
31import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
32import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
8c8bf09f
ASL
33
34/**
8fd82db5 35 * An abstract base class that implements ITmfDataProvider.
8c8bf09f 36 * <p>
8fd82db5
FC
37 * This abstract class implements the housekeeping methods to register/
38 * de-register the event provider and to handle generically the event requests.
8c8bf09f 39 * <p>
8fd82db5
FC
40 * The concrete class can either re-implement processRequest() entirely or just
41 * implement the hooks (initializeContext() and getNext()).
8c8bf09f
ASL
42 * <p>
43 * TODO: Add support for providing multiple data types.
0283f7ff 44 *
8fd82db5
FC
45 * @version 1.0
46 * @author Francois Chouinard
8c8bf09f 47 */
6256d8ad 48public abstract class TmfDataProvider extends TmfComponent implements ITmfDataProvider {
8c8bf09f 49
948b0607
FC
50 // ------------------------------------------------------------------------
51 // Constants
52 // ------------------------------------------------------------------------
550d787e 53
063f0d27 54 /** Default amount of events per request "chunk" */
00641a97 55 public static final int DEFAULT_BLOCK_SIZE = 50000;
063f0d27
AM
56
57 /** Default size of the queue */
00641a97
FC
58 public static final int DEFAULT_QUEUE_SIZE = 1000;
59
948b0607 60 // ------------------------------------------------------------------------
12c155f5 61 // Attributes
948b0607 62 // ------------------------------------------------------------------------
8c8bf09f 63
6f4e8ec0 64 /** The type of event handled by this provider */
6256d8ad 65 protected Class<? extends ITmfEvent> fType;
6f4e8ec0
AM
66
67 /** Is there some data being logged? */
12c155f5 68 protected boolean fLogData;
6f4e8ec0
AM
69
70 /** Are errors being logged? */
12c155f5 71 protected boolean fLogError;
f6b14ce2 72
6f4e8ec0 73 /** Queue of events */
6256d8ad 74 protected BlockingQueue<ITmfEvent> fDataQueue;
6f4e8ec0
AM
75
76 /** Size of the fDataQueue */
77 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
78
fc7cd0be 79 private TmfRequestExecutor fExecutor;
948b0607
FC
80
81 private int fSignalDepth = 0;
045df77d 82 private final Object fLock = new Object();
951d134a 83
c1c69938
FC
84 private int fRequestPendingCounter = 0;
85
948b0607
FC
86 // ------------------------------------------------------------------------
87 // Constructors
88 // ------------------------------------------------------------------------
89
063f0d27
AM
90 /**
91 * Default constructor
92 */
12c155f5 93 public TmfDataProvider() {
00641a97 94 super();
12c155f5 95 fQueueSize = DEFAULT_QUEUE_SIZE;
6256d8ad 96 fDataQueue = new LinkedBlockingQueue<ITmfEvent>(fQueueSize);
12c155f5
FC
97 fExecutor = new TmfRequestExecutor();
98 }
99
063f0d27
AM
100 /**
101 * Initialize this data provider
102 *
103 * @param name
104 * Name of the provider
105 * @param type
106 * The type of events that will be handled
107 */
6256d8ad 108 public void init(String name, Class<? extends ITmfEvent> type) {
12c155f5 109 super.init(name);
3791b5df 110 fType = type;
6256d8ad 111 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<ITmfEvent>(fQueueSize) : new SynchronousQueue<ITmfEvent>();
12c155f5
FC
112
113 fExecutor = new TmfRequestExecutor();
114 fSignalDepth = 0;
115
5500a7f0
FC
116 fLogData = TmfCoreTracer.isEventTraced();
117// fLogError = TmfCoreTracer.isErrorTraced();
12c155f5
FC
118
119 TmfProviderManager.register(fType, this);
120 }
121
6f4e8ec0
AM
122 /**
123 * Constructor specifying the event type and the queue size.
124 *
125 * @param name
126 * Name of the provider
127 * @param type
128 * Type of event that will be handled
129 * @param queueSize
130 * Size of the event queue
131 */
6256d8ad 132 protected TmfDataProvider(String name, Class<? extends ITmfEvent> type, int queueSize) {
00641a97 133 this();
948b0607 134 fQueueSize = queueSize;
00641a97 135 init(name, type);
948b0607
FC
136 }
137
063f0d27
AM
138 /**
139 * Copy constructor
140 *
141 * @param other
142 * The other object to copy
143 */
6256d8ad 144 public TmfDataProvider(TmfDataProvider other) {
00641a97
FC
145 this();
146 init(other.getName(), other.fType);
147 }
550d787e 148
063f0d27
AM
149 /**
150 * Standard constructor. Instantiate and initialize at the same time.
151 *
152 * @param name
153 * Name of the provider
154 * @param type
155 * The type of events that will be handled
156 */
6256d8ad 157 public TmfDataProvider(String name, Class<? extends ITmfEvent> type) {
00641a97 158 this(name, type, DEFAULT_QUEUE_SIZE);
948b0607
FC
159 }
160
161 @Override
162 public void dispose() {
163 TmfProviderManager.deregister(fType, this);
164 fExecutor.stop();
165 super.dispose();
12c155f5 166 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
948b0607
FC
167 }
168
00641a97
FC
169 // ------------------------------------------------------------------------
170 // Accessors
171 // ------------------------------------------------------------------------
172
063f0d27
AM
173 /**
174 * Get the queue size of this provider
175 *
176 * @return The size of the queue
177 */
948b0607
FC
178 public int getQueueSize() {
179 return fQueueSize;
180 }
181
063f0d27
AM
182 /**
183 * Get the event type this provider handles
184 *
185 * @return The type of ITmfEvent
186 */
948b0607
FC
187 public Class<?> getType() {
188 return fType;
189 }
190
191 // ------------------------------------------------------------------------
192 // ITmfRequestHandler
193 // ------------------------------------------------------------------------
194
195 @Override
6256d8ad 196 public void sendRequest(final ITmfDataRequest request) {
948b0607
FC
197 synchronized (fLock) {
198 if (fSignalDepth > 0) {
199 coalesceDataRequest(request);
200 } else {
201 dispatchRequest(request);
202 }
203 }
204 }
205
948b0607 206 @Override
c1c69938 207 public void fireRequest() {
948b0607
FC
208 synchronized (fLock) {
209 if (fRequestPendingCounter > 0) {
210 return;
211 }
212 if (fPendingCoalescedRequests.size() > 0) {
6256d8ad 213 for (TmfDataRequest request : fPendingCoalescedRequests) {
948b0607
FC
214 dispatchRequest(request);
215 }
216 fPendingCoalescedRequests.clear();
217 }
218 }
219 }
220
221 /**
063f0d27 222 * Increments/decrements the pending requests counters and fires the request
0283f7ff
FC
223 * if necessary (counter == 0). Used for coalescing requests across multiple
224 * TmfDataProvider's.
063f0d27 225 *
948b0607 226 * @param isIncrement
0283f7ff
FC
227 * Should we increment (true) or decrement (false) the pending
228 * counter
948b0607 229 */
c1c69938
FC
230 @Override
231 public void notifyPendingRequest(boolean isIncrement) {
948b0607 232 synchronized (fLock) {
c1c69938
FC
233 if (isIncrement) {
234 if (fSignalDepth > 0) {
235 fRequestPendingCounter++;
236 }
237 } else {
238 if (fRequestPendingCounter > 0) {
239 fRequestPendingCounter--;
240 }
948b0607 241
c1c69938
FC
242 // fire request if all pending requests are received
243 if (fRequestPendingCounter == 0) {
244 fireRequest();
245 }
246 }
247 }
948b0607
FC
248 }
249
250 // ------------------------------------------------------------------------
251 // Coalescing (primitive test...)
252 // ------------------------------------------------------------------------
253
6f4e8ec0 254 /** List of coalesced requests */
6256d8ad 255 protected Vector<TmfCoalescedDataRequest> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest>();
948b0607 256
6f4e8ec0
AM
257 /**
258 * Create a new request from an existing one, and add it to the coalesced
259 * requests
260 *
261 * @param request
262 * The request to copy
263 */
6256d8ad 264 protected void newCoalescedDataRequest(ITmfDataRequest request) {
948b0607 265 synchronized (fLock) {
6256d8ad 266 TmfCoalescedDataRequest coalescedRequest = new TmfCoalescedDataRequest(request.getDataType(), request.getIndex(),
12c155f5 267 request.getNbRequested(), request.getBlockSize(), request.getExecType());
948b0607 268 coalescedRequest.addRequest(request);
5500a7f0
FC
269 if (TmfCoreTracer.isRequestTraced()) {
270 TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
271 TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
272 }
273 fPendingCoalescedRequests.add(coalescedRequest);
274 }
275 }
276
6f4e8ec0
AM
277 /**
278 * Add an existing requests to the list of coalesced ones
279 *
280 * @param request
281 * The request to add to the list
282 */
6256d8ad 283 protected void coalesceDataRequest(ITmfDataRequest request) {
948b0607 284 synchronized (fLock) {
6256d8ad 285 for (TmfCoalescedDataRequest coalescedRequest : fPendingCoalescedRequests) {
948b0607
FC
286 if (coalescedRequest.isCompatible(request)) {
287 coalescedRequest.addRequest(request);
5500a7f0
FC
288 if (TmfCoreTracer.isRequestTraced()) {
289 TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
290 TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
291 }
292 return;
293 }
294 }
295 newCoalescedDataRequest(request);
296 }
297 }
298
299 // ------------------------------------------------------------------------
300 // Request processing
301 // ------------------------------------------------------------------------
302
6256d8ad 303 private void dispatchRequest(final ITmfDataRequest request) {
0283f7ff 304 if (request.getExecType() == ExecutionType.FOREGROUND) {
948b0607 305 queueRequest(request);
0283f7ff 306 } else {
948b0607 307 queueBackgroundRequest(request, request.getBlockSize(), true);
0283f7ff 308 }
948b0607
FC
309 }
310
6f4e8ec0
AM
311 /**
312 * Queue a request.
313 *
314 * @param request
315 * The data request
316 */
6256d8ad 317 protected void queueRequest(final ITmfDataRequest request) {
948b0607
FC
318
319 if (fExecutor.isShutdown()) {
320 request.cancel();
321 return;
322 }
323
6256d8ad 324 final TmfDataProvider provider = this;
948b0607
FC
325
326 // Process the request
327 TmfThread thread = new TmfThread(request.getExecType()) {
0283f7ff 328
948b0607
FC
329 @Override
330 public void run() {
331
5500a7f0
FC
332 if (TmfCoreTracer.isRequestTraced()) {
333 TmfCoreTracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
4cf201de 334 }
948b0607
FC
335
336 // Extract the generic information
337 request.start();
338 int nbRequested = request.getNbRequested();
339 int nbRead = 0;
340
341 // Initialize the execution
342 ITmfContext context = armRequest(request);
343 if (context == null) {
344 request.cancel();
345 return;
346 }
347
348 try {
349 // Get the ordered events
6256d8ad 350 ITmfEvent data = getNext(context);
5500a7f0
FC
351 if (TmfCoreTracer.isRequestTraced()) {
352 TmfCoreTracer.traceRequest(request, "read first event"); //$NON-NLS-1$
0283f7ff 353 }
948b0607 354 while (data != null && !isCompleted(request, data, nbRead)) {
0283f7ff 355 if (fLogData) {
5500a7f0 356 TmfCoreTracer.traceEvent(provider, request, data);
408e65d2 357 }
1b70b6dc
PT
358 if (request.getDataType().isInstance(data)) {
359 request.handleData(data);
360 }
948b0607
FC
361
362 // To avoid an unnecessary read passed the last data
363 // requested
364 if (++nbRead < nbRequested) {
365 data = getNext(context);
366 }
367 }
5500a7f0
FC
368 if (TmfCoreTracer.isRequestTraced()) {
369 TmfCoreTracer.traceRequest(request, "COMPLETED"); //$NON-NLS-1$
0283f7ff 370 }
948b0607
FC
371
372 if (request.isCancelled()) {
373 request.cancel();
374 } else {
375 request.done();
376 }
377 } catch (Exception e) {
378 request.fail();
379 }
380
381 // Cleanup
382 context.dispose();
383 }
475743b7
FC
384
385 @Override
386 public void cancel() {
8a0edc79
FC
387 if (!request.isCompleted()) {
388 request.cancel();
475743b7
FC
389 }
390 }
948b0607
FC
391 };
392
5500a7f0
FC
393 if (TmfCoreTracer.isRequestTraced()) {
394 TmfCoreTracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
0283f7ff 395 }
948b0607 396 fExecutor.execute(thread);
948b0607
FC
397 }
398
6f4e8ec0
AM
399 /**
400 * Queue a background request
401 *
402 * @param request
403 * The request
404 * @param blockSize
405 * The request should be split in chunks of this size
406 * @param indexing
407 * Should we index the chunks
408 */
409 protected void queueBackgroundRequest(final ITmfDataRequest request,
410 final int blockSize, final boolean indexing) {
948b0607 411
6256d8ad 412 final TmfDataProvider provider = this;
4cf201de 413
948b0607
FC
414 Thread thread = new Thread() {
415 @Override
416 public void run() {
4cf201de 417
5500a7f0
FC
418 if (TmfCoreTracer.isRequestTraced()) {
419 TmfCoreTracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
4cf201de
FC
420 }
421
948b0607
FC
422 request.start();
423
424 final Integer[] CHUNK_SIZE = new Integer[1];
425 CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
426
427 final Integer[] nbRead = new Integer[1];
428 nbRead[0] = 0;
429
430 final Boolean[] isFinished = new Boolean[1];
431 isFinished[0] = Boolean.FALSE;
432
433 while (!isFinished[0]) {
434
6256d8ad 435 TmfDataRequest subRequest = new TmfDataRequest(request.getDataType(), request.getIndex()
12c155f5 436 + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) {
11a2fdf0
PT
437
438 @Override
439 public synchronized boolean isCompleted() {
440 return super.isCompleted() || request.isCompleted();
441 }
442
948b0607 443 @Override
6256d8ad 444 public void handleData(ITmfEvent data) {
948b0607 445 super.handleData(data);
1b70b6dc
PT
446 if (request.getDataType().isInstance(data)) {
447 request.handleData(data);
448 }
948b0607
FC
449 if (getNbRead() > CHUNK_SIZE[0]) {
450 System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
451 }
452 }
453
454 @Override
455 public void handleCompleted() {
456 nbRead[0] += getNbRead();
457 if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
90de83da 458 if (this.isCancelled()) {
948b0607 459 request.cancel();
90de83da 460 } else if (this.isFailed()) {
12c155f5 461 request.fail();
948b0607
FC
462 } else {
463 request.done();
464 }
465 isFinished[0] = Boolean.TRUE;
466 }
467 super.handleCompleted();
468 }
469 };
470
471 if (!isFinished[0]) {
472 queueRequest(subRequest);
473
474 try {
475 subRequest.waitForCompletion();
11a2fdf0
PT
476 if (request.isCompleted()) {
477 isFinished[0] = Boolean.TRUE;
478 }
948b0607
FC
479 } catch (InterruptedException e) {
480 e.printStackTrace();
481 }
482
483 CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
484 }
485 }
486 }
487 };
488
489 thread.start();
490 }
491
492 /**
d337369a
FC
493 * Initialize the provider based on the request. The context is provider
494 * specific and will be updated by getNext().
0283f7ff 495 *
948b0607 496 * @param request
6f4e8ec0
AM
497 * The request
498 * @return Sn application specific context; null if request can't be
499 * serviced
948b0607 500 */
6256d8ad 501 protected abstract ITmfContext armRequest(ITmfDataRequest request);
948b0607 502
c32744d6
FC
503// /**
504// * Return the next event based on the context supplied. The context
505// * will be updated for the subsequent read.
0283f7ff 506// *
c32744d6
FC
507// * @param context the trace read context (updated)
508// * @return the event referred to by context
509// */
510// public abstract T getNext(ITmfContext context);
948b0607
FC
511
512 /**
513 * Checks if the data meets the request completion criteria.
0283f7ff 514 *
0d9a6d76
FC
515 * @param request the request
516 * @param data the data to verify
517 * @param nbRead the number of events read so far
518 * @return true if completion criteria is met
948b0607 519 */
6256d8ad 520 public boolean isCompleted(ITmfDataRequest request, ITmfEvent data, int nbRead) {
b6be1c3e 521 return request.isCompleted() || nbRead >= request.getNbRequested();
948b0607
FC
522 }
523
fc7cd0be
AM
524 // ------------------------------------------------------------------------
525 // Pass-through's to the request executor
526 // ------------------------------------------------------------------------
527
528 /**
529 * @return the shutdown state (i.e. if it is accepting new requests)
530 * @since 2.0
531 */
532 protected boolean executorIsShutdown() {
533 return fExecutor.isShutdown();
534 }
535
536 /**
537 * @return the termination state
538 * @since 2.0
539 */
540 protected boolean executorIsTerminated() {
541 return fExecutor.isTerminated();
542 }
543
948b0607
FC
544 // ------------------------------------------------------------------------
545 // Signal handlers
546 // ------------------------------------------------------------------------
547
063f0d27
AM
548 /**
549 * Handler for the start synch signal
550 *
551 * @param signal
552 * Incoming signal
553 */
948b0607
FC
554 @TmfSignalHandler
555 public void startSynch(TmfStartSynchSignal signal) {
556 synchronized (fLock) {
557 fSignalDepth++;
558 }
559 }
560
063f0d27
AM
561 /**
562 * Handler for the end synch signal
563 *
564 * @param signal
565 * Incoming signal
566 */
948b0607
FC
567 @TmfSignalHandler
568 public void endSynch(TmfEndSynchSignal signal) {
045df77d 569 synchronized (fLock) {
948b0607
FC
570 fSignalDepth--;
571 if (fSignalDepth == 0) {
572 fireRequest();
573 }
045df77d 574 }
948b0607 575 }
8c8bf09f
ASL
576
577}
This page took 0.071084 seconds and 5 git commands to generate.