2010-07-19 Francois Chouinard <fchouinard@gmail.com>
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
15import java.lang.reflect.Array;
16import java.util.Vector;
17import java.util.concurrent.BlockingQueue;
18import java.util.concurrent.LinkedBlockingQueue;
9b635e61 19import java.util.concurrent.SynchronousQueue;
8c8bf09f 20
ce785d7d 21import org.eclipse.linuxtools.tmf.Tracer;
8c8bf09f 22import org.eclipse.linuxtools.tmf.event.TmfData;
951d134a 23import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
9b635e61 24import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType;
951d134a 25import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
8c8bf09f
ASL
26import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
27import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
28import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
29import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
30import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
31import org.eclipse.linuxtools.tmf.trace.ITmfContext;
32
/**
 * <b><u>TmfDataProvider</u></b>
 * <p>
 * The TmfDataProvider&lt;T&gt; is a provider for data of type &lt;T&gt;.
 * <p>
 * This abstract class implements the housekeeping methods to register/
 * deregister the event provider and to handle generically the event requests.
 * <p>
 * The concrete class can either re-implement queueRequest() entirely or
 * just implement the hooks (armRequest() and getNext()).
 * <p>
 * TODO: Add support for providing multiple data types.
 */
951d134a 46public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f 47
550d787e
FC
48 // ------------------------------------------------------------------------
49 // Constants
50 // ------------------------------------------------------------------------
51
9b635e61 52// private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
550d787e
FC
53// private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
54
55 // ------------------------------------------------------------------------
56 //
57 // ------------------------------------------------------------------------
58
8c8bf09f 59 final protected Class<T> fType;
550d787e 60 final protected boolean fLogData;
cb866e08 61 final protected boolean fLogError;
8c8bf09f
ASL
62
63 public static final int DEFAULT_QUEUE_SIZE = 1000;
64 protected final int fQueueSize;
65 protected final BlockingQueue<T> fDataQueue;
66 protected final TmfRequestExecutor fExecutor;
67
550d787e 68 private int fSignalDepth = 0;
951d134a 69
8c8bf09f 70 // ------------------------------------------------------------------------
951d134a 71 // Constructors
8c8bf09f
ASL
72 // ------------------------------------------------------------------------
73
/**
 * Creates a provider that uses {@link #DEFAULT_QUEUE_SIZE} for its queue.
 *
 * @param name the component name, passed on to the full constructor
 * @param type the class of the data served by this provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
77
/**
 * Creates a provider with an explicit queue size and registers it with the
 * provider manager under its data type.
 *
 * @param name      the component name, passed to the superclass
 * @param type      the class of the data served by this provider
 * @param queueSize the data queue capacity; a value of 1 or less selects a
 *                  {@link SynchronousQueue} instead of a bounded queue
 */
protected TmfDataProvider(String name, Class<T> type, int queueSize) {
    super(name);

    fType = type;
    fQueueSize = queueSize;

    // Only sizes above 1 get a bounded queue; anything else falls back to
    // a direct hand-off queue.
    if (fQueueSize > 1) {
        fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
    } else {
        fDataQueue = new SynchronousQueue<T>();
    }

    fExecutor = new TmfRequestExecutor();
    fSignalDepth = 0;

    // Tracing switches are sampled once, at construction time.
    fLogData = Tracer.isEventTraced();
    fLogError = Tracer.isErrorTraced();

    TmfProviderManager.register(fType, this);
}
377f1ad8 93
ce785d7d
FC
/**
 * Copy constructor. Reuses the other provider's data type and queue size
 * but creates its own queue and executor.
 * <p>
 * NOTE(review): unlike the (name, type, queueSize) constructor, this one
 * does not call TmfProviderManager.register(), although dispose() always
 * deregisters - confirm this is intentional.
 *
 * @param other the provider to copy the configuration from
 */
public TmfDataProvider(TmfDataProvider<T> other) {
    super(other);

    fType = other.fType;
    fQueueSize = other.fQueueSize;

    // Same queue selection rule as the primary constructor.
    if (fQueueSize > 1) {
        fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
    } else {
        fDataQueue = new SynchronousQueue<T>();
    }

    fExecutor = new TmfRequestExecutor();
    fSignalDepth = 0;

    // Tracing switches are sampled afresh rather than copied from 'other'.
    fLogData = Tracer.isEventTraced();
    fLogError = Tracer.isErrorTraced();
}
106
fc6ccf6f 107 @Override
2fb2eb37 108 public void dispose() {
8c8bf09f 109 TmfProviderManager.deregister(fType, this);
54d55ced 110 fExecutor.stop();
2fb2eb37 111 super.dispose();
3d62f8b7 112// if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
8c8bf09f
ASL
113 }
114
115 public int getQueueSize() {
116 return fQueueSize;
117 }
118
ff4ed569
FC
119 public Class<?> getType() {
120 return fType;
121 }
122
8c8bf09f
ASL
123 // ------------------------------------------------------------------------
124 // ITmfRequestHandler
125 // ------------------------------------------------------------------------
126
550d787e
FC
127 public void sendRequest(final ITmfDataRequest<T> request) {
128 synchronized(this) {
9b635e61
FC
129 if (fSignalDepth > 0) {
130 coalesceDataRequest(request);
131 } else {
132 dispatchRequest(request);
550d787e 133 }
951d134a
FC
134 }
135 }
136
137 /**
138 * This method queues the coalesced requests.
139 *
140 * @param thread
141 */
550d787e
FC
142 public void fireRequests() {
143 synchronized(this) {
144 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
9b635e61 145 dispatchRequest(request);
550d787e
FC
146 }
147 fPendingCoalescedRequests.clear();
951d134a 148 }
951d134a
FC
149 }
150
151 // ------------------------------------------------------------------------
152 // Coalescing (primitive test...)
153 // ------------------------------------------------------------------------
8c8bf09f 154
951d134a 155 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
8c8bf09f 156
550d787e
FC
157 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
158 synchronized(this) {
159 TmfCoalescedDataRequest<T> coalescedRequest =
cb866e08 160 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize(), request.getExecType());
550d787e 161 coalescedRequest.addRequest(request);
9b635e61
FC
162 if (Tracer.isRequestTraced()) {
163 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId());
164 Tracer.traceRequest(coalescedRequest, "added " + request.getRequestId());
165 }
550d787e
FC
166 fPendingCoalescedRequests.add(coalescedRequest);
167 }
951d134a 168 }
8c8bf09f 169
2fb2eb37 170 protected synchronized void coalesceDataRequest(ITmfDataRequest<T> request) {
550d787e
FC
171 synchronized(this) {
172 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
173 if (req.isCompatible(request)) {
174 req.addRequest(request);
9b635e61
FC
175 if (Tracer.isRequestTraced()) {
176 Tracer.traceRequest(request, "coalesced with " + req.getRequestId());
177 Tracer.traceRequest(req, "added " + request.getRequestId());
178 }
550d787e
FC
179 return;
180 }
951d134a 181 }
550d787e 182 newCoalescedDataRequest(request);
8c8bf09f 183 }
8c8bf09f
ASL
184 }
185
951d134a
FC
186 // ------------------------------------------------------------------------
187 // Request processing
188 // ------------------------------------------------------------------------
189
9b635e61
FC
190 private void dispatchRequest(final ITmfDataRequest<T> request) {
191 if (request.getExecType() == ExecutionType.SHORT)
192 queueRequest(request);
193 else
194 queueLongRequest(request);
195 }
196
2fb2eb37 197 protected void queueRequest(final ITmfDataRequest<T> request) {
9aae0442 198
3d62f8b7 199 final TmfDataProvider<T> provider = this;
550d787e 200
8c8bf09f 201 // Process the request
9b635e61 202 TmfThread thread = new TmfThread(request.getExecType()) {
8c8bf09f
ASL
203
204 @Override
205 public void run() {
206
3d62f8b7 207 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "started");
9b635e61 208
8c8bf09f 209 // Extract the generic information
550d787e 210 request.start();
8c8bf09f
ASL
211 int blockSize = request.getBlockize();
212 int nbRequested = request.getNbRequested();
213
214 // Create the result buffer
215 Vector<T> result = new Vector<T>();
216 int nbRead = 0;
217
218 // Initialize the execution
219 ITmfContext context = armRequest(request);
220 if (context == null) {
550d787e 221 request.cancel();
8c8bf09f
ASL
222 return;
223 }
224
550d787e
FC
225 try {
226 // Get the ordered events
3d62f8b7 227 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName());
550d787e 228 T data = getNext(context);
3d62f8b7 229 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
550d787e
FC
230 while (data != null && !isCompleted(request, data, nbRead))
231 {
3d62f8b7 232 if (fLogData) Tracer.traceEvent(provider, request, data);
550d787e
FC
233 result.add(data);
234 if (++nbRead % blockSize == 0) {
235 pushData(request, result);
ce785d7d 236 }
550d787e
FC
237 // To avoid an unnecessary read passed the last data requested
238 if (nbRead < nbRequested) {
239 data = getNext(context);
3d62f8b7
FC
240 if (Tracer.isRequestTraced() && (data == null || data.isNullRef())) {
241 Tracer.trace("Request #" + request.getRequestId() + " end of data");
242 }
550d787e
FC
243 }
244 }
245 if (result.size() > 0) {
246 pushData(request, result);
7f407ead 247 }
550d787e 248 request.done();
9b635e61 249
3d62f8b7 250 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "completed");
550d787e
FC
251 }
252 catch (Exception e) {
9b635e61 253 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "exception (failed)");
550d787e 254 request.fail();
cb866e08 255// e.printStackTrace();
8c8bf09f 256 }
8c8bf09f
ASL
257 }
258 };
3d62f8b7 259
5c00c0b7 260 fExecutor.execute(thread);
3d62f8b7 261
550d787e 262 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
8c8bf09f
ASL
263 }
264
9b635e61
FC
265 // By default, same behavior as a short request
266 protected void queueLongRequest(final ITmfDataRequest<T> request) {
267 queueRequest(request);
268 }
269
8c8bf09f
ASL
270 /**
271 * Format the result data and forwards it to the requester.
272 * Note: after handling, the data is *removed*.
273 *
274 * @param request
275 * @param data
276 */
277 @SuppressWarnings("unchecked")
951d134a 278 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
8c8bf09f
ASL
279 synchronized(request) {
280 if (!request.isCompleted()) {
281 T[] result = (T[]) Array.newInstance(fType, data.size());
282 data.toArray(result);
283 request.setData(result);
284 request.handleData();
285 data.removeAllElements();
286 }
287 }
288 }
289
951d134a
FC
290 /**
291 * Initialize the provider based on the request. The context is
292 * provider specific and will be updated by getNext().
293 *
294 * @param request
295 * @return an application specific context; null if request can't be serviced
296 */
2fb2eb37 297 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
3d62f8b7
FC
298 public abstract T getNext(ITmfContext context);
299
300// public abstract void queueResult(T data);
301
8c8bf09f
ASL
302 /**
303 * Return the next piece of data based on the context supplied. The context
304 * would typically be updated for the subsequent read.
305 *
306 * @param context
307 * @return
308 */
3d62f8b7
FC
309// private static final int TIMEOUT = 10000;
310//// public abstract T getNext(ITmfContext context) throws InterruptedException;
311//// private int getLevel = 0;
312// public T getNext(ITmfContext context) throws InterruptedException {
313//// String name = Thread.currentThread().getName(); getLevel++;
314//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - entering");
315// T data = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
316// if (data == null) {
317//// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on read");
318// throw new InterruptedException();
319// }
320//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - leaving");
321//// getLevel--;
322// return data;
323// }
324//
325// /**
326// * Makes the generated result data available for getNext()
327// *
328// * @param data
329// */
330//// public abstract void queueResult(T data) throws InterruptedException;
331//// private int putLevel = 0;
332// public void queueResult(T data) throws InterruptedException {
333//// String name = Thread.currentThread().getName(); putLevel++;
334//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - entering");
335// boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
336// if (!ok) {
337//// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on write");
338// throw new InterruptedException();
339// }
340//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - leaving");
341//// putLevel--;
342// }
8c8bf09f
ASL
343
344 /**
345 * Checks if the data meets the request completion criteria.
346 *
347 * @param request
348 * @param data
349 * @return
350 */
2fb2eb37 351 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
36548af3 352 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
8c8bf09f
ASL
353 }
354
951d134a
FC
355 // ------------------------------------------------------------------------
356 // Signal handlers
357 // ------------------------------------------------------------------------
358
8c8bf09f 359 @TmfSignalHandler
550d787e
FC
360 public synchronized void startSynch(TmfStartSynchSignal signal) {
361 fSignalDepth++;
8c8bf09f
ASL
362 }
363
364 @TmfSignalHandler
550d787e
FC
365 public synchronized void endSynch(TmfEndSynchSignal signal) {
366 fSignalDepth--;
367 if (fSignalDepth == 0) {
368 fireRequests();
951d134a 369 }
8c8bf09f
ASL
370 }
371
372}
This page took 0.045983 seconds and 5 git commands to generate.