1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.linuxtools
.tmf
.component
;
15 import java
.util
.Vector
;
16 import java
.util
.concurrent
.BlockingQueue
;
17 import java
.util
.concurrent
.LinkedBlockingQueue
;
18 import java
.util
.concurrent
.SynchronousQueue
;
20 import org
.eclipse
.linuxtools
.tmf
.Tracer
;
21 import org
.eclipse
.linuxtools
.tmf
.event
.TmfData
;
22 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
;
23 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
.ExecutionType
;
24 import org
.eclipse
.linuxtools
.tmf
.request
.TmfCoalescedDataRequest
;
25 import org
.eclipse
.linuxtools
.tmf
.request
.TmfDataRequest
;
26 import org
.eclipse
.linuxtools
.tmf
.request
.TmfRequestExecutor
;
27 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfEndSynchSignal
;
28 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfSignalHandler
;
29 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfStartSynchSignal
;
30 import org
.eclipse
.linuxtools
.tmf
.trace
.ITmfContext
;
33 * <b><u>TmfProvider</u></b>
35 * The TmfProvider<T> is a provider for a data of type <T>.
37 * This abstract class implements the housekeeking methods to register/ deregister the event provider and to handle
38 * generically the event requests.
40 * The concrete class can either re-implement processRequest() entirely or just implement the hooks (initializeContext()
43 * TODO: Add support for providing multiple data types.
45 public abstract class TmfDataProvider
<T
extends TmfData
> extends TmfComponent
implements ITmfDataProvider
<T
> {
47 // ------------------------------------------------------------------------
49 // ------------------------------------------------------------------------
51 // ------------------------------------------------------------------------
53 // ------------------------------------------------------------------------
55 protected Class
<T
> fType
;
56 protected boolean fLogData
;
57 protected boolean fLogError
;
59 public static final int DEFAULT_BLOCK_SIZE
= 50000;
60 public static final int DEFAULT_QUEUE_SIZE
= 1000;
62 protected int fQueueSize
;
63 protected BlockingQueue
<T
> fDataQueue
;
64 protected TmfRequestExecutor fExecutor
;
66 private int fSignalDepth
= 0;
67 private final Object fLock
= new Object();
69 private int fRequestPendingCounter
= 0;
71 // ------------------------------------------------------------------------
73 // ------------------------------------------------------------------------
75 public TmfDataProvider() {
76 fQueueSize
= DEFAULT_QUEUE_SIZE
;
77 fDataQueue
= new LinkedBlockingQueue
<T
>(fQueueSize
);
78 fExecutor
= new TmfRequestExecutor();
81 public void init(String name
, Class
<T
> dataType
) {
84 fQueueSize
= DEFAULT_QUEUE_SIZE
;
85 fDataQueue
= (fQueueSize
> 1) ?
new LinkedBlockingQueue
<T
>(fQueueSize
) : new SynchronousQueue
<T
>();
87 fExecutor
= new TmfRequestExecutor();
90 fLogData
= Tracer
.isEventTraced();
91 fLogError
= Tracer
.isErrorTraced();
93 TmfProviderManager
.register(fType
, this);
96 public TmfDataProvider(String name
, Class
<T
> type
) {
97 this(name
, type
, DEFAULT_QUEUE_SIZE
);
100 protected TmfDataProvider(String name
, Class
<T
> type
, int queueSize
) {
103 fQueueSize
= queueSize
;
104 fDataQueue
= (fQueueSize
> 1) ?
new LinkedBlockingQueue
<T
>(fQueueSize
) : new SynchronousQueue
<T
>();
106 fExecutor
= new TmfRequestExecutor();
109 fLogData
= Tracer
.isEventTraced();
110 fLogError
= Tracer
.isErrorTraced();
112 TmfProviderManager
.register(fType
, this);
113 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
116 public TmfDataProvider(TmfDataProvider
<T
> other
) {
119 fQueueSize
= other
.fQueueSize
;
120 fDataQueue
= (fQueueSize
> 1) ?
new LinkedBlockingQueue
<T
>(fQueueSize
) : new SynchronousQueue
<T
>();
122 fExecutor
= new TmfRequestExecutor();
125 fLogData
= Tracer
.isEventTraced();
126 fLogError
= Tracer
.isErrorTraced();
130 public void dispose() {
131 TmfProviderManager
.deregister(fType
, this);
134 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
137 public int getQueueSize() {
141 public Class
<?
> getType() {
145 // ------------------------------------------------------------------------
146 // ITmfRequestHandler
147 // ------------------------------------------------------------------------
150 public void sendRequest(final ITmfDataRequest
<T
> request
) {
151 synchronized (fLock
) {
152 if (fSignalDepth
> 0) {
153 coalesceDataRequest(request
);
155 dispatchRequest(request
);
161 * This method queues the coalesced requests.
166 public void fireRequest() {
167 synchronized (fLock
) {
168 if (fRequestPendingCounter
> 0) {
171 if (fPendingCoalescedRequests
.size() > 0) {
172 for (TmfDataRequest
<T
> request
: fPendingCoalescedRequests
) {
173 dispatchRequest(request
);
175 fPendingCoalescedRequests
.clear();
181 * Increments/decrements the pending requests counters and fires the request if necessary (counter == 0). Used for
182 * coalescing requests accross multiple TmfDataProvider.
187 public void notifyPendingRequest(boolean isIncrement
) {
188 synchronized (fLock
) {
190 if (fSignalDepth
> 0) {
191 fRequestPendingCounter
++;
194 if (fRequestPendingCounter
> 0) {
195 fRequestPendingCounter
--;
198 // fire request if all pending requests are received
199 if (fRequestPendingCounter
== 0) {
206 // ------------------------------------------------------------------------
207 // Coalescing (primitive test...)
208 // ------------------------------------------------------------------------
210 protected Vector
<TmfCoalescedDataRequest
<T
>> fPendingCoalescedRequests
= new Vector
<TmfCoalescedDataRequest
<T
>>();
212 protected void newCoalescedDataRequest(ITmfDataRequest
<T
> request
) {
213 synchronized (fLock
) {
214 TmfCoalescedDataRequest
<T
> coalescedRequest
= new TmfCoalescedDataRequest
<T
>(fType
, request
.getIndex(),
215 request
.getNbRequested(), request
.getBlockSize(), request
.getExecType());
216 coalescedRequest
.addRequest(request
);
217 if (Tracer
.isRequestTraced()) {
218 Tracer
.traceRequest(request
, "coalesced with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
220 fPendingCoalescedRequests
.add(coalescedRequest
);
224 protected void coalesceDataRequest(ITmfDataRequest
<T
> request
) {
225 synchronized (fLock
) {
226 for (TmfCoalescedDataRequest
<T
> coalescedRequest
: fPendingCoalescedRequests
) {
227 if (coalescedRequest
.isCompatible(request
)) {
228 coalescedRequest
.addRequest(request
);
229 if (Tracer
.isRequestTraced()) {
230 Tracer
.traceRequest(request
, "coalesced with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
235 newCoalescedDataRequest(request
);
239 // ------------------------------------------------------------------------
240 // Request processing
241 // ------------------------------------------------------------------------
243 private void dispatchRequest(final ITmfDataRequest
<T
> request
) {
244 if (request
.getExecType() == ExecutionType
.FOREGROUND
)
245 queueRequest(request
);
247 queueBackgroundRequest(request
, request
.getBlockSize(), true);
250 protected void queueRequest(final ITmfDataRequest
<T
> request
) {
252 if (fExecutor
.isShutdown()) {
257 final TmfDataProvider
<T
> provider
= this;
259 // Process the request
260 TmfThread thread
= new TmfThread(request
.getExecType()) {
265 if (Tracer
.isRequestTraced())
266 Tracer
.trace("Request #" + request
.getRequestId() + " is being serviced by " + provider
.getName()); //$NON-NLS-1$//$NON-NLS-2$
268 // Extract the generic information
270 int nbRequested
= request
.getNbRequested();
273 // Initialize the execution
274 ITmfContext context
= armRequest(request
);
275 if (context
== null) {
281 // Get the ordered events
282 T data
= getNext(context
);
283 if (Tracer
.isRequestTraced())
284 Tracer
.trace("Request #" + request
.getRequestId() + " read first event"); //$NON-NLS-1$ //$NON-NLS-2$
285 while (data
!= null && !isCompleted(request
, data
, nbRead
)) {
287 Tracer
.traceEvent(provider
, request
, data
);
288 request
.handleData(data
);
290 // To avoid an unnecessary read passed the last data
292 if (++nbRead
< nbRequested
) {
293 data
= getNext(context
);
296 if (Tracer
.isRequestTraced())
297 Tracer
.trace("Request #" + request
.getRequestId() + " finished"); //$NON-NLS-1$//$NON-NLS-2$
299 if (request
.isCancelled()) {
304 } catch (Exception e
) {
313 if (Tracer
.isRequestTraced())
314 Tracer
.traceRequest(request
, "queued"); //$NON-NLS-1$
315 fExecutor
.execute(thread
);
319 protected void queueBackgroundRequest(final ITmfDataRequest
<T
> request
, final int blockSize
, final boolean indexing
) {
321 Thread thread
= new Thread() {
326 final Integer
[] CHUNK_SIZE
= new Integer
[1];
327 CHUNK_SIZE
[0] = Math
.min(request
.getNbRequested(), blockSize
+ ((indexing
) ?
1 : 0));
329 final Integer
[] nbRead
= new Integer
[1];
332 final Boolean
[] isFinished
= new Boolean
[1];
333 isFinished
[0] = Boolean
.FALSE
;
335 while (!isFinished
[0]) {
337 TmfDataRequest
<T
> subRequest
= new TmfDataRequest
<T
>(request
.getDataType(), request
.getIndex()
338 + nbRead
[0], CHUNK_SIZE
[0], blockSize
, ExecutionType
.BACKGROUND
) {
340 public void handleData(T data
) {
341 super.handleData(data
);
342 request
.handleData(data
);
343 if (getNbRead() > CHUNK_SIZE
[0]) {
344 System
.out
.println("ERROR - Read too many events"); //$NON-NLS-1$
349 public void handleCompleted() {
350 nbRead
[0] += getNbRead();
351 if (nbRead
[0] >= request
.getNbRequested() || (getNbRead() < CHUNK_SIZE
[0])) {
352 if (this.isCancelled()) {
354 } else if (this.isFailed()) {
359 isFinished
[0] = Boolean
.TRUE
;
361 super.handleCompleted();
365 if (!isFinished
[0]) {
366 queueRequest(subRequest
);
369 subRequest
.waitForCompletion();
370 } catch (InterruptedException e
) {
374 CHUNK_SIZE
[0] = Math
.min(request
.getNbRequested() - nbRead
[0], blockSize
);
384 * Initialize the provider based on the request. The context is provider specific and will be updated by getNext().
387 * @return an application specific context; null if request can't be serviced
389 public abstract ITmfContext
armRequest(ITmfDataRequest
<T
> request
);
391 public abstract T
getNext(ITmfContext context
);
394 * Checks if the data meets the request completion criteria.
400 public boolean isCompleted(ITmfDataRequest
<T
> request
, T data
, int nbRead
) {
401 return request
.isCompleted() || nbRead
>= request
.getNbRequested() || data
.isNullRef();
404 // ------------------------------------------------------------------------
406 // ------------------------------------------------------------------------
409 public void startSynch(TmfStartSynchSignal signal
) {
410 synchronized (fLock
) {
416 public void endSynch(TmfEndSynchSignal signal
) {
417 synchronized (fLock
) {
419 if (fSignalDepth
== 0) {