2010-09-15 Francois Chouinard <fchouinard@gmail.com> Contribution for Bug287563
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.component;
14
15 import java.lang.reflect.Array;
16 import java.util.Vector;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.SynchronousQueue;
20
21 import org.eclipse.linuxtools.tmf.Tracer;
22 import org.eclipse.linuxtools.tmf.event.TmfData;
23 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
24 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType;
25 import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
26 import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
27 import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
28 import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
29 import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
30 import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
31 import org.eclipse.linuxtools.tmf.trace.ITmfContext;
32
33 /**
34 * <b><u>TmfProvider</u></b>
35 * <p>
36 * The TmfProvider<T> is a provider for a data of type <T>.
37 * <p>
38 * This abstract class implements the housekeeking methods to register/
39 * deregister the event provider and to handle generically the event requests.
40 * <p>
41 * The concrete class can either re-implement processRequest() entirely or
42 * just implement the hooks (initializeContext() and getNext()).
43 * <p>
44 * TODO: Add support for providing multiple data types.
45 */
46 public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
47
48 // ------------------------------------------------------------------------
49 // Constants
50 // ------------------------------------------------------------------------
51
52 // private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
53 // private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
54
55 // ------------------------------------------------------------------------
56 //
57 // ------------------------------------------------------------------------
58
59 final protected Class<T> fType;
60 final protected boolean fLogData;
61 final protected boolean fLogError;
62
63 public static final int DEFAULT_BLOCK_SIZE = 5000;
64 public static final int DEFAULT_QUEUE_SIZE = 1000;
65
66 protected final int fQueueSize;
67 protected final BlockingQueue<T> fDataQueue;
68 protected final TmfRequestExecutor fExecutor;
69
70 private int fSignalDepth = 0;
71 private final Object fLock = new Object();
72
73 // ------------------------------------------------------------------------
74 // Constructors
75 // ------------------------------------------------------------------------
76
77 public TmfDataProvider(String name, Class<T> type) {
78 this(name, type, DEFAULT_QUEUE_SIZE);
79 }
80
81 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
82 super(name);
83 fType = type;
84 fQueueSize = queueSize;
85 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
86
87 fExecutor = new TmfRequestExecutor();
88 fSignalDepth = 0;
89
90 fLogData = Tracer.isEventTraced();
91 fLogError = Tracer.isErrorTraced();
92
93 TmfProviderManager.register(fType, this);
94 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
95 }
96
97 public TmfDataProvider(TmfDataProvider<T> other) {
98 super(other);
99 fType = other.fType;
100 fQueueSize = other.fQueueSize;
101 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
102
103 fExecutor = new TmfRequestExecutor();
104 fSignalDepth = 0;
105
106 fLogData = Tracer.isEventTraced();
107 fLogError = Tracer.isErrorTraced();
108 }
109
110 @Override
111 public void dispose() {
112 TmfProviderManager.deregister(fType, this);
113 fExecutor.stop();
114 super.dispose();
115 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
116 }
117
118 public int getQueueSize() {
119 return fQueueSize;
120 }
121
122 public Class<?> getType() {
123 return fType;
124 }
125
126 // ------------------------------------------------------------------------
127 // ITmfRequestHandler
128 // ------------------------------------------------------------------------
129
130 public void sendRequest(final ITmfDataRequest<T> request) {
131 synchronized(fLock) {
132 if (fSignalDepth > 0) {
133 coalesceDataRequest(request);
134 } else {
135 dispatchRequest(request);
136 }
137 }
138 }
139
140 /**
141 * This method queues the coalesced requests.
142 *
143 * @param thread
144 */
145 public void fireRequests() {
146 synchronized(fLock) {
147 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
148 dispatchRequest(request);
149 }
150 fPendingCoalescedRequests.clear();
151 }
152 }
153
154 // ------------------------------------------------------------------------
155 // Coalescing (primitive test...)
156 // ------------------------------------------------------------------------
157
158 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
159
160 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
161 synchronized(fLock) {
162 TmfCoalescedDataRequest<T> coalescedRequest =
163 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize(), request.getExecType());
164 coalescedRequest.addRequest(request);
165 if (Tracer.isRequestTraced()) {
166 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId());
167 Tracer.traceRequest(coalescedRequest, "added " + request.getRequestId());
168 }
169 fPendingCoalescedRequests.add(coalescedRequest);
170 }
171 }
172
173 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
174 synchronized(fLock) {
175 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
176 if (req.isCompatible(request)) {
177 req.addRequest(request);
178 if (Tracer.isRequestTraced()) {
179 Tracer.traceRequest(request, "coalesced with " + req.getRequestId());
180 Tracer.traceRequest(req, "added " + request.getRequestId());
181 }
182 return;
183 }
184 }
185 newCoalescedDataRequest(request);
186 }
187 }
188
189 // ------------------------------------------------------------------------
190 // Request processing
191 // ------------------------------------------------------------------------
192
193 private void dispatchRequest(final ITmfDataRequest<T> request) {
194 if (request.getExecType() == ExecutionType.FOREGROUND)
195 queueRequest(request);
196 else
197 queueBackgroundRequest(request, DEFAULT_BLOCK_SIZE, true);
198 }
199
200 protected void queueRequest(final ITmfDataRequest<T> request) {
201
202 if (fExecutor.isShutdown()) {
203 request.cancel();
204 return;
205 }
206
207 // final TmfDataProvider<T> provider = this;
208
209 // Process the request
210 TmfThread thread = new TmfThread(request.getExecType()) {
211
212 @Override
213 public void run() {
214
215 // if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "started");
216
217 // Extract the generic information
218 request.start();
219 int blockSize = request.getBlockize();
220 int nbRequested = request.getNbRequested();
221
222 // Create the result buffer
223 Vector<T> result = new Vector<T>();
224 int nbRead = 0;
225
226 // Initialize the execution
227 ITmfContext context = armRequest(request);
228 if (context == null) {
229 request.cancel();
230 return;
231 }
232
233 try {
234 // Get the ordered events
235 // if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName());
236 T data = getNext(context);
237 // if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
238 while (data != null && !isCompleted(request, data, nbRead))
239 {
240 // if (fLogData) Tracer.traceEvent(provider, request, data);
241 result.add(data);
242 if (++nbRead % blockSize == 0) {
243 pushData(request, result);
244 }
245 // To avoid an unnecessary read passed the last data requested
246 if (nbRead < nbRequested) {
247 data = getNext(context);
248 if (Tracer.isRequestTraced() && (data == null || data.isNullRef())) {
249 Tracer.trace("Request #" + request.getRequestId() + " end of data");
250 }
251 }
252 }
253 if (result.size() > 0) {
254 pushData(request, result);
255 }
256 request.done();
257
258 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "completed");
259 }
260 catch (Exception e) {
261 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "exception (failed)");
262 request.fail();
263 // e.printStackTrace();
264 }
265 }
266 };
267
268 fExecutor.execute(thread);
269
270 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
271 }
272
273 // By default, same behavior as a foreground request
274 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, boolean indexing) {
275 queueRequest(request);
276 }
277
278 /**
279 * Format the result data and forwards it to the requester.
280 * Note: after handling, the data is *removed*.
281 *
282 * @param request
283 * @param data
284 */
285 @SuppressWarnings("unchecked")
286 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
287 synchronized(request) {
288 if (!request.isCompleted()) {
289 T[] result = (T[]) Array.newInstance(fType, data.size());
290 data.toArray(result);
291 request.setData(result);
292 request.handleData();
293 data.removeAllElements();
294 }
295 }
296 }
297
298 /**
299 * Initialize the provider based on the request. The context is
300 * provider specific and will be updated by getNext().
301 *
302 * @param request
303 * @return an application specific context; null if request can't be serviced
304 */
305 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
306 public abstract T getNext(ITmfContext context);
307
308 // public abstract void queueResult(T data);
309
310 /**
311 * Return the next piece of data based on the context supplied. The context
312 * would typically be updated for the subsequent read.
313 *
314 * @param context
315 * @return
316 */
317 // private static final int TIMEOUT = 10000;
318 //// public abstract T getNext(ITmfContext context) throws InterruptedException;
319 //// private int getLevel = 0;
320 // public T getNext(ITmfContext context) throws InterruptedException {
321 //// String name = Thread.currentThread().getName(); getLevel++;
322 //// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - entering");
323 // T data = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
324 // if (data == null) {
325 //// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on read");
326 // throw new InterruptedException();
327 // }
328 //// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - leaving");
329 //// getLevel--;
330 // return data;
331 // }
332 //
333 // /**
334 // * Makes the generated result data available for getNext()
335 // *
336 // * @param data
337 // */
338 //// public abstract void queueResult(T data) throws InterruptedException;
339 //// private int putLevel = 0;
340 // public void queueResult(T data) throws InterruptedException {
341 //// String name = Thread.currentThread().getName(); putLevel++;
342 //// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - entering");
343 // boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
344 // if (!ok) {
345 //// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on write");
346 // throw new InterruptedException();
347 // }
348 //// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - leaving");
349 //// putLevel--;
350 // }
351
352 /**
353 * Checks if the data meets the request completion criteria.
354 *
355 * @param request
356 * @param data
357 * @return
358 */
359 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
360 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
361 }
362
363 // ------------------------------------------------------------------------
364 // Signal handlers
365 // ------------------------------------------------------------------------
366
367 @TmfSignalHandler
368 public void startSynch(TmfStartSynchSignal signal) {
369 synchronized (fLock) {
370 fSignalDepth++;
371 }
372 }
373
374 @TmfSignalHandler
375 public void endSynch(TmfEndSynchSignal signal) {
376 synchronized (fLock) {
377 fSignalDepth--;
378 if (fSignalDepth == 0) {
379 fireRequests();
380 }
381 }
382 }
383
384 }
This page took 0.037997 seconds and 5 git commands to generate.