Monster merge from the integration branch. Still some problems left and JUnits failing.
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
index d93a6b6f741e6e0b767948f3d8dc72571c1154ac..805cd1e7dcb09a9b092e17a6834d9fba0817b20b 100644 (file)
@@ -17,6 +17,7 @@ import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.eclipse.linuxtools.tmf.Tracer;
 import org.eclipse.linuxtools.tmf.event.TmfData;
@@ -44,14 +45,27 @@ import org.eclipse.linuxtools.tmf.trace.ITmfContext;
  */
 public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
 
+       // ------------------------------------------------------------------------
+       // Constants
+       // ------------------------------------------------------------------------
+
+       private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
+//     private static final ITmfDataRequest.ExecutionType LONG  = ITmfDataRequest.ExecutionType.LONG;
+       
+       // ------------------------------------------------------------------------
+       // 
+       // ------------------------------------------------------------------------
+
        final protected Class<T> fType;
+       final protected boolean  fLogData;
+       final protected boolean  fLogException;
 
        public static final int DEFAULT_QUEUE_SIZE = 1000;
        protected final int fQueueSize;
        protected final BlockingQueue<T> fDataQueue;
        protected final TmfRequestExecutor fExecutor;
 
-       private int fCoalescingLevel = 0;
+       private int fSignalDepth = 0;
 
        // ------------------------------------------------------------------------
        // Constructors
@@ -67,13 +81,16 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
                fQueueSize = queueSize;
                fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
 
-        Tracer.trace(getName() + " created");
+        if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "created");
 
         fExecutor = new TmfRequestExecutor();
-               fCoalescingLevel = 0;
+               fSignalDepth = 0;
+
+               fLogData = Tracer.isEventTraced();
+               fLogException = Tracer.isEventTraced();
 
                TmfProviderManager.register(fType, this);
-        Tracer.trace(getName() + " started");
+               if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
 }
        
        public TmfDataProvider(TmfDataProvider<T> other) {
@@ -83,14 +100,20 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
         fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
 
         fExecutor = new TmfRequestExecutor();
-        fCoalescingLevel = 0;
+        fSignalDepth = 0;
+
+        fLogData = Tracer.isEventTraced();
+               fLogException = Tracer.isEventTraced();
        }
        
        @Override
        public void dispose() {
                TmfProviderManager.deregister(fType, this);
                fExecutor.stop();
-        Tracer.trace(getName() + " stopped");
+
+               if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
+               
+               if (fClone != null) fClone.dispose();
                super.dispose();
        }
 
@@ -106,15 +129,19 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
        // ITmfRequestHandler
        // ------------------------------------------------------------------------
 
-       public synchronized void sendRequest(final ITmfDataRequest<T> request) {
-
-               if (fCoalescingLevel > 0) {
-                       // We are in coalescing mode: client should NEVER wait
-                       // (otherwise we will have deadlock...)
-                       coalesceDataRequest(request);
-               } else {
-                       // Process the request immediately 
-                       queueRequest(request);
+       protected TmfDataProvider<T> fClone;
+       public void sendRequest(final ITmfDataRequest<T> request) {
+               synchronized(this) {
+                       if (fClone == null || request.getExecType() == SHORT) {
+                               if (fSignalDepth > 0) {
+                                       coalesceDataRequest(request);
+                               } else {
+                                       queueRequest(request);
+                               }
+                       }
+                       else {
+                               fClone.sendRequest(request);
+                       }
                }
        }
 
@@ -123,11 +150,16 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
         * 
         * @param thread
         */
-       private synchronized void fireRequests() {
-               for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
-                       queueRequest(request);
+       public void fireRequests() {
+               synchronized(this) {
+                       for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
+                               queueRequest(request);
+                       }
+                       fPendingCoalescedRequests.clear();
+
+                       if (fClone != null)
+                               fClone.fireRequests();
                }
-               fPendingCoalescedRequests.clear();
        }
 
        // ------------------------------------------------------------------------
@@ -136,21 +168,25 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
 
        protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
 
-       protected synchronized void newCoalescedDataRequest(ITmfDataRequest<T> request) {
-               TmfCoalescedDataRequest<T> coalescedRequest =
-                       new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
-               coalescedRequest.addRequest(request);
-               fPendingCoalescedRequests.add(coalescedRequest);
+       protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
+               synchronized(this) {
+                       TmfCoalescedDataRequest<T> coalescedRequest =
+                               new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
+                       coalescedRequest.addRequest(request);
+                       fPendingCoalescedRequests.add(coalescedRequest);
+               }
        }
 
        protected synchronized void coalesceDataRequest(ITmfDataRequest<T> request) {
-               for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
-                       if (req.isCompatible(request)) {
-                               req.addRequest(request);
-                               return;
+               synchronized(this) {
+                       for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
+                               if (req.isCompatible(request)) {
+                                       req.addRequest(request);
+                                       return;
+                               }
                        }
+                       newCoalescedDataRequest(request);
                }
-               newCoalescedDataRequest(request);
        }
 
        // ------------------------------------------------------------------------
@@ -159,8 +195,9 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
 
        protected void queueRequest(final ITmfDataRequest<T> request) {
 
-               final String provider = getName();
-               
+               final ITmfDataProvider<T> provider  = this;
+               final ITmfComponent    component = this;
+
                // Process the request
                Thread thread = new Thread() {
 
@@ -168,6 +205,7 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
                        public void run() {
 
                                // Extract the generic information
+                               request.start();
                                int blockSize   = request.getBlockize();
                                int nbRequested = request.getNbRequested();
                         
@@ -178,33 +216,44 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
                                // Initialize the execution
                                ITmfContext context = armRequest(request);
                                if (context == null) {
-                                       request.fail();
+                                       request.cancel();
                                        return;
                                }
 
-                               // Get the ordered events
-                               Tracer.trace("Request #" + request.getRequestId() + " is serviced by " + provider);
-                               T data = getNext(context);
-                               Tracer.trace("Request #" + request.getRequestId() + " read first event");
-                               while (data != null && !isCompleted(request, data, nbRead))
-                               {
-                                       result.add(data);
-                                       if (++nbRead % blockSize == 0) {
-                                               pushData(request, result);
-                                       }
-                                       // To avoid an unnecessary read passed the last data requested
-                                       if (nbRead < nbRequested) {
-                                               data = getNext(context);
-                                               if (data == null || data.isNullRef()) {
-                                                       Tracer.trace("Request #" + request.getRequestId() + " end of data");
+                               try {
+                                       // Get the ordered events
+                                       if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + component.getName());
+                                       T data = getNext(context);
+                                       if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
+                                       while (data != null && !isCompleted(request, data, nbRead))
+                                       {
+                                               if (fLogData) Tracer.traceEvent(provider, request, data);
+                                               result.add(data);
+                                               if (++nbRead % blockSize == 0) {
+                                                       pushData(request, result);
                                                }
+                                               // To avoid an unnecessary read passed the last data requested
+                                               if (nbRead < nbRequested) {
+                                                       data = getNext(context);
+                                                       if (data == null || data.isNullRef()) {
+                                                               if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " end of data");
+                                                       }
+                                               }
+                                       }
+                                       if (result.size() > 0) {
+                                               pushData(request, result);
                                        }
+                                       request.done();
+                               }
+                               catch (Exception e) {
+                                       e.printStackTrace();
+                                       if (fLogException) Tracer.traceException(e);
+                                       request.fail();
                                }
-                               pushData(request, result);
-                               request.done();
                        }
                };
                fExecutor.execute(thread);
+        if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
        }
 
        /**
@@ -243,14 +292,15 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
         * @param context
         * @return
         */
-       public T getNext(ITmfContext context) {
-               try {
-                       T event = fDataQueue.take();
-                       return event;
-               } catch (InterruptedException e) {
-                       e.printStackTrace();
+       private static final int TIMEOUT = 1000;
+       public T getNext(ITmfContext context) throws InterruptedException {
+               T event = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+               if (event == null) {
+                       if (Tracer.isErrorTraced()) Tracer.traceError("Request timeout on read");
+                       System.out.println(getName() + ": Request timeout on read");
+                       throw new InterruptedException();
                }
-               return null;
+               return event;
        }
 
        /**
@@ -258,11 +308,12 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
         * 
         * @param data
         */
-       public void queueResult(T data) {
-               try {
-                       fDataQueue.put(data);
-               } catch (InterruptedException e1) {
-                       e1.printStackTrace();
+       public void queueResult(T data) throws InterruptedException {
+               boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
+               if (!ok) {
+                       if (Tracer.isErrorTraced()) Tracer.traceError("Request timeout on write");
+                       System.out.println(getName() + ": Request timeout on write");
+                       throw new InterruptedException();
                }
        }
 
@@ -282,19 +333,15 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
        // ------------------------------------------------------------------------
 
        @TmfSignalHandler
-       public void startSynch(TmfStartSynchSignal signal) {
-               synchronized(this) {
-                       fCoalescingLevel++;
-               }
+       public synchronized void startSynch(TmfStartSynchSignal signal) {
+               fSignalDepth++;
        }
 
        @TmfSignalHandler
-       public void endSynch(TmfEndSynchSignal signal) {
-               synchronized(this) {
-                       fCoalescingLevel--;
-                       if (fCoalescingLevel == 0) {
-                               fireRequests();
-                       }
+       public synchronized void endSynch(TmfEndSynchSignal signal) {
+               fSignalDepth--;
+               if (fSignalDepth == 0) {
+                       fireRequests();
                }
        }
 
This page took 0.02775 seconds and 5 git commands to generate.