1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.linuxtools
.tmf
.component
;
15 import java
.lang
.reflect
.Array
;
16 import java
.util
.Vector
;
17 import java
.util
.concurrent
.BlockingQueue
;
18 import java
.util
.concurrent
.LinkedBlockingQueue
;
19 import java
.util
.concurrent
.SynchronousQueue
;
21 import org
.eclipse
.linuxtools
.tmf
.Tracer
;
22 import org
.eclipse
.linuxtools
.tmf
.event
.TmfData
;
23 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
;
24 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
.ExecutionType
;
25 import org
.eclipse
.linuxtools
.tmf
.request
.TmfCoalescedDataRequest
;
26 import org
.eclipse
.linuxtools
.tmf
.request
.TmfDataRequest
;
27 import org
.eclipse
.linuxtools
.tmf
.request
.TmfRequestExecutor
;
28 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfEndSynchSignal
;
29 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfSignalHandler
;
30 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfStartSynchSignal
;
31 import org
.eclipse
.linuxtools
.tmf
.trace
.ITmfContext
;
34 * <b><u>TmfProvider</u></b>
36 * The TmfProvider<T> is a provider for a data of type <T>.
38 * This abstract class implements the housekeeking methods to register/
39 * deregister the event provider and to handle generically the event requests.
41 * The concrete class can either re-implement processRequest() entirely or
42 * just implement the hooks (initializeContext() and getNext()).
44 * TODO: Add support for providing multiple data types.
46 public abstract class TmfDataProvider
<T
extends TmfData
> extends TmfComponent
implements ITmfDataProvider
<T
> {
48 // ------------------------------------------------------------------------
50 // ------------------------------------------------------------------------
52 // private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
53 // private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
55 // ------------------------------------------------------------------------
57 // ------------------------------------------------------------------------
59 final protected Class
<T
> fType
;
60 final protected boolean fLogData
;
61 final protected boolean fLogError
;
63 public static final int DEFAULT_BLOCK_SIZE
= 5000;
64 public static final int DEFAULT_QUEUE_SIZE
= 1000;
66 protected final int fQueueSize
;
67 protected final BlockingQueue
<T
> fDataQueue
;
68 protected final TmfRequestExecutor fExecutor
;
70 private int fSignalDepth
= 0;
71 private final Object fLock
= new Object();
73 // ------------------------------------------------------------------------
75 // ------------------------------------------------------------------------
77 public TmfDataProvider(String name
, Class
<T
> type
) {
78 this(name
, type
, DEFAULT_QUEUE_SIZE
);
81 protected TmfDataProvider(String name
, Class
<T
> type
, int queueSize
) {
84 fQueueSize
= queueSize
;
85 fDataQueue
= (fQueueSize
> 1) ?
new LinkedBlockingQueue
<T
>(fQueueSize
) : new SynchronousQueue
<T
>();
87 fExecutor
= new TmfRequestExecutor();
90 fLogData
= Tracer
.isEventTraced();
91 fLogError
= Tracer
.isErrorTraced();
93 TmfProviderManager
.register(fType
, this);
94 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
97 public TmfDataProvider(TmfDataProvider
<T
> other
) {
100 fQueueSize
= other
.fQueueSize
;
101 fDataQueue
= (fQueueSize
> 1) ?
new LinkedBlockingQueue
<T
>(fQueueSize
) : new SynchronousQueue
<T
>();
103 fExecutor
= new TmfRequestExecutor();
106 fLogData
= Tracer
.isEventTraced();
107 fLogError
= Tracer
.isErrorTraced();
111 public void dispose() {
112 TmfProviderManager
.deregister(fType
, this);
115 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
118 public int getQueueSize() {
122 public Class
<?
> getType() {
126 // ------------------------------------------------------------------------
127 // ITmfRequestHandler
128 // ------------------------------------------------------------------------
130 public void sendRequest(final ITmfDataRequest
<T
> request
) {
131 synchronized(fLock
) {
132 if (fSignalDepth
> 0) {
133 coalesceDataRequest(request
);
135 dispatchRequest(request
);
141 * This method queues the coalesced requests.
145 public void fireRequests() {
146 synchronized(fLock
) {
147 for (TmfDataRequest
<T
> request
: fPendingCoalescedRequests
) {
148 dispatchRequest(request
);
150 fPendingCoalescedRequests
.clear();
154 // ------------------------------------------------------------------------
155 // Coalescing (primitive test...)
156 // ------------------------------------------------------------------------
158 protected Vector
<TmfCoalescedDataRequest
<T
>> fPendingCoalescedRequests
= new Vector
<TmfCoalescedDataRequest
<T
>>();
160 protected void newCoalescedDataRequest(ITmfDataRequest
<T
> request
) {
161 synchronized(fLock
) {
162 TmfCoalescedDataRequest
<T
> coalescedRequest
=
163 new TmfCoalescedDataRequest
<T
>(fType
, request
.getIndex(), request
.getNbRequested(), request
.getBlockize(), request
.getExecType());
164 coalescedRequest
.addRequest(request
);
165 if (Tracer
.isRequestTraced()) {
166 Tracer
.traceRequest(request
, "coalesced with " + coalescedRequest
.getRequestId());
167 Tracer
.traceRequest(coalescedRequest
, "added " + request
.getRequestId());
169 fPendingCoalescedRequests
.add(coalescedRequest
);
173 protected void coalesceDataRequest(ITmfDataRequest
<T
> request
) {
174 synchronized(fLock
) {
175 for (TmfCoalescedDataRequest
<T
> req
: fPendingCoalescedRequests
) {
176 if (req
.isCompatible(request
)) {
177 req
.addRequest(request
);
178 if (Tracer
.isRequestTraced()) {
179 Tracer
.traceRequest(request
, "coalesced with " + req
.getRequestId());
180 Tracer
.traceRequest(req
, "added " + request
.getRequestId());
185 newCoalescedDataRequest(request
);
189 // ------------------------------------------------------------------------
190 // Request processing
191 // ------------------------------------------------------------------------
193 private void dispatchRequest(final ITmfDataRequest
<T
> request
) {
194 if (request
.getExecType() == ExecutionType
.FOREGROUND
)
195 queueRequest(request
);
197 queueBackgroundRequest(request
, DEFAULT_BLOCK_SIZE
, true);
200 protected void queueRequest(final ITmfDataRequest
<T
> request
) {
202 if (fExecutor
.isShutdown()) {
207 // final TmfDataProvider<T> provider = this;
209 // Process the request
210 TmfThread thread
= new TmfThread(request
.getExecType()) {
215 // if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "started");
217 // Extract the generic information
219 int blockSize
= request
.getBlockize();
220 int nbRequested
= request
.getNbRequested();
222 // Create the result buffer
223 Vector
<T
> result
= new Vector
<T
>();
226 // Initialize the execution
227 ITmfContext context
= armRequest(request
);
228 if (context
== null) {
234 // Get the ordered events
235 // if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName());
236 T data
= getNext(context
);
237 // if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
238 while (data
!= null && !isCompleted(request
, data
, nbRead
))
240 // if (fLogData) Tracer.traceEvent(provider, request, data);
242 if (++nbRead
% blockSize
== 0) {
243 pushData(request
, result
);
245 // To avoid an unnecessary read passed the last data requested
246 if (nbRead
< nbRequested
) {
247 data
= getNext(context
);
248 if (Tracer
.isRequestTraced() && (data
== null || data
.isNullRef())) {
249 Tracer
.trace("Request #" + request
.getRequestId() + " end of data");
253 if (result
.size() > 0) {
254 pushData(request
, result
);
258 if (Tracer
.isRequestTraced()) Tracer
.traceRequest(request
, "completed");
260 catch (Exception e
) {
261 if (Tracer
.isRequestTraced()) Tracer
.traceRequest(request
, "exception (failed)");
263 // e.printStackTrace();
268 fExecutor
.execute(thread
);
270 if (Tracer
.isRequestTraced()) Tracer
.traceRequest(request
, "queued");
273 // By default, same behavior as a foreground request
274 protected void queueBackgroundRequest(final ITmfDataRequest
<T
> request
, final int blockSize
, boolean indexing
) {
275 queueRequest(request
);
279 * Format the result data and forwards it to the requester.
280 * Note: after handling, the data is *removed*.
285 @SuppressWarnings("unchecked")
286 protected void pushData(ITmfDataRequest
<T
> request
, Vector
<T
> data
) {
287 synchronized(request
) {
288 if (!request
.isCompleted()) {
289 T
[] result
= (T
[]) Array
.newInstance(fType
, data
.size());
290 data
.toArray(result
);
291 request
.setData(result
);
292 request
.handleData();
293 data
.removeAllElements();
299 * Initialize the provider based on the request. The context is
300 * provider specific and will be updated by getNext().
303 * @return an application specific context; null if request can't be serviced
305 public abstract ITmfContext
armRequest(ITmfDataRequest
<T
> request
);
306 public abstract T
getNext(ITmfContext context
);
308 // public abstract void queueResult(T data);
311 * Return the next piece of data based on the context supplied. The context
312 * would typically be updated for the subsequent read.
317 // private static final int TIMEOUT = 10000;
318 //// public abstract T getNext(ITmfContext context) throws InterruptedException;
319 //// private int getLevel = 0;
320 // public T getNext(ITmfContext context) throws InterruptedException {
321 //// String name = Thread.currentThread().getName(); getLevel++;
322 //// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - entering");
323 // T data = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
324 // if (data == null) {
325 //// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on read");
326 // throw new InterruptedException();
328 //// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - leaving");
334 // * Makes the generated result data available for getNext()
338 //// public abstract void queueResult(T data) throws InterruptedException;
339 //// private int putLevel = 0;
340 // public void queueResult(T data) throws InterruptedException {
341 //// String name = Thread.currentThread().getName(); putLevel++;
342 //// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - entering");
343 // boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
345 //// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on write");
346 // throw new InterruptedException();
348 //// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - leaving");
353 * Checks if the data meets the request completion criteria.
359 public boolean isCompleted(ITmfDataRequest
<T
> request
, T data
, int nbRead
) {
360 return request
.isCompleted() || nbRead
>= request
.getNbRequested() || data
.isNullRef();
363 // ------------------------------------------------------------------------
365 // ------------------------------------------------------------------------
368 public void startSynch(TmfStartSynchSignal signal
) {
369 synchronized (fLock
) {
375 public void endSynch(TmfEndSynchSignal signal
) {
376 synchronized (fLock
) {
378 if (fSignalDepth
== 0) {