import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
import org.eclipse.linuxtools.tmf.Tracer;
import org.eclipse.linuxtools.tmf.event.TmfData;
*/
public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
+ // ------------------------------------------------------------------------
+ // Constants
+ // ------------------------------------------------------------------------
+
+ private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
+// private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
+
+ // ------------------------------------------------------------------------
+ //
+ // ------------------------------------------------------------------------
+
final protected Class<T> fType;
+ final protected boolean fLogData;
+ final protected boolean fLogException;
public static final int DEFAULT_QUEUE_SIZE = 1000;
protected final int fQueueSize;
protected final BlockingQueue<T> fDataQueue;
protected final TmfRequestExecutor fExecutor;
- private int fCoalescingLevel = 0;
+ private int fSignalDepth = 0;
// ------------------------------------------------------------------------
// Constructors
fQueueSize = queueSize;
fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
- Tracer.trace(getName() + " created");
+ if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "created");
fExecutor = new TmfRequestExecutor();
- fCoalescingLevel = 0;
+ fSignalDepth = 0;
+
+ fLogData = Tracer.isEventTraced();
+ fLogException = Tracer.isEventTraced();
TmfProviderManager.register(fType, this);
- Tracer.trace(getName() + " started");
+ if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
}
public TmfDataProvider(TmfDataProvider<T> other) {
fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
fExecutor = new TmfRequestExecutor();
- fCoalescingLevel = 0;
+ fSignalDepth = 0;
+
+ fLogData = Tracer.isEventTraced();
+ fLogException = Tracer.isEventTraced();
}
@Override
public void dispose() {
TmfProviderManager.deregister(fType, this);
fExecutor.stop();
- Tracer.trace(getName() + " stopped");
+
+ if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
+
+ if (fClone != null) fClone.dispose();
super.dispose();
}
// ITmfRequestHandler
// ------------------------------------------------------------------------
- public synchronized void sendRequest(final ITmfDataRequest<T> request) {
-
- if (fCoalescingLevel > 0) {
- // We are in coalescing mode: client should NEVER wait
- // (otherwise we will have deadlock...)
- coalesceDataRequest(request);
- } else {
- // Process the request immediately
- queueRequest(request);
+ protected TmfDataProvider<T> fClone;
+ public void sendRequest(final ITmfDataRequest<T> request) {
+ synchronized(this) {
+ if (fClone == null || request.getExecType() == SHORT) {
+ if (fSignalDepth > 0) {
+ coalesceDataRequest(request);
+ } else {
+ queueRequest(request);
+ }
+ }
+ else {
+ fClone.sendRequest(request);
+ }
}
}
*
* @param thread
*/
- private synchronized void fireRequests() {
- for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
- queueRequest(request);
+ public void fireRequests() {
+ synchronized(this) {
+ for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
+ queueRequest(request);
+ }
+ fPendingCoalescedRequests.clear();
+
+ if (fClone != null)
+ fClone.fireRequests();
}
- fPendingCoalescedRequests.clear();
}
// ------------------------------------------------------------------------
protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
- protected synchronized void newCoalescedDataRequest(ITmfDataRequest<T> request) {
- TmfCoalescedDataRequest<T> coalescedRequest =
- new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
- coalescedRequest.addRequest(request);
- fPendingCoalescedRequests.add(coalescedRequest);
+ protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
+ synchronized(this) {
+ TmfCoalescedDataRequest<T> coalescedRequest =
+ new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
+ coalescedRequest.addRequest(request);
+ fPendingCoalescedRequests.add(coalescedRequest);
+ }
}
protected synchronized void coalesceDataRequest(ITmfDataRequest<T> request) {
- for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
- if (req.isCompatible(request)) {
- req.addRequest(request);
- return;
+ synchronized(this) {
+ for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
+ if (req.isCompatible(request)) {
+ req.addRequest(request);
+ return;
+ }
}
+ newCoalescedDataRequest(request);
}
- newCoalescedDataRequest(request);
}
// ------------------------------------------------------------------------
protected void queueRequest(final ITmfDataRequest<T> request) {
- final String provider = getName();
-
+ final ITmfDataProvider<T> provider = this;
+ final ITmfComponent component = this;
+
// Process the request
Thread thread = new Thread() {
public void run() {
// Extract the generic information
+ request.start();
int blockSize = request.getBlockize();
int nbRequested = request.getNbRequested();
// Initialize the execution
ITmfContext context = armRequest(request);
if (context == null) {
- request.fail();
+ request.cancel();
return;
}
- // Get the ordered events
- Tracer.trace("Request #" + request.getRequestId() + " is serviced by " + provider);
- T data = getNext(context);
- Tracer.trace("Request #" + request.getRequestId() + " read first event");
- while (data != null && !isCompleted(request, data, nbRead))
- {
- result.add(data);
- if (++nbRead % blockSize == 0) {
- pushData(request, result);
- }
- // To avoid an unnecessary read passed the last data requested
- if (nbRead < nbRequested) {
- data = getNext(context);
- if (data == null || data.isNullRef()) {
- Tracer.trace("Request #" + request.getRequestId() + " end of data");
+ try {
+ // Get the ordered events
+ if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + component.getName());
+ T data = getNext(context);
+ if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
+ while (data != null && !isCompleted(request, data, nbRead))
+ {
+ if (fLogData) Tracer.traceEvent(provider, request, data);
+ result.add(data);
+ if (++nbRead % blockSize == 0) {
+ pushData(request, result);
}
+ // To avoid an unnecessary read passed the last data requested
+ if (nbRead < nbRequested) {
+ data = getNext(context);
+ if (data == null || data.isNullRef()) {
+ if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " end of data");
+ }
+ }
+ }
+ if (result.size() > 0) {
+ pushData(request, result);
}
+ request.done();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ if (fLogException) Tracer.traceException(e);
+ request.fail();
}
- pushData(request, result);
- request.done();
}
};
fExecutor.execute(thread);
+ if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
}
/**
* @param context
* @return
*/
- public T getNext(ITmfContext context) {
- try {
- T event = fDataQueue.take();
- return event;
- } catch (InterruptedException e) {
- e.printStackTrace();
+ private static final int TIMEOUT = 1000;
+ public T getNext(ITmfContext context) throws InterruptedException {
+ T event = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+ if (event == null) {
+ if (Tracer.isErrorTraced()) Tracer.traceError("Request timeout on read");
+ System.out.println(getName() + ": Request timeout on read");
+ throw new InterruptedException();
}
- return null;
+ return event;
}
/**
*
* @param data
*/
- public void queueResult(T data) {
- try {
- fDataQueue.put(data);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
+ public void queueResult(T data) throws InterruptedException {
+ boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!ok) {
+ if (Tracer.isErrorTraced()) Tracer.traceError("Request timeout on write");
+ System.out.println(getName() + ": Request timeout on write");
+ throw new InterruptedException();
}
}
// ------------------------------------------------------------------------
@TmfSignalHandler
- public void startSynch(TmfStartSynchSignal signal) {
- synchronized(this) {
- fCoalescingLevel++;
- }
+ public synchronized void startSynch(TmfStartSynchSignal signal) {
+ fSignalDepth++;
}
@TmfSignalHandler
- public void endSynch(TmfEndSynchSignal signal) {
- synchronized(this) {
- fCoalescingLevel--;
- if (fCoalescingLevel == 0) {
- fireRequests();
- }
+ public synchronized void endSynch(TmfEndSynchSignal signal) {
+ fSignalDepth--;
+ if (fSignalDepth == 0) {
+ fireRequests();
}
}