1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.linuxtools
.tmf
.core
.component
;
15 import java
.util
.Vector
;
16 import java
.util
.concurrent
.BlockingQueue
;
17 import java
.util
.concurrent
.LinkedBlockingQueue
;
18 import java
.util
.concurrent
.SynchronousQueue
;
20 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.Tracer
;
21 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.component
.TmfProviderManager
;
22 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.component
.TmfThread
;
23 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.request
.TmfCoalescedDataRequest
;
24 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.request
.TmfRequestExecutor
;
25 import org
.eclipse
.linuxtools
.tmf
.core
.event
.ITmfEvent
;
26 import org
.eclipse
.linuxtools
.tmf
.core
.request
.ITmfDataRequest
;
27 import org
.eclipse
.linuxtools
.tmf
.core
.request
.ITmfDataRequest
.ExecutionType
;
28 import org
.eclipse
.linuxtools
.tmf
.core
.request
.TmfDataRequest
;
29 import org
.eclipse
.linuxtools
.tmf
.core
.signal
.TmfEndSynchSignal
;
30 import org
.eclipse
.linuxtools
.tmf
.core
.signal
.TmfSignalHandler
;
31 import org
.eclipse
.linuxtools
.tmf
.core
.signal
.TmfStartSynchSignal
;
32 import org
.eclipse
.linuxtools
.tmf
.core
.trace
.ITmfContext
;
35 * An abstract base class that implements ITmfDataProvider.
37 * This abstract class implements the housekeeping methods to register/
38 * de-register the event provider and to handle generically the event requests.
40 * The concrete class can either re-implement processRequest() entirely or just
41 * implement the hooks (initializeContext() and getNext()).
43 * TODO: Add support for providing multiple data types.
46 * @author Francois Chouinard
48 public abstract class TmfDataProvider
<T
extends ITmfEvent
> extends TmfComponent
implements ITmfDataProvider
<T
> {
50 // ------------------------------------------------------------------------
52 // ------------------------------------------------------------------------
54 public static final int DEFAULT_BLOCK_SIZE
= 50000;
55 public static final int DEFAULT_QUEUE_SIZE
= 1000;
57 // ------------------------------------------------------------------------
59 // ------------------------------------------------------------------------
61 protected Class
<T
> fType
;
62 protected boolean fLogData
;
63 protected boolean fLogError
;
65 protected int fQueueSize
= DEFAULT_QUEUE_SIZE
;
66 protected BlockingQueue
<T
> fDataQueue
;
67 protected TmfRequestExecutor fExecutor
;
69 private int fSignalDepth
= 0;
70 private final Object fLock
= new Object();
72 private int fRequestPendingCounter
= 0;
74 // ------------------------------------------------------------------------
76 // ------------------------------------------------------------------------
78 public TmfDataProvider() {
80 fQueueSize
= DEFAULT_QUEUE_SIZE
;
81 fDataQueue
= new LinkedBlockingQueue
<T
>(fQueueSize
);
82 fExecutor
= new TmfRequestExecutor();
85 public void init(String name
, Class
<T
> type
) {
88 fDataQueue
= (fQueueSize
> 1) ?
new LinkedBlockingQueue
<T
>(fQueueSize
) : new SynchronousQueue
<T
>();
90 fExecutor
= new TmfRequestExecutor();
93 fLogData
= Tracer
.isEventTraced();
94 fLogError
= Tracer
.isErrorTraced();
96 TmfProviderManager
.register(fType
, this);
99 protected TmfDataProvider(String name
, Class
<T
> type
, int queueSize
) {
101 fQueueSize
= queueSize
;
105 public TmfDataProvider(TmfDataProvider
<T
> other
) {
107 init(other
.getName(), other
.fType
);
110 public TmfDataProvider(String name
, Class
<T
> type
) {
111 this(name
, type
, DEFAULT_QUEUE_SIZE
);
115 public void dispose() {
116 TmfProviderManager
.deregister(fType
, this);
119 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
122 // ------------------------------------------------------------------------
124 // ------------------------------------------------------------------------
126 public int getQueueSize() {
130 public Class
<?
> getType() {
134 // ------------------------------------------------------------------------
135 // ITmfRequestHandler
136 // ------------------------------------------------------------------------
139 public void sendRequest(final ITmfDataRequest
<T
> request
) {
140 synchronized (fLock
) {
141 if (fSignalDepth
> 0) {
142 coalesceDataRequest(request
);
144 dispatchRequest(request
);
150 * This method queues the coalesced requests.
153 public void fireRequest() {
154 synchronized (fLock
) {
155 if (fRequestPendingCounter
> 0) {
158 if (fPendingCoalescedRequests
.size() > 0) {
159 for (TmfDataRequest
<T
> request
: fPendingCoalescedRequests
) {
160 dispatchRequest(request
);
162 fPendingCoalescedRequests
.clear();
168 * Increments/decrements the pending requests counters and fires the request if necessary (counter == 0). Used for
169 * coalescing requests accross multiple TmfDataProvider.
174 public void notifyPendingRequest(boolean isIncrement
) {
175 synchronized (fLock
) {
177 if (fSignalDepth
> 0) {
178 fRequestPendingCounter
++;
181 if (fRequestPendingCounter
> 0) {
182 fRequestPendingCounter
--;
185 // fire request if all pending requests are received
186 if (fRequestPendingCounter
== 0) {
193 // ------------------------------------------------------------------------
194 // Coalescing (primitive test...)
195 // ------------------------------------------------------------------------
197 protected Vector
<TmfCoalescedDataRequest
<T
>> fPendingCoalescedRequests
= new Vector
<TmfCoalescedDataRequest
<T
>>();
199 protected void newCoalescedDataRequest(ITmfDataRequest
<T
> request
) {
200 synchronized (fLock
) {
201 TmfCoalescedDataRequest
<T
> coalescedRequest
= new TmfCoalescedDataRequest
<T
>(request
.getDataType(), request
.getIndex(),
202 request
.getNbRequested(), request
.getBlockSize(), request
.getExecType());
203 coalescedRequest
.addRequest(request
);
204 if (Tracer
.isRequestTraced()) {
205 Tracer
.traceRequest(request
, "COALESCED with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
206 Tracer
.traceRequest(coalescedRequest
, "now contains " + coalescedRequest
.getSubRequestIds()); //$NON-NLS-1$
208 fPendingCoalescedRequests
.add(coalescedRequest
);
212 protected void coalesceDataRequest(ITmfDataRequest
<T
> request
) {
213 synchronized (fLock
) {
214 for (TmfCoalescedDataRequest
<T
> coalescedRequest
: fPendingCoalescedRequests
) {
215 if (coalescedRequest
.isCompatible(request
)) {
216 coalescedRequest
.addRequest(request
);
217 if (Tracer
.isRequestTraced()) {
218 Tracer
.traceRequest(request
, "COALESCED with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
219 Tracer
.traceRequest(coalescedRequest
, "now contains " + coalescedRequest
.getSubRequestIds()); //$NON-NLS-1$
224 newCoalescedDataRequest(request
);
228 // ------------------------------------------------------------------------
229 // Request processing
230 // ------------------------------------------------------------------------
232 private void dispatchRequest(final ITmfDataRequest
<T
> request
) {
233 if (request
.getExecType() == ExecutionType
.FOREGROUND
)
234 queueRequest(request
);
236 queueBackgroundRequest(request
, request
.getBlockSize(), true);
239 protected void queueRequest(final ITmfDataRequest
<T
> request
) {
241 if (fExecutor
.isShutdown()) {
246 final TmfDataProvider
<T
> provider
= this;
248 // Process the request
249 TmfThread thread
= new TmfThread(request
.getExecType()) {
254 if (Tracer
.isRequestTraced()) {
255 Tracer
.traceRequest(request
, "is being serviced by " + provider
.getName()); //$NON-NLS-1$
258 // Extract the generic information
260 int nbRequested
= request
.getNbRequested();
263 // Initialize the execution
264 ITmfContext context
= armRequest(request
);
265 if (context
== null) {
271 // Get the ordered events
272 T data
= getNext(context
);
273 if (Tracer
.isRequestTraced())
274 Tracer
.traceRequest(request
, "read first event"); //$NON-NLS-1$
275 while (data
!= null && !isCompleted(request
, data
, nbRead
)) {
277 // Tracer.traceEvent(provider, request, data);
278 if (request
.getDataType().isInstance(data
)) {
279 request
.handleData(data
);
282 // To avoid an unnecessary read passed the last data
284 if (++nbRead
< nbRequested
) {
285 data
= getNext(context
);
288 if (Tracer
.isRequestTraced())
289 Tracer
.traceRequest(request
, "COMPLETED"); //$NON-NLS-1$
291 if (request
.isCancelled()) {
296 } catch (Exception e
) {
305 public void cancel() {
306 if (!request
.isCompleted()) {
312 if (Tracer
.isRequestTraced())
313 Tracer
.traceRequest(request
, "QUEUED"); //$NON-NLS-1$
314 fExecutor
.execute(thread
);
318 protected void queueBackgroundRequest(final ITmfDataRequest
<T
> request
, final int blockSize
, final boolean indexing
) {
320 final TmfDataProvider
<T
> provider
= this;
322 Thread thread
= new Thread() {
326 if (Tracer
.isRequestTraced()) {
327 Tracer
.traceRequest(request
, "is being serviced by " + provider
.getName()); //$NON-NLS-1$
332 final Integer
[] CHUNK_SIZE
= new Integer
[1];
333 CHUNK_SIZE
[0] = Math
.min(request
.getNbRequested(), blockSize
+ ((indexing
) ?
1 : 0));
335 final Integer
[] nbRead
= new Integer
[1];
338 final Boolean
[] isFinished
= new Boolean
[1];
339 isFinished
[0] = Boolean
.FALSE
;
341 while (!isFinished
[0]) {
343 TmfDataRequest
<T
> subRequest
= new TmfDataRequest
<T
>(request
.getDataType(), request
.getIndex()
344 + nbRead
[0], CHUNK_SIZE
[0], blockSize
, ExecutionType
.BACKGROUND
) {
346 public void handleData(T data
) {
347 super.handleData(data
);
348 if (request
.getDataType().isInstance(data
)) {
349 request
.handleData(data
);
351 if (getNbRead() > CHUNK_SIZE
[0]) {
352 System
.out
.println("ERROR - Read too many events"); //$NON-NLS-1$
357 public void handleCompleted() {
358 nbRead
[0] += getNbRead();
359 if (nbRead
[0] >= request
.getNbRequested() || (getNbRead() < CHUNK_SIZE
[0])) {
360 if (this.isCancelled()) {
362 } else if (this.isFailed()) {
367 isFinished
[0] = Boolean
.TRUE
;
369 super.handleCompleted();
373 if (!isFinished
[0]) {
374 queueRequest(subRequest
);
377 subRequest
.waitForCompletion();
378 } catch (InterruptedException e
) {
382 CHUNK_SIZE
[0] = Math
.min(request
.getNbRequested() - nbRead
[0], blockSize
);
392 * Initialize the provider based on the request. The context is provider
393 * specific and will be updated by getNext().
396 * @return an application specific context; null if request can't be serviced
398 protected abstract ITmfContext
armRequest(ITmfDataRequest
<T
> request
);
401 // * Return the next event based on the context supplied. The context
402 // * will be updated for the subsequent read.
404 // * @param context the trace read context (updated)
405 // * @return the event referred to by context
407 // public abstract T getNext(ITmfContext context);
410 * Checks if the data meets the request completion criteria.
412 * @param request the request
413 * @param data the data to verify
414 * @param nbRead the number of events read so far
415 * @return true if completion criteria is met
417 public boolean isCompleted(ITmfDataRequest
<T
> request
, T data
, int nbRead
) {
418 return request
.isCompleted() || nbRead
>= request
.getNbRequested();
421 // ------------------------------------------------------------------------
423 // ------------------------------------------------------------------------
426 public void startSynch(TmfStartSynchSignal signal
) {
427 synchronized (fLock
) {
433 public void endSynch(TmfEndSynchSignal signal
) {
434 synchronized (fLock
) {
436 if (fSignalDepth
== 0) {