2010-09-15 Francois Chouinard <fchouinard@gmail.com> Contribution for Bug287563
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
15import java.lang.reflect.Array;
16import java.util.Vector;
17import java.util.concurrent.BlockingQueue;
18import java.util.concurrent.LinkedBlockingQueue;
9b635e61 19import java.util.concurrent.SynchronousQueue;
8c8bf09f 20
ce785d7d 21import org.eclipse.linuxtools.tmf.Tracer;
8c8bf09f 22import org.eclipse.linuxtools.tmf.event.TmfData;
951d134a 23import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
9b635e61 24import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType;
951d134a 25import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
8c8bf09f
ASL
26import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
27import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
28import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
29import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
30import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
31import org.eclipse.linuxtools.tmf.trace.ITmfContext;
32
33/**
34 * <b><u>TmfProvider</u></b>
35 * <p>
36 * The TmfProvider<T> is a provider for a data of type <T>.
37 * <p>
38 * This abstract class implements the housekeeking methods to register/
39 * deregister the event provider and to handle generically the event requests.
40 * <p>
41 * The concrete class can either re-implement processRequest() entirely or
42 * just implement the hooks (initializeContext() and getNext()).
43 * <p>
44 * TODO: Add support for providing multiple data types.
45 */
951d134a 46public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f 47
550d787e
FC
48 // ------------------------------------------------------------------------
49 // Constants
50 // ------------------------------------------------------------------------
51
9b635e61 52// private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
550d787e
FC
53// private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
54
55 // ------------------------------------------------------------------------
56 //
57 // ------------------------------------------------------------------------
58
8c8bf09f 59 final protected Class<T> fType;
550d787e 60 final protected boolean fLogData;
cb866e08 61 final protected boolean fLogError;
8c8bf09f 62
f6b14ce2 63 public static final int DEFAULT_BLOCK_SIZE = 5000;
8c8bf09f 64 public static final int DEFAULT_QUEUE_SIZE = 1000;
f6b14ce2 65
8c8bf09f
ASL
66 protected final int fQueueSize;
67 protected final BlockingQueue<T> fDataQueue;
68 protected final TmfRequestExecutor fExecutor;
69
550d787e 70 private int fSignalDepth = 0;
045df77d 71 private final Object fLock = new Object();
951d134a 72
8c8bf09f 73 // ------------------------------------------------------------------------
951d134a 74 // Constructors
8c8bf09f
ASL
75 // ------------------------------------------------------------------------
76
77 public TmfDataProvider(String name, Class<T> type) {
78 this(name, type, DEFAULT_QUEUE_SIZE);
79 }
80
81 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
82 super(name);
fc6ccf6f 83 fType = type;
ce785d7d 84 fQueueSize = queueSize;
9b635e61 85 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
ce785d7d
FC
86
87 fExecutor = new TmfRequestExecutor();
550d787e
FC
88 fSignalDepth = 0;
89
cb866e08
FC
90 fLogData = Tracer.isEventTraced();
91 fLogError = Tracer.isErrorTraced();
54d55ced
FC
92
93 TmfProviderManager.register(fType, this);
9b635e61 94// if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
ce785d7d 95}
377f1ad8 96
ce785d7d
FC
97 public TmfDataProvider(TmfDataProvider<T> other) {
98 super(other);
99 fType = other.fType;
100 fQueueSize = other.fQueueSize;
9b635e61 101 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
ce785d7d
FC
102
103 fExecutor = new TmfRequestExecutor();
550d787e
FC
104 fSignalDepth = 0;
105
cb866e08
FC
106 fLogData = Tracer.isEventTraced();
107 fLogError = Tracer.isErrorTraced();
377f1ad8
WB
108 }
109
fc6ccf6f 110 @Override
2fb2eb37 111 public void dispose() {
8c8bf09f 112 TmfProviderManager.deregister(fType, this);
54d55ced 113 fExecutor.stop();
2fb2eb37 114 super.dispose();
3d62f8b7 115// if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
8c8bf09f
ASL
116 }
117
118 public int getQueueSize() {
119 return fQueueSize;
120 }
121
ff4ed569
FC
122 public Class<?> getType() {
123 return fType;
124 }
125
8c8bf09f
ASL
126 // ------------------------------------------------------------------------
127 // ITmfRequestHandler
128 // ------------------------------------------------------------------------
129
550d787e 130 public void sendRequest(final ITmfDataRequest<T> request) {
045df77d 131 synchronized(fLock) {
9b635e61
FC
132 if (fSignalDepth > 0) {
133 coalesceDataRequest(request);
134 } else {
135 dispatchRequest(request);
550d787e 136 }
951d134a
FC
137 }
138 }
139
140 /**
141 * This method queues the coalesced requests.
142 *
143 * @param thread
144 */
550d787e 145 public void fireRequests() {
045df77d 146 synchronized(fLock) {
550d787e 147 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
9b635e61 148 dispatchRequest(request);
550d787e
FC
149 }
150 fPendingCoalescedRequests.clear();
951d134a 151 }
951d134a
FC
152 }
153
154 // ------------------------------------------------------------------------
155 // Coalescing (primitive test...)
156 // ------------------------------------------------------------------------
8c8bf09f 157
951d134a 158 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
8c8bf09f 159
550d787e 160 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
045df77d 161 synchronized(fLock) {
550d787e 162 TmfCoalescedDataRequest<T> coalescedRequest =
cb866e08 163 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize(), request.getExecType());
550d787e 164 coalescedRequest.addRequest(request);
9b635e61
FC
165 if (Tracer.isRequestTraced()) {
166 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId());
167 Tracer.traceRequest(coalescedRequest, "added " + request.getRequestId());
168 }
550d787e
FC
169 fPendingCoalescedRequests.add(coalescedRequest);
170 }
951d134a 171 }
8c8bf09f 172
045df77d
FC
173 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
174 synchronized(fLock) {
550d787e
FC
175 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
176 if (req.isCompatible(request)) {
177 req.addRequest(request);
9b635e61
FC
178 if (Tracer.isRequestTraced()) {
179 Tracer.traceRequest(request, "coalesced with " + req.getRequestId());
180 Tracer.traceRequest(req, "added " + request.getRequestId());
181 }
550d787e
FC
182 return;
183 }
951d134a 184 }
550d787e 185 newCoalescedDataRequest(request);
8c8bf09f 186 }
8c8bf09f
ASL
187 }
188
951d134a
FC
189 // ------------------------------------------------------------------------
190 // Request processing
191 // ------------------------------------------------------------------------
192
9b635e61 193 private void dispatchRequest(final ITmfDataRequest<T> request) {
f6b14ce2 194 if (request.getExecType() == ExecutionType.FOREGROUND)
9b635e61
FC
195 queueRequest(request);
196 else
f6b14ce2 197 queueBackgroundRequest(request, DEFAULT_BLOCK_SIZE, true);
9b635e61
FC
198 }
199
2fb2eb37 200 protected void queueRequest(final ITmfDataRequest<T> request) {
9aae0442 201
045df77d
FC
202 if (fExecutor.isShutdown()) {
203 request.cancel();
204 return;
205 }
206
f6b14ce2 207// final TmfDataProvider<T> provider = this;
550d787e 208
8c8bf09f 209 // Process the request
9b635e61 210 TmfThread thread = new TmfThread(request.getExecType()) {
8c8bf09f
ASL
211
212 @Override
213 public void run() {
214
f6b14ce2 215// if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "started");
9b635e61 216
8c8bf09f 217 // Extract the generic information
550d787e 218 request.start();
8c8bf09f
ASL
219 int blockSize = request.getBlockize();
220 int nbRequested = request.getNbRequested();
221
222 // Create the result buffer
223 Vector<T> result = new Vector<T>();
224 int nbRead = 0;
225
226 // Initialize the execution
227 ITmfContext context = armRequest(request);
228 if (context == null) {
550d787e 229 request.cancel();
8c8bf09f
ASL
230 return;
231 }
232
550d787e
FC
233 try {
234 // Get the ordered events
f6b14ce2 235// if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName());
550d787e 236 T data = getNext(context);
f6b14ce2 237// if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
550d787e
FC
238 while (data != null && !isCompleted(request, data, nbRead))
239 {
f6b14ce2 240// if (fLogData) Tracer.traceEvent(provider, request, data);
550d787e
FC
241 result.add(data);
242 if (++nbRead % blockSize == 0) {
243 pushData(request, result);
ce785d7d 244 }
550d787e
FC
245 // To avoid an unnecessary read passed the last data requested
246 if (nbRead < nbRequested) {
247 data = getNext(context);
3d62f8b7
FC
248 if (Tracer.isRequestTraced() && (data == null || data.isNullRef())) {
249 Tracer.trace("Request #" + request.getRequestId() + " end of data");
250 }
550d787e
FC
251 }
252 }
253 if (result.size() > 0) {
254 pushData(request, result);
7f407ead 255 }
550d787e 256 request.done();
9b635e61 257
3d62f8b7 258 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "completed");
550d787e
FC
259 }
260 catch (Exception e) {
9b635e61 261 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "exception (failed)");
550d787e 262 request.fail();
cb866e08 263// e.printStackTrace();
8c8bf09f 264 }
8c8bf09f
ASL
265 }
266 };
3d62f8b7 267
5c00c0b7 268 fExecutor.execute(thread);
3d62f8b7 269
550d787e 270 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
8c8bf09f
ASL
271 }
272
f6b14ce2
FC
273 // By default, same behavior as a foreground request
274 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, boolean indexing) {
9b635e61
FC
275 queueRequest(request);
276 }
277
8c8bf09f
ASL
278 /**
279 * Format the result data and forwards it to the requester.
280 * Note: after handling, the data is *removed*.
281 *
282 * @param request
283 * @param data
284 */
285 @SuppressWarnings("unchecked")
951d134a 286 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
8c8bf09f
ASL
287 synchronized(request) {
288 if (!request.isCompleted()) {
289 T[] result = (T[]) Array.newInstance(fType, data.size());
290 data.toArray(result);
291 request.setData(result);
292 request.handleData();
293 data.removeAllElements();
294 }
295 }
296 }
297
951d134a
FC
298 /**
299 * Initialize the provider based on the request. The context is
300 * provider specific and will be updated by getNext().
301 *
302 * @param request
303 * @return an application specific context; null if request can't be serviced
304 */
2fb2eb37 305 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
3d62f8b7
FC
306 public abstract T getNext(ITmfContext context);
307
308// public abstract void queueResult(T data);
309
8c8bf09f
ASL
310 /**
311 * Return the next piece of data based on the context supplied. The context
312 * would typically be updated for the subsequent read.
313 *
314 * @param context
315 * @return
316 */
3d62f8b7
FC
317// private static final int TIMEOUT = 10000;
318//// public abstract T getNext(ITmfContext context) throws InterruptedException;
319//// private int getLevel = 0;
320// public T getNext(ITmfContext context) throws InterruptedException {
321//// String name = Thread.currentThread().getName(); getLevel++;
322//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - entering");
323// T data = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
324// if (data == null) {
325//// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on read");
326// throw new InterruptedException();
327// }
328//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - leaving");
329//// getLevel--;
330// return data;
331// }
332//
333// /**
334// * Makes the generated result data available for getNext()
335// *
336// * @param data
337// */
338//// public abstract void queueResult(T data) throws InterruptedException;
339//// private int putLevel = 0;
340// public void queueResult(T data) throws InterruptedException {
341//// String name = Thread.currentThread().getName(); putLevel++;
342//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - entering");
343// boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
344// if (!ok) {
345//// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on write");
346// throw new InterruptedException();
347// }
348//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - leaving");
349//// putLevel--;
350// }
8c8bf09f
ASL
351
352 /**
353 * Checks if the data meets the request completion criteria.
354 *
355 * @param request
356 * @param data
357 * @return
358 */
2fb2eb37 359 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
36548af3 360 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
8c8bf09f
ASL
361 }
362
951d134a
FC
363 // ------------------------------------------------------------------------
364 // Signal handlers
365 // ------------------------------------------------------------------------
366
8c8bf09f 367 @TmfSignalHandler
045df77d
FC
368 public void startSynch(TmfStartSynchSignal signal) {
369 synchronized (fLock) {
370 fSignalDepth++;
371 }
8c8bf09f
ASL
372 }
373
374 @TmfSignalHandler
045df77d
FC
375 public void endSynch(TmfEndSynchSignal signal) {
376 synchronized (fLock) {
377 fSignalDepth--;
378 if (fSignalDepth == 0) {
379 fireRequests();
380 }
381 }
8c8bf09f
ASL
382 }
383
384}
This page took 0.047287 seconds and 5 git commands to generate.