1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.linuxtools
.tmf
.core
.component
;
15 import java
.util
.Vector
;
16 import java
.util
.concurrent
.BlockingQueue
;
17 import java
.util
.concurrent
.LinkedBlockingQueue
;
18 import java
.util
.concurrent
.SynchronousQueue
;
20 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.TmfCoreTracer
;
21 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.component
.TmfProviderManager
;
22 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.component
.TmfThread
;
23 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.request
.TmfCoalescedDataRequest
;
24 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.request
.TmfRequestExecutor
;
25 import org
.eclipse
.linuxtools
.tmf
.core
.event
.ITmfEvent
;
26 import org
.eclipse
.linuxtools
.tmf
.core
.request
.ITmfDataRequest
;
27 import org
.eclipse
.linuxtools
.tmf
.core
.request
.ITmfDataRequest
.ExecutionType
;
28 import org
.eclipse
.linuxtools
.tmf
.core
.request
.TmfDataRequest
;
29 import org
.eclipse
.linuxtools
.tmf
.core
.signal
.TmfEndSynchSignal
;
30 import org
.eclipse
.linuxtools
.tmf
.core
.signal
.TmfSignalHandler
;
31 import org
.eclipse
.linuxtools
.tmf
.core
.signal
.TmfStartSynchSignal
;
32 import org
.eclipse
.linuxtools
.tmf
.core
.trace
.ITmfContext
;
35 * An abstract base class that implements ITmfDataProvider.
37 * This abstract class implements the housekeeping methods to register/
38 * de-register the event provider and to handle generically the event requests.
40 * The concrete class can either re-implement processRequest() entirely or just
41 * implement the hooks (initializeContext() and getNext()).
43 * TODO: Add support for providing multiple data types.
46 * @author Francois Chouinard
48 public abstract class TmfDataProvider
extends TmfComponent
implements ITmfDataProvider
{
50 // ------------------------------------------------------------------------
52 // ------------------------------------------------------------------------
54 /** Default amount of events per request "chunk" */
55 public static final int DEFAULT_BLOCK_SIZE
= 50000;
57 /** Default size of the queue */
58 public static final int DEFAULT_QUEUE_SIZE
= 1000;
60 // ------------------------------------------------------------------------
62 // ------------------------------------------------------------------------
64 /** The type of event handled by this provider */
65 protected Class
<?
extends ITmfEvent
> fType
;
67 /** Is there some data being logged? */
68 protected boolean fLogData
;
70 /** Are errors being logged? */
71 protected boolean fLogError
;
73 /** Queue of events */
74 protected BlockingQueue
<ITmfEvent
> fDataQueue
;
76 /** Size of the fDataQueue */
77 protected int fQueueSize
= DEFAULT_QUEUE_SIZE
;
79 private TmfRequestExecutor fExecutor
;
81 private int fSignalDepth
= 0;
82 private final Object fLock
= new Object();
84 private int fRequestPendingCounter
= 0;
86 // ------------------------------------------------------------------------
88 // ------------------------------------------------------------------------
93 public TmfDataProvider() {
95 fQueueSize
= DEFAULT_QUEUE_SIZE
;
96 fDataQueue
= new LinkedBlockingQueue
<ITmfEvent
>(fQueueSize
);
97 fExecutor
= new TmfRequestExecutor();
101 * Initialize this data provider
104 * Name of the provider
106 * The type of events that will be handled
108 public void init(String name
, Class
<?
extends ITmfEvent
> type
) {
111 fDataQueue
= (fQueueSize
> 1) ?
new LinkedBlockingQueue
<ITmfEvent
>(fQueueSize
) : new SynchronousQueue
<ITmfEvent
>();
113 fExecutor
= new TmfRequestExecutor();
116 fLogData
= TmfCoreTracer
.isEventTraced();
117 // fLogError = TmfCoreTracer.isErrorTraced();
119 TmfProviderManager
.register(fType
, this);
123 * Constructor specifying the event type and the queue size.
126 * Name of the provider
128 * Type of event that will be handled
130 * Size of the event queue
132 protected TmfDataProvider(String name
, Class
<?
extends ITmfEvent
> type
, int queueSize
) {
134 fQueueSize
= queueSize
;
142 * The other object to copy
144 public TmfDataProvider(TmfDataProvider other
) {
146 init(other
.getName(), other
.fType
);
150 * Standard constructor. Instantiate and initialize at the same time.
153 * Name of the provider
155 * The type of events that will be handled
157 public TmfDataProvider(String name
, Class
<?
extends ITmfEvent
> type
) {
158 this(name
, type
, DEFAULT_QUEUE_SIZE
);
162 public void dispose() {
163 TmfProviderManager
.deregister(fType
, this);
166 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
169 // ------------------------------------------------------------------------
171 // ------------------------------------------------------------------------
174 * Get the queue size of this provider
176 * @return The size of the queue
178 public int getQueueSize() {
183 * Get the event type this provider handles
185 * @return The type of ITmfEvent
187 public Class
<?
> getType() {
191 // ------------------------------------------------------------------------
192 // ITmfRequestHandler
193 // ------------------------------------------------------------------------
196 public void sendRequest(final ITmfDataRequest request
) {
197 synchronized (fLock
) {
198 if (fSignalDepth
> 0) {
199 coalesceDataRequest(request
);
201 dispatchRequest(request
);
207 public void fireRequest() {
208 synchronized (fLock
) {
209 if (fRequestPendingCounter
> 0) {
212 if (fPendingCoalescedRequests
.size() > 0) {
213 for (TmfDataRequest request
: fPendingCoalescedRequests
) {
214 dispatchRequest(request
);
216 fPendingCoalescedRequests
.clear();
222 * Increments/decrements the pending requests counters and fires the request
223 * if necessary (counter == 0). Used for coalescing requests across multiple
227 * Should we increment (true) or decrement (false) the pending
231 public void notifyPendingRequest(boolean isIncrement
) {
232 synchronized (fLock
) {
234 if (fSignalDepth
> 0) {
235 fRequestPendingCounter
++;
238 if (fRequestPendingCounter
> 0) {
239 fRequestPendingCounter
--;
242 // fire request if all pending requests are received
243 if (fRequestPendingCounter
== 0) {
250 // ------------------------------------------------------------------------
251 // Coalescing (primitive test...)
252 // ------------------------------------------------------------------------
254 /** List of coalesced requests */
255 protected Vector
<TmfCoalescedDataRequest
> fPendingCoalescedRequests
= new Vector
<TmfCoalescedDataRequest
>();
258 * Create a new request from an existing one, and add it to the coalesced
262 * The request to copy
264 protected void newCoalescedDataRequest(ITmfDataRequest request
) {
265 synchronized (fLock
) {
266 TmfCoalescedDataRequest coalescedRequest
= new TmfCoalescedDataRequest(request
.getDataType(), request
.getIndex(),
267 request
.getNbRequested(), request
.getBlockSize(), request
.getExecType());
268 coalescedRequest
.addRequest(request
);
269 if (TmfCoreTracer
.isRequestTraced()) {
270 TmfCoreTracer
.traceRequest(request
, "COALESCED with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
271 TmfCoreTracer
.traceRequest(coalescedRequest
, "now contains " + coalescedRequest
.getSubRequestIds()); //$NON-NLS-1$
273 fPendingCoalescedRequests
.add(coalescedRequest
);
278 * Add an existing requests to the list of coalesced ones
281 * The request to add to the list
283 protected void coalesceDataRequest(ITmfDataRequest request
) {
284 synchronized (fLock
) {
285 for (TmfCoalescedDataRequest coalescedRequest
: fPendingCoalescedRequests
) {
286 if (coalescedRequest
.isCompatible(request
)) {
287 coalescedRequest
.addRequest(request
);
288 if (TmfCoreTracer
.isRequestTraced()) {
289 TmfCoreTracer
.traceRequest(request
, "COALESCED with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
290 TmfCoreTracer
.traceRequest(coalescedRequest
, "now contains " + coalescedRequest
.getSubRequestIds()); //$NON-NLS-1$
295 newCoalescedDataRequest(request
);
299 // ------------------------------------------------------------------------
300 // Request processing
301 // ------------------------------------------------------------------------
303 private void dispatchRequest(final ITmfDataRequest request
) {
304 if (request
.getExecType() == ExecutionType
.FOREGROUND
) {
305 queueRequest(request
);
307 queueBackgroundRequest(request
, request
.getBlockSize(), true);
317 protected void queueRequest(final ITmfDataRequest request
) {
319 if (fExecutor
.isShutdown()) {
324 final TmfDataProvider provider
= this;
326 // Process the request
327 TmfThread thread
= new TmfThread(request
.getExecType()) {
332 if (TmfCoreTracer
.isRequestTraced()) {
333 TmfCoreTracer
.traceRequest(request
, "is being serviced by " + provider
.getName()); //$NON-NLS-1$
336 // Extract the generic information
338 int nbRequested
= request
.getNbRequested();
341 // Initialize the execution
342 ITmfContext context
= armRequest(request
);
343 if (context
== null) {
349 // Get the ordered events
350 ITmfEvent data
= getNext(context
);
351 if (TmfCoreTracer
.isRequestTraced()) {
352 TmfCoreTracer
.traceRequest(request
, "read first event"); //$NON-NLS-1$
354 while (data
!= null && !isCompleted(request
, data
, nbRead
)) {
356 TmfCoreTracer
.traceEvent(provider
, request
, data
);
358 if (request
.getDataType().isInstance(data
)) {
359 request
.handleData(data
);
362 // To avoid an unnecessary read passed the last data
364 if (++nbRead
< nbRequested
) {
365 data
= getNext(context
);
368 if (TmfCoreTracer
.isRequestTraced()) {
369 TmfCoreTracer
.traceRequest(request
, "COMPLETED"); //$NON-NLS-1$
372 if (request
.isCancelled()) {
377 } catch (Exception e
) {
386 public void cancel() {
387 if (!request
.isCompleted()) {
393 if (TmfCoreTracer
.isRequestTraced()) {
394 TmfCoreTracer
.traceRequest(request
, "QUEUED"); //$NON-NLS-1$
396 fExecutor
.execute(thread
);
400 * Queue a background request
405 * The request should be split in chunks of this size
407 * Should we index the chunks
409 protected void queueBackgroundRequest(final ITmfDataRequest request
,
410 final int blockSize
, final boolean indexing
) {
412 final TmfDataProvider provider
= this;
414 Thread thread
= new Thread() {
418 if (TmfCoreTracer
.isRequestTraced()) {
419 TmfCoreTracer
.traceRequest(request
, "is being serviced by " + provider
.getName()); //$NON-NLS-1$
424 final Integer
[] CHUNK_SIZE
= new Integer
[1];
425 CHUNK_SIZE
[0] = Math
.min(request
.getNbRequested(), blockSize
+ ((indexing
) ?
1 : 0));
427 final Integer
[] nbRead
= new Integer
[1];
430 final Boolean
[] isFinished
= new Boolean
[1];
431 isFinished
[0] = Boolean
.FALSE
;
433 while (!isFinished
[0]) {
435 TmfDataRequest subRequest
= new TmfDataRequest(request
.getDataType(), request
.getIndex()
436 + nbRead
[0], CHUNK_SIZE
[0], blockSize
, ExecutionType
.BACKGROUND
) {
439 public synchronized boolean isCompleted() {
440 return super.isCompleted() || request
.isCompleted();
444 public void handleData(ITmfEvent data
) {
445 super.handleData(data
);
446 if (request
.getDataType().isInstance(data
)) {
447 request
.handleData(data
);
449 if (getNbRead() > CHUNK_SIZE
[0]) {
450 System
.out
.println("ERROR - Read too many events"); //$NON-NLS-1$
455 public void handleCompleted() {
456 nbRead
[0] += getNbRead();
457 if (nbRead
[0] >= request
.getNbRequested() || (getNbRead() < CHUNK_SIZE
[0])) {
458 if (this.isCancelled()) {
460 } else if (this.isFailed()) {
465 isFinished
[0] = Boolean
.TRUE
;
467 super.handleCompleted();
471 if (!isFinished
[0]) {
472 queueRequest(subRequest
);
475 subRequest
.waitForCompletion();
476 if (request
.isCompleted()) {
477 isFinished
[0] = Boolean
.TRUE
;
479 } catch (InterruptedException e
) {
483 CHUNK_SIZE
[0] = Math
.min(request
.getNbRequested() - nbRead
[0], blockSize
);
493 * Initialize the provider based on the request. The context is provider
494 * specific and will be updated by getNext().
498 * @return Sn application specific context; null if request can't be
501 protected abstract ITmfContext
armRequest(ITmfDataRequest request
);
504 // * Return the next event based on the context supplied. The context
505 // * will be updated for the subsequent read.
507 // * @param context the trace read context (updated)
508 // * @return the event referred to by context
510 // public abstract T getNext(ITmfContext context);
513 * Checks if the data meets the request completion criteria.
515 * @param request the request
516 * @param data the data to verify
517 * @param nbRead the number of events read so far
518 * @return true if completion criteria is met
520 public boolean isCompleted(ITmfDataRequest request
, ITmfEvent data
, int nbRead
) {
521 return request
.isCompleted() || nbRead
>= request
.getNbRequested();
524 // ------------------------------------------------------------------------
525 // Pass-through's to the request executor
526 // ------------------------------------------------------------------------
529 * @return the shutdown state (i.e. if it is accepting new requests)
532 protected boolean executorIsShutdown() {
533 return fExecutor
.isShutdown();
537 * @return the termination state
540 protected boolean executorIsTerminated() {
541 return fExecutor
.isTerminated();
544 // ------------------------------------------------------------------------
546 // ------------------------------------------------------------------------
549 * Handler for the start synch signal
555 public void startSynch(TmfStartSynchSignal signal
) {
556 synchronized (fLock
) {
562 * Handler for the end synch signal
568 public void endSynch(TmfEndSynchSignal signal
) {
569 synchronized (fLock
) {
571 if (fSignalDepth
== 0) {