Bug 315605: [LTTng] Document exact version of liblttvtraceread that works
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
15import java.lang.reflect.Array;
16import java.util.Vector;
17import java.util.concurrent.BlockingQueue;
18import java.util.concurrent.LinkedBlockingQueue;
550d787e 19import java.util.concurrent.TimeUnit;
8c8bf09f 20
ce785d7d 21import org.eclipse.linuxtools.tmf.Tracer;
8c8bf09f 22import org.eclipse.linuxtools.tmf.event.TmfData;
951d134a
FC
23import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
24import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
8c8bf09f
ASL
25import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
26import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
27import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
28import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
29import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
30import org.eclipse.linuxtools.tmf.trace.ITmfContext;
31
32/**
33 * <b><u>TmfProvider</u></b>
34 * <p>
35 * The TmfProvider<T> is a provider for a data of type <T>.
36 * <p>
37 * This abstract class implements the housekeeking methods to register/
38 * deregister the event provider and to handle generically the event requests.
39 * <p>
40 * The concrete class can either re-implement processRequest() entirely or
41 * just implement the hooks (initializeContext() and getNext()).
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 */
951d134a 45public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f 46
550d787e
FC
47 // ------------------------------------------------------------------------
48 // Constants
49 // ------------------------------------------------------------------------
50
51 private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
52// private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
53
54 // ------------------------------------------------------------------------
55 //
56 // ------------------------------------------------------------------------
57
8c8bf09f 58 final protected Class<T> fType;
550d787e 59 final protected boolean fLogData;
cb866e08 60 final protected boolean fLogError;
8c8bf09f
ASL
61
62 public static final int DEFAULT_QUEUE_SIZE = 1000;
63 protected final int fQueueSize;
64 protected final BlockingQueue<T> fDataQueue;
65 protected final TmfRequestExecutor fExecutor;
66
550d787e 67 private int fSignalDepth = 0;
951d134a 68
8c8bf09f 69 // ------------------------------------------------------------------------
951d134a 70 // Constructors
8c8bf09f
ASL
71 // ------------------------------------------------------------------------
72
73 public TmfDataProvider(String name, Class<T> type) {
74 this(name, type, DEFAULT_QUEUE_SIZE);
75 }
76
77 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
78 super(name);
fc6ccf6f 79 fType = type;
ce785d7d 80 fQueueSize = queueSize;
cb866e08
FC
81// fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
82 fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
ce785d7d
FC
83
84 fExecutor = new TmfRequestExecutor();
550d787e
FC
85 fSignalDepth = 0;
86
cb866e08
FC
87 fLogData = Tracer.isEventTraced();
88 fLogError = Tracer.isErrorTraced();
54d55ced
FC
89
90 TmfProviderManager.register(fType, this);
550d787e 91 if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
ce785d7d 92}
377f1ad8 93
ce785d7d
FC
94 public TmfDataProvider(TmfDataProvider<T> other) {
95 super(other);
96 fType = other.fType;
97 fQueueSize = other.fQueueSize;
cb866e08
FC
98// fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
99 fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
ce785d7d
FC
100
101 fExecutor = new TmfRequestExecutor();
550d787e
FC
102 fSignalDepth = 0;
103
cb866e08
FC
104 fLogData = Tracer.isEventTraced();
105 fLogError = Tracer.isErrorTraced();
377f1ad8
WB
106 }
107
fc6ccf6f 108 @Override
2fb2eb37 109 public void dispose() {
8c8bf09f 110 TmfProviderManager.deregister(fType, this);
54d55ced 111 fExecutor.stop();
550d787e
FC
112
113 if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
114
115 if (fClone != null) fClone.dispose();
2fb2eb37 116 super.dispose();
8c8bf09f
ASL
117 }
118
119 public int getQueueSize() {
120 return fQueueSize;
121 }
122
ff4ed569
FC
123 public Class<?> getType() {
124 return fType;
125 }
126
8c8bf09f
ASL
127 // ------------------------------------------------------------------------
128 // ITmfRequestHandler
129 // ------------------------------------------------------------------------
130
550d787e
FC
131 protected TmfDataProvider<T> fClone;
132 public void sendRequest(final ITmfDataRequest<T> request) {
133 synchronized(this) {
134 if (fClone == null || request.getExecType() == SHORT) {
135 if (fSignalDepth > 0) {
136 coalesceDataRequest(request);
137 } else {
138 queueRequest(request);
139 }
140 }
141 else {
142 fClone.sendRequest(request);
143 }
951d134a
FC
144 }
145 }
146
147 /**
148 * This method queues the coalesced requests.
149 *
150 * @param thread
151 */
550d787e
FC
152 public void fireRequests() {
153 synchronized(this) {
154 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
155 queueRequest(request);
156 }
157 fPendingCoalescedRequests.clear();
158
159 if (fClone != null)
160 fClone.fireRequests();
951d134a 161 }
951d134a
FC
162 }
163
164 // ------------------------------------------------------------------------
165 // Coalescing (primitive test...)
166 // ------------------------------------------------------------------------
8c8bf09f 167
951d134a 168 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
8c8bf09f 169
550d787e
FC
170 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
171 synchronized(this) {
172 TmfCoalescedDataRequest<T> coalescedRequest =
cb866e08 173 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize(), request.getExecType());
550d787e
FC
174 coalescedRequest.addRequest(request);
175 fPendingCoalescedRequests.add(coalescedRequest);
176 }
951d134a 177 }
8c8bf09f 178
2fb2eb37 179 protected synchronized void coalesceDataRequest(ITmfDataRequest<T> request) {
550d787e
FC
180 synchronized(this) {
181 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
182 if (req.isCompatible(request)) {
183 req.addRequest(request);
184 return;
185 }
951d134a 186 }
550d787e 187 newCoalescedDataRequest(request);
8c8bf09f 188 }
8c8bf09f
ASL
189 }
190
951d134a
FC
191 // ------------------------------------------------------------------------
192 // Request processing
193 // ------------------------------------------------------------------------
194
2fb2eb37 195 protected void queueRequest(final ITmfDataRequest<T> request) {
9aae0442 196
cb866e08
FC
197 final ITmfDataProvider<T> provider = this;
198 final ITmfComponent component = this;
550d787e 199
8c8bf09f
ASL
200 // Process the request
201 Thread thread = new Thread() {
202
203 @Override
204 public void run() {
205
206 // Extract the generic information
550d787e 207 request.start();
8c8bf09f
ASL
208 int blockSize = request.getBlockize();
209 int nbRequested = request.getNbRequested();
210
211 // Create the result buffer
212 Vector<T> result = new Vector<T>();
213 int nbRead = 0;
214
215 // Initialize the execution
216 ITmfContext context = armRequest(request);
217 if (context == null) {
550d787e 218 request.cancel();
8c8bf09f
ASL
219 return;
220 }
221
550d787e
FC
222 try {
223 // Get the ordered events
224 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + component.getName());
225 T data = getNext(context);
226 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
227 while (data != null && !isCompleted(request, data, nbRead))
228 {
229 if (fLogData) Tracer.traceEvent(provider, request, data);
230 result.add(data);
231 if (++nbRead % blockSize == 0) {
232 pushData(request, result);
ce785d7d 233 }
550d787e
FC
234 // To avoid an unnecessary read passed the last data requested
235 if (nbRead < nbRequested) {
236 data = getNext(context);
237 if (data == null || data.isNullRef()) {
238 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " end of data");
239 }
240 }
241 }
242 if (result.size() > 0) {
243 pushData(request, result);
7f407ead 244 }
550d787e
FC
245 request.done();
246 }
247 catch (Exception e) {
550d787e 248 request.fail();
cb866e08 249// e.printStackTrace();
8c8bf09f 250 }
8c8bf09f
ASL
251 }
252 };
5c00c0b7 253 fExecutor.execute(thread);
550d787e 254 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
8c8bf09f
ASL
255 }
256
257 /**
258 * Format the result data and forwards it to the requester.
259 * Note: after handling, the data is *removed*.
260 *
261 * @param request
262 * @param data
263 */
264 @SuppressWarnings("unchecked")
951d134a 265 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
8c8bf09f
ASL
266 synchronized(request) {
267 if (!request.isCompleted()) {
268 T[] result = (T[]) Array.newInstance(fType, data.size());
269 data.toArray(result);
270 request.setData(result);
271 request.handleData();
272 data.removeAllElements();
273 }
274 }
275 }
276
951d134a
FC
277 /**
278 * Initialize the provider based on the request. The context is
279 * provider specific and will be updated by getNext().
280 *
281 * @param request
282 * @return an application specific context; null if request can't be serviced
283 */
2fb2eb37 284 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
951d134a 285
8c8bf09f
ASL
286 /**
287 * Return the next piece of data based on the context supplied. The context
288 * would typically be updated for the subsequent read.
289 *
290 * @param context
291 * @return
292 */
550d787e 293 private static final int TIMEOUT = 1000;
cb866e08
FC
294// public abstract T getNext(ITmfContext context) throws InterruptedException;
295// private int getLevel = 0;
550d787e 296 public T getNext(ITmfContext context) throws InterruptedException {
cb866e08
FC
297// String name = Thread.currentThread().getName(); getLevel++;
298// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - entering");
299 T data = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
300 if (data == null) {
301 if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on read");
550d787e 302 throw new InterruptedException();
8c8bf09f 303 }
cb866e08
FC
304// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - leaving");
305// getLevel--;
306 return data;
8c8bf09f
ASL
307 }
308
951d134a
FC
309 /**
310 * Makes the generated result data available for getNext()
311 *
312 * @param data
313 */
cb866e08
FC
314// public abstract void queueResult(T data) throws InterruptedException;
315// private int putLevel = 0;
550d787e 316 public void queueResult(T data) throws InterruptedException {
cb866e08
FC
317// String name = Thread.currentThread().getName(); putLevel++;
318// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - entering");
550d787e
FC
319 boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
320 if (!ok) {
cb866e08 321 if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on write");
550d787e 322 throw new InterruptedException();
8c8bf09f 323 }
cb866e08
FC
324// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - leaving");
325// putLevel--;
8c8bf09f
ASL
326 }
327
328 /**
329 * Checks if the data meets the request completion criteria.
330 *
331 * @param request
332 * @param data
333 * @return
334 */
2fb2eb37 335 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
36548af3 336 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
8c8bf09f
ASL
337 }
338
951d134a
FC
339 // ------------------------------------------------------------------------
340 // Signal handlers
341 // ------------------------------------------------------------------------
342
8c8bf09f 343 @TmfSignalHandler
550d787e
FC
344 public synchronized void startSynch(TmfStartSynchSignal signal) {
345 fSignalDepth++;
8c8bf09f
ASL
346 }
347
348 @TmfSignalHandler
550d787e
FC
349 public synchronized void endSynch(TmfEndSynchSignal signal) {
350 fSignalDepth--;
351 if (fSignalDepth == 0) {
352 fireRequests();
951d134a 353 }
8c8bf09f
ASL
354 }
355
356}
This page took 0.063788 seconds and 5 git commands to generate.