2010-09-12 Francois Chouinard <fchouinard@gmail.com> Fix for Bug316349
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
15import java.lang.reflect.Array;
16import java.util.Vector;
17import java.util.concurrent.BlockingQueue;
18import java.util.concurrent.LinkedBlockingQueue;
9b635e61 19import java.util.concurrent.SynchronousQueue;
8c8bf09f 20
ce785d7d 21import org.eclipse.linuxtools.tmf.Tracer;
8c8bf09f 22import org.eclipse.linuxtools.tmf.event.TmfData;
951d134a 23import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
9b635e61 24import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType;
951d134a 25import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
8c8bf09f
ASL
26import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
27import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
28import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
29import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
30import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
31import org.eclipse.linuxtools.tmf.trace.ITmfContext;
32
33/**
34 * <b><u>TmfDataProvider</u></b>
35 * <p>
36 * The TmfDataProvider<T> is a provider for data of type <T>.
37 * <p>
38 * This abstract class implements the housekeeping methods to register/
39 * deregister the data provider and to generically handle the data requests.
40 * <p>
41 * The concrete class can either re-implement queueRequest() entirely or
42 * just implement the hooks (armRequest() and getNext()).
43 * <p>
44 * TODO: Add support for providing multiple data types.
45 */
951d134a 46public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f 47
550d787e
FC
48 // ------------------------------------------------------------------------
49 // Constants
50 // ------------------------------------------------------------------------
51
9b635e61 52// private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
550d787e
FC
53// private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
54
55 // ------------------------------------------------------------------------
56 // Attributes
57 // ------------------------------------------------------------------------
58
8c8bf09f 59 final protected Class<T> fType;
550d787e 60 final protected boolean fLogData;
cb866e08 61 final protected boolean fLogError;
8c8bf09f
ASL
62
63 public static final int DEFAULT_QUEUE_SIZE = 1000;
64 protected final int fQueueSize;
65 protected final BlockingQueue<T> fDataQueue;
66 protected final TmfRequestExecutor fExecutor;
67
550d787e 68 private int fSignalDepth = 0;
045df77d 69 private final Object fLock = new Object();
951d134a 70
8c8bf09f 71 // ------------------------------------------------------------------------
951d134a 72 // Constructors
8c8bf09f
ASL
73 // ------------------------------------------------------------------------
74
/**
 * Creates a provider that uses DEFAULT_QUEUE_SIZE for its data queue.
 *
 * @param name the provider name
 * @param type the class of the data served by this provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
78
/**
 * Creates a provider with an explicit queue size and registers it with the
 * TmfProviderManager under its data type.
 *
 * @param name      the provider name, handed to the superclass constructor
 * @param type      the class of the data served by this provider
 * @param queueSize the data queue size; a value of 1 or less selects a
 *                  SynchronousQueue instead of a bounded LinkedBlockingQueue
 */
protected TmfDataProvider(String name, Class<T> type, int queueSize) {
    super(name);

    fType = type;
    fQueueSize = queueSize;
    if (queueSize > 1) {
        fDataQueue = new LinkedBlockingQueue<T>(queueSize);
    } else {
        fDataQueue = new SynchronousQueue<T>();
    }

    fExecutor = new TmfRequestExecutor();
    fSignalDepth = 0;

    fLogData = Tracer.isEventTraced();
    fLogError = Tracer.isErrorTraced();

    // Registration is the last step, after every field has been assigned.
    TmfProviderManager.register(fType, this);
}
377f1ad8 94
ce785d7d
FC
/**
 * Copy constructor. The new provider takes over the data type and queue size
 * of the original but gets its own data queue and its own request executor.
 * <p>
 * NOTE(review): unlike the (name, type, queueSize) constructor, the copy is
 * not registered with TmfProviderManager, while dispose() deregisters
 * unconditionally - confirm that this is intentional.
 *
 * @param other the provider to copy
 */
public TmfDataProvider(TmfDataProvider<T> other) {
    super(other);

    fType = other.fType;
    fQueueSize = other.fQueueSize;
    if (fQueueSize > 1) {
        fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
    } else {
        fDataQueue = new SynchronousQueue<T>();
    }

    fExecutor = new TmfRequestExecutor();
    fSignalDepth = 0;

    fLogData = Tracer.isEventTraced();
    fLogError = Tracer.isErrorTraced();
}
107
fc6ccf6f 108 @Override
2fb2eb37 109 public void dispose() {
8c8bf09f 110 TmfProviderManager.deregister(fType, this);
54d55ced 111 fExecutor.stop();
2fb2eb37 112 super.dispose();
3d62f8b7 113// if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
8c8bf09f
ASL
114 }
115
116 public int getQueueSize() {
117 return fQueueSize;
118 }
119
ff4ed569
FC
120 public Class<?> getType() {
121 return fType;
122 }
123
8c8bf09f
ASL
124 // ------------------------------------------------------------------------
125 // ITmfRequestHandler
126 // ------------------------------------------------------------------------
127
550d787e 128 public void sendRequest(final ITmfDataRequest<T> request) {
045df77d 129 synchronized(fLock) {
9b635e61
FC
130 if (fSignalDepth > 0) {
131 coalesceDataRequest(request);
132 } else {
133 dispatchRequest(request);
550d787e 134 }
951d134a
FC
135 }
136 }
137
138 /**
139 * This method queues the coalesced requests.
140 *
141 * @param thread
142 */
550d787e 143 public void fireRequests() {
045df77d 144 synchronized(fLock) {
550d787e 145 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
9b635e61 146 dispatchRequest(request);
550d787e
FC
147 }
148 fPendingCoalescedRequests.clear();
951d134a 149 }
951d134a
FC
150 }
151
152 // ------------------------------------------------------------------------
153 // Coalescing (primitive test...)
154 // ------------------------------------------------------------------------
8c8bf09f 155
951d134a 156 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
8c8bf09f 157
550d787e 158 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
045df77d 159 synchronized(fLock) {
550d787e 160 TmfCoalescedDataRequest<T> coalescedRequest =
cb866e08 161 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize(), request.getExecType());
550d787e 162 coalescedRequest.addRequest(request);
9b635e61
FC
163 if (Tracer.isRequestTraced()) {
164 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId());
165 Tracer.traceRequest(coalescedRequest, "added " + request.getRequestId());
166 }
550d787e
FC
167 fPendingCoalescedRequests.add(coalescedRequest);
168 }
951d134a 169 }
8c8bf09f 170
045df77d
FC
171 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
172 synchronized(fLock) {
550d787e
FC
173 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
174 if (req.isCompatible(request)) {
175 req.addRequest(request);
9b635e61
FC
176 if (Tracer.isRequestTraced()) {
177 Tracer.traceRequest(request, "coalesced with " + req.getRequestId());
178 Tracer.traceRequest(req, "added " + request.getRequestId());
179 }
550d787e
FC
180 return;
181 }
951d134a 182 }
550d787e 183 newCoalescedDataRequest(request);
8c8bf09f 184 }
8c8bf09f
ASL
185 }
186
951d134a
FC
187 // ------------------------------------------------------------------------
188 // Request processing
189 // ------------------------------------------------------------------------
190
9b635e61
FC
191 private void dispatchRequest(final ITmfDataRequest<T> request) {
192 if (request.getExecType() == ExecutionType.SHORT)
193 queueRequest(request);
194 else
195 queueLongRequest(request);
196 }
197
2fb2eb37 198 protected void queueRequest(final ITmfDataRequest<T> request) {
9aae0442 199
045df77d
FC
200 if (fExecutor.isShutdown()) {
201 request.cancel();
202 return;
203 }
204
3d62f8b7 205 final TmfDataProvider<T> provider = this;
550d787e 206
8c8bf09f 207 // Process the request
9b635e61 208 TmfThread thread = new TmfThread(request.getExecType()) {
8c8bf09f
ASL
209
210 @Override
211 public void run() {
212
3d62f8b7 213 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "started");
9b635e61 214
8c8bf09f 215 // Extract the generic information
550d787e 216 request.start();
8c8bf09f
ASL
217 int blockSize = request.getBlockize();
218 int nbRequested = request.getNbRequested();
219
220 // Create the result buffer
221 Vector<T> result = new Vector<T>();
222 int nbRead = 0;
223
224 // Initialize the execution
225 ITmfContext context = armRequest(request);
226 if (context == null) {
550d787e 227 request.cancel();
8c8bf09f
ASL
228 return;
229 }
230
550d787e
FC
231 try {
232 // Get the ordered events
3d62f8b7 233 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName());
550d787e 234 T data = getNext(context);
3d62f8b7 235 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
550d787e
FC
236 while (data != null && !isCompleted(request, data, nbRead))
237 {
3d62f8b7 238 if (fLogData) Tracer.traceEvent(provider, request, data);
550d787e
FC
239 result.add(data);
240 if (++nbRead % blockSize == 0) {
241 pushData(request, result);
ce785d7d 242 }
550d787e
FC
243 // To avoid an unnecessary read passed the last data requested
244 if (nbRead < nbRequested) {
245 data = getNext(context);
3d62f8b7
FC
246 if (Tracer.isRequestTraced() && (data == null || data.isNullRef())) {
247 Tracer.trace("Request #" + request.getRequestId() + " end of data");
248 }
550d787e
FC
249 }
250 }
251 if (result.size() > 0) {
252 pushData(request, result);
7f407ead 253 }
550d787e 254 request.done();
9b635e61 255
3d62f8b7 256 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "completed");
550d787e
FC
257 }
258 catch (Exception e) {
9b635e61 259 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "exception (failed)");
550d787e 260 request.fail();
cb866e08 261// e.printStackTrace();
8c8bf09f 262 }
8c8bf09f
ASL
263 }
264 };
3d62f8b7 265
5c00c0b7 266 fExecutor.execute(thread);
3d62f8b7 267
550d787e 268 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
8c8bf09f
ASL
269 }
270
9b635e61
FC
271 // By default, same behavior as a short request
272 protected void queueLongRequest(final ITmfDataRequest<T> request) {
273 queueRequest(request);
274 }
275
8c8bf09f
ASL
276 /**
277 * Format the result data and forwards it to the requester.
278 * Note: after handling, the data is *removed*.
279 *
280 * @param request
281 * @param data
282 */
283 @SuppressWarnings("unchecked")
951d134a 284 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
8c8bf09f
ASL
285 synchronized(request) {
286 if (!request.isCompleted()) {
287 T[] result = (T[]) Array.newInstance(fType, data.size());
288 data.toArray(result);
289 request.setData(result);
290 request.handleData();
291 data.removeAllElements();
292 }
293 }
294 }
295
951d134a
FC
296 /**
297 * Initialize the provider based on the request. The context is
298 * provider specific and will be updated by getNext().
299 *
300 * @param request
301 * @return an application specific context; null if request can't be serviced
302 */
2fb2eb37 303 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
3d62f8b7
FC
304 public abstract T getNext(ITmfContext context);
305
306// public abstract void queueResult(T data);
307
8c8bf09f
ASL
308 /**
309 * Return the next piece of data based on the context supplied. The context
310 * would typically be updated for the subsequent read.
311 *
312 * @param context
313 * @return
314 */
3d62f8b7
FC
315// private static final int TIMEOUT = 10000;
316//// public abstract T getNext(ITmfContext context) throws InterruptedException;
317//// private int getLevel = 0;
318// public T getNext(ITmfContext context) throws InterruptedException {
319//// String name = Thread.currentThread().getName(); getLevel++;
320//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - entering");
321// T data = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
322// if (data == null) {
323//// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on read");
324// throw new InterruptedException();
325// }
326//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - leaving");
327//// getLevel--;
328// return data;
329// }
330//
331// /**
332// * Makes the generated result data available for getNext()
333// *
334// * @param data
335// */
336//// public abstract void queueResult(T data) throws InterruptedException;
337//// private int putLevel = 0;
338// public void queueResult(T data) throws InterruptedException {
339//// String name = Thread.currentThread().getName(); putLevel++;
340//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - entering");
341// boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
342// if (!ok) {
343//// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on write");
344// throw new InterruptedException();
345// }
346//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - leaving");
347//// putLevel--;
348// }
8c8bf09f
ASL
349
350 /**
351 * Checks if the data meets the request completion criteria.
352 *
353 * @param request
354 * @param data
355 * @return
356 */
2fb2eb37 357 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
36548af3 358 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
8c8bf09f
ASL
359 }
360
951d134a
FC
361 // ------------------------------------------------------------------------
362 // Signal handlers
363 // ------------------------------------------------------------------------
364
8c8bf09f 365 @TmfSignalHandler
045df77d
FC
366 public void startSynch(TmfStartSynchSignal signal) {
367 synchronized (fLock) {
368 fSignalDepth++;
369 }
8c8bf09f
ASL
370 }
371
372 @TmfSignalHandler
045df77d
FC
373 public void endSynch(TmfEndSynchSignal signal) {
374 synchronized (fLock) {
375 fSignalDepth--;
376 if (fSignalDepth == 0) {
377 fireRequests();
378 }
379 }
8c8bf09f
ASL
380 }
381
382}
This page took 0.047047 seconds and 5 git commands to generate.