[Bug292967] Second part of request coalescing + unit tests + minor fixes.
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
index acdbbfa3229ba5935b156c3e7f1b6e6b52e793da..ae68357fb63f1af0b85493c44a04b574831f37ba 100644 (file)
@@ -16,9 +16,11 @@ import java.lang.reflect.Array;
 import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
 
 import org.eclipse.linuxtools.tmf.event.TmfData;
-import org.eclipse.linuxtools.tmf.request.ITmfRequestHandler;
+import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
+import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
 import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
 import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
 import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
@@ -39,7 +41,7 @@ import org.eclipse.linuxtools.tmf.trace.ITmfContext;
  * <p>
  * TODO: Add support for providing multiple data types.
  */
-public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfRequestHandler<T> {
+public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
 
        final protected Class<T> fType;
 
@@ -48,8 +50,10 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
        protected final BlockingQueue<T> fDataQueue;
        protected final TmfRequestExecutor fExecutor;
 
+       private Integer fSynchDepth;
+
        // ------------------------------------------------------------------------
-       // Constructors (enforce that a type be supplied) 
+       // Constructors
        // ------------------------------------------------------------------------
        
        public TmfDataProvider(String name, Class<T> type) {
@@ -60,8 +64,10 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
                super(name);
                fQueueSize = queueSize;
                fType = type;
-               fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
+               fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
+
                fExecutor = new TmfRequestExecutor();
+               fSynchDepth = 0;
                register();
        }
 
@@ -85,25 +91,58 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
        // ITmfRequestHandler
        // ------------------------------------------------------------------------
 
-       // TODO: Request coalescing, filtering, result dispatching
+       public void sendRequest(final TmfDataRequest<T> request) {
+
+               if (fSynchDepth > 0) {
+                       // We are in coalescing mode: client should NEVER wait
+                       // (otherwise we will have deadlock...)
+                       coalesceDataRequest(request);
+               } else {
+                       // Process the request immediately 
+                       queueRequest(request);
+               }
+       }
+
+       /**
+        * This method queues the coalesced requests.
+        * 
+        * @param thread
+        */
+       private synchronized void fireRequests() {
+               for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
+                       queueRequest(request);
+               }
+               fPendingCoalescedRequests.clear();
+       }
+
+       // ------------------------------------------------------------------------
+       // Coalescing (primitive test...)
+       // ------------------------------------------------------------------------
 
-       public void processRequest(final TmfDataRequest<T> request, boolean waitForCompletion) {
+       protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
 
-//             System.out.println("[" + getName() + "]" + " New request: " + request.getRequestId());
-//             if (request.getRequestId() == 0) {
-//                     System.out.println("");
-//             }
-               
-               //Process the request 
-               processDataRequest(request);
+       protected synchronized void newCoalescedDataRequest(TmfDataRequest<T> request) {
+               TmfCoalescedDataRequest<T> coalescedRequest =
+                       new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
+               coalescedRequest.addRequest(request);
+               fPendingCoalescedRequests.add(coalescedRequest);
+       }
 
-               // Wait for completion if requested
-       if (waitForCompletion) {
-                       request.waitForCompletion();
+       protected synchronized void coalesceDataRequest(TmfDataRequest<T> request) {
+               for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
+                       if (req.isCompatible(request)) {
+                               req.addRequest(request);
+                               return;
+                       }
                }
+               newCoalescedDataRequest(request);
        }
 
-       protected void processDataRequest(final TmfDataRequest<T> request) {
+       // ------------------------------------------------------------------------
+       // Request processing
+       // ------------------------------------------------------------------------
+
+       protected void queueRequest(final TmfDataRequest<T> request) {
 
                // Process the request
                Thread thread = new Thread() {
@@ -153,7 +192,7 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
         * @param data
         */
        @SuppressWarnings("unchecked")
-       protected void pushData(TmfDataRequest<T> request, Vector<T> data) {
+       protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
                synchronized(request) {
                        if (!request.isCompleted()) {
                                T[] result = (T[]) Array.newInstance(fType, data.size());
@@ -165,6 +204,15 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
                }
        }
 
+       /**
+        * Initialize the provider based on the request. The context is
+        * provider specific and will be updated by getNext().
+        * 
+        * @param request
+        * @return an application specific context; null if request can't be serviced
+        */
+       public abstract ITmfContext armRequest(TmfDataRequest<T> request);
+       
        /**
         * Return the next piece of data based on the context supplied. The context
         * would typically be updated for the subsequent read.
@@ -182,6 +230,11 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
                return null;
        }
 
+       /**
+        * Makes the generated result data available for getNext()
+        * 
+        * @param data
+        */
        public void queueResult(T data) {
                try {
                        fDataQueue.put(data);
@@ -190,15 +243,6 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
                }
        }
 
-       /**
-        * Initialize the provider based on the request. The context is
-        * provider specific and will be updated by getNext().
-        * 
-        * @param request
-        * @return an application specific context; null if request can't be serviced
-        */
-       public abstract ITmfContext armRequest(TmfDataRequest<T> request);
-       
        /**
         * Checks if the data meets the request completion criteria.
         * 
@@ -210,16 +254,25 @@ public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent im
                return request.isCompleted() || nbRead >= request.getNbRequested();
        }
 
+       // ------------------------------------------------------------------------
+       // Signal handlers
+       // ------------------------------------------------------------------------
+
        @TmfSignalHandler
        public void startSynch(TmfStartSynchSignal signal) {
-//             if (getName().equals("MyExperiment"))
-//                     System.out.println("[" + getName() + "]" + " Start synch: " + signal.getReference());
+               synchronized(fSynchDepth) {
+                       fSynchDepth++;
+               }
        }
 
        @TmfSignalHandler
        public void endSynch(TmfEndSynchSignal signal) {
-//             if (getName().equals("MyExperiment"))
-//                     System.out.println("[" + getName() + "]" + " End synch: " + signal.getReference());
+               synchronized(fSynchDepth) {
+                       fSynchDepth--;
+                       if (fSynchDepth == 0) {
+                               fireRequests();
+                       }
+               }
        }
 
 }
This page took 0.026839 seconds and 5 git commands to generate.