1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.linuxtools
.tmf
.component
;
15 import java
.lang
.reflect
.Array
;
16 import java
.util
.Vector
;
17 import java
.util
.concurrent
.BlockingQueue
;
18 import java
.util
.concurrent
.LinkedBlockingQueue
;
19 import java
.util
.concurrent
.TimeUnit
;
21 import org
.eclipse
.linuxtools
.tmf
.Tracer
;
22 import org
.eclipse
.linuxtools
.tmf
.event
.TmfData
;
23 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
;
24 import org
.eclipse
.linuxtools
.tmf
.request
.TmfCoalescedDataRequest
;
25 import org
.eclipse
.linuxtools
.tmf
.request
.TmfDataRequest
;
26 import org
.eclipse
.linuxtools
.tmf
.request
.TmfRequestExecutor
;
27 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfEndSynchSignal
;
28 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfSignalHandler
;
29 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfStartSynchSignal
;
30 import org
.eclipse
.linuxtools
.tmf
.trace
.ITmfContext
;
/**
 * <b><u>TmfDataProvider</u></b>
 * <p>
 * The {@code TmfDataProvider<T>} is a provider for data of type {@code <T>}.
 * <p>
 * This abstract class implements the housekeeping methods to register/
 * deregister the event provider and to handle generically the event requests.
 * <p>
 * The concrete class can either re-implement processRequest() entirely or
 * just implement the hooks (initializeContext() and getNext()).
 * <p>
 * TODO: Add support for providing multiple data types.
 */
45 public abstract class TmfDataProvider
<T
extends TmfData
> extends TmfComponent
implements ITmfDataProvider
<T
> {
47 // ------------------------------------------------------------------------
49 // ------------------------------------------------------------------------
51 private static final ITmfDataRequest
.ExecutionType SHORT
= ITmfDataRequest
.ExecutionType
.SHORT
;
52 // private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
54 // ------------------------------------------------------------------------
56 // ------------------------------------------------------------------------
58 final protected Class
<T
> fType
;
59 final protected boolean fLogData
;
60 final protected boolean fLogError
;
62 public static final int DEFAULT_QUEUE_SIZE
= 1000;
63 protected final int fQueueSize
;
64 protected final BlockingQueue
<T
> fDataQueue
;
65 protected final TmfRequestExecutor fExecutor
;
67 private int fSignalDepth
= 0;
69 // ------------------------------------------------------------------------
71 // ------------------------------------------------------------------------
/**
 * Creates a provider for the given data type using
 * {@link #DEFAULT_QUEUE_SIZE} as the data queue capacity.
 *
 * @param name the provider name, forwarded to the full constructor
 * @param type the class of the data handled by this provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
77 protected TmfDataProvider(String name
, Class
<T
> type
, int queueSize
) {
80 fQueueSize
= queueSize
;
81 // fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
82 fDataQueue
= new LinkedBlockingQueue
<T
>(fQueueSize
);
84 fExecutor
= new TmfRequestExecutor();
87 fLogData
= Tracer
.isEventTraced();
88 fLogError
= Tracer
.isErrorTraced();
90 TmfProviderManager
.register(fType
, this);
91 if (Tracer
.isComponentTraced()) Tracer
.traceComponent(this, "started");
94 public TmfDataProvider(TmfDataProvider
<T
> other
) {
97 fQueueSize
= other
.fQueueSize
;
98 // fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
99 fDataQueue
= new LinkedBlockingQueue
<T
>(fQueueSize
);
101 fExecutor
= new TmfRequestExecutor();
104 fLogData
= Tracer
.isEventTraced();
105 fLogError
= Tracer
.isErrorTraced();
109 public void dispose() {
110 TmfProviderManager
.deregister(fType
, this);
113 if (Tracer
.isComponentTraced()) Tracer
.traceComponent(this, "stopped");
115 if (fClone
!= null) fClone
.dispose();
119 public int getQueueSize() {
123 public Class
<?
> getType() {
127 // ------------------------------------------------------------------------
128 // ITmfRequestHandler
129 // ------------------------------------------------------------------------
131 protected TmfDataProvider
<T
> fClone
;
132 public void sendRequest(final ITmfDataRequest
<T
> request
) {
134 if (fClone
== null || request
.getExecType() == SHORT
) {
135 if (fSignalDepth
> 0) {
136 coalesceDataRequest(request
);
138 queueRequest(request
);
142 fClone
.sendRequest(request
);
148 * This method queues the coalesced requests.
152 public void fireRequests() {
154 for (TmfDataRequest
<T
> request
: fPendingCoalescedRequests
) {
155 queueRequest(request
);
157 fPendingCoalescedRequests
.clear();
160 fClone
.fireRequests();
164 // ------------------------------------------------------------------------
165 // Coalescing (primitive test...)
166 // ------------------------------------------------------------------------
168 protected Vector
<TmfCoalescedDataRequest
<T
>> fPendingCoalescedRequests
= new Vector
<TmfCoalescedDataRequest
<T
>>();
170 protected void newCoalescedDataRequest(ITmfDataRequest
<T
> request
) {
172 TmfCoalescedDataRequest
<T
> coalescedRequest
=
173 new TmfCoalescedDataRequest
<T
>(fType
, request
.getIndex(), request
.getNbRequested(), request
.getBlockize(), request
.getExecType());
174 coalescedRequest
.addRequest(request
);
175 fPendingCoalescedRequests
.add(coalescedRequest
);
179 protected synchronized void coalesceDataRequest(ITmfDataRequest
<T
> request
) {
181 for (TmfCoalescedDataRequest
<T
> req
: fPendingCoalescedRequests
) {
182 if (req
.isCompatible(request
)) {
183 req
.addRequest(request
);
187 newCoalescedDataRequest(request
);
191 // ------------------------------------------------------------------------
192 // Request processing
193 // ------------------------------------------------------------------------
195 protected void queueRequest(final ITmfDataRequest
<T
> request
) {
197 final ITmfDataProvider
<T
> provider
= this;
198 final ITmfComponent component
= this;
200 // Process the request
201 Thread thread
= new Thread() {
206 // Extract the generic information
208 int blockSize
= request
.getBlockize();
209 int nbRequested
= request
.getNbRequested();
211 // Create the result buffer
212 Vector
<T
> result
= new Vector
<T
>();
215 // Initialize the execution
216 ITmfContext context
= armRequest(request
);
217 if (context
== null) {
223 // Get the ordered events
224 if (Tracer
.isRequestTraced()) Tracer
.trace("Request #" + request
.getRequestId() + " is being serviced by " + component
.getName());
225 T data
= getNext(context
);
226 if (Tracer
.isRequestTraced()) Tracer
.trace("Request #" + request
.getRequestId() + " read first event");
227 while (data
!= null && !isCompleted(request
, data
, nbRead
))
229 if (fLogData
) Tracer
.traceEvent(provider
, request
, data
);
231 if (++nbRead
% blockSize
== 0) {
232 pushData(request
, result
);
234 // To avoid an unnecessary read past the last data requested
235 if (nbRead
< nbRequested
) {
236 data
= getNext(context
);
237 if (data
== null || data
.isNullRef()) {
238 if (Tracer
.isRequestTraced()) Tracer
.trace("Request #" + request
.getRequestId() + " end of data");
242 if (result
.size() > 0) {
243 pushData(request
, result
);
247 catch (Exception e
) {
249 // e.printStackTrace();
253 fExecutor
.execute(thread
);
254 if (Tracer
.isRequestTraced()) Tracer
.traceRequest(request
, "queued");
258 * Format the result data and forwards it to the requester.
259 * Note: after handling, the data is *removed*.
264 @SuppressWarnings("unchecked")
265 protected void pushData(ITmfDataRequest
<T
> request
, Vector
<T
> data
) {
266 synchronized(request
) {
267 if (!request
.isCompleted()) {
268 T
[] result
= (T
[]) Array
.newInstance(fType
, data
.size());
269 data
.toArray(result
);
270 request
.setData(result
);
271 request
.handleData();
272 data
.removeAllElements();
278 * Initialize the provider based on the request. The context is
279 * provider specific and will be updated by getNext().
282 * @return an application specific context; null if request can't be serviced
284 public abstract ITmfContext
armRequest(ITmfDataRequest
<T
> request
);
287 * Return the next piece of data based on the context supplied. The context
288 * would typically be updated for the subsequent read.
293 private static final int TIMEOUT
= 1000;
294 // public abstract T getNext(ITmfContext context) throws InterruptedException;
295 // private int getLevel = 0;
296 public T
getNext(ITmfContext context
) throws InterruptedException
{
297 // String name = Thread.currentThread().getName(); getLevel++;
298 // System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - entering");
299 T data
= fDataQueue
.poll(TIMEOUT
, TimeUnit
.MILLISECONDS
);
301 if (Tracer
.isErrorTraced()) Tracer
.traceError(getName() + ": Request timeout on read");
302 throw new InterruptedException();
304 // System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - leaving");
310 * Makes the generated result data available for getNext()
314 // public abstract void queueResult(T data) throws InterruptedException;
315 // private int putLevel = 0;
316 public void queueResult(T data
) throws InterruptedException
{
317 // String name = Thread.currentThread().getName(); putLevel++;
318 // System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - entering");
319 boolean ok
= fDataQueue
.offer(data
, TIMEOUT
, TimeUnit
.MILLISECONDS
);
321 if (Tracer
.isErrorTraced()) Tracer
.traceError(getName() + ": Request timeout on write");
322 throw new InterruptedException();
324 // System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - leaving");
329 * Checks if the data meets the request completion criteria.
335 public boolean isCompleted(ITmfDataRequest
<T
> request
, T data
, int nbRead
) {
336 return request
.isCompleted() || nbRead
>= request
.getNbRequested() || data
.isNullRef();
339 // ------------------------------------------------------------------------
341 // ------------------------------------------------------------------------
344 public synchronized void startSynch(TmfStartSynchSignal signal
) {
349 public synchronized void endSynch(TmfEndSynchSignal signal
) {
351 if (fSignalDepth
== 0) {