Added TMF statistics feature (Bug 360572)
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
8c8bf09f
ASL
15import java.util.Vector;
16import java.util.concurrent.BlockingQueue;
17import java.util.concurrent.LinkedBlockingQueue;
9b635e61 18import java.util.concurrent.SynchronousQueue;
8c8bf09f 19
ce785d7d 20import org.eclipse.linuxtools.tmf.Tracer;
8c8bf09f 21import org.eclipse.linuxtools.tmf.event.TmfData;
951d134a 22import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
9b635e61 23import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType;
951d134a 24import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
8c8bf09f
ASL
25import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
26import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
27import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
28import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
29import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
30import org.eclipse.linuxtools.tmf.trace.ITmfContext;
31
32/**
33 * <b><u>TmfProvider</u></b>
34 * <p>
35 * The TmfProvider<T> is a provider for a data of type <T>.
36 * <p>
37 * This abstract class implements the housekeeking methods to register/
38 * deregister the event provider and to handle generically the event requests.
39 * <p>
948b0607
FC
40 * The concrete class can either re-implement processRequest() entirely or just
41 * implement the hooks (initializeContext() and getNext()).
8c8bf09f
ASL
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 */
951d134a 45public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f 46
948b0607
FC
47 // ------------------------------------------------------------------------
48 // Constants
49 // ------------------------------------------------------------------------
550d787e 50
9b635e61 51// private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
550d787e 52// private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
550d787e 53
948b0607
FC
54 // ------------------------------------------------------------------------
55 //
56 // ------------------------------------------------------------------------
8c8bf09f 57
948b0607
FC
58 final protected Class<T> fType;
59 final protected boolean fLogData;
60 final protected boolean fLogError;
f6b14ce2 61
948b0607
FC
62 public static final int DEFAULT_BLOCK_SIZE = 50000;
63 public static final int DEFAULT_QUEUE_SIZE = 1000;
8c8bf09f 64
948b0607
FC
65 protected final int fQueueSize;
66 protected final BlockingQueue<T> fDataQueue;
67 protected final TmfRequestExecutor fExecutor;
68
69 private int fSignalDepth = 0;
045df77d 70 private final Object fLock = new Object();
951d134a 71
c1c69938
FC
72 private int fRequestPendingCounter = 0;
73
948b0607
FC
74 // ------------------------------------------------------------------------
75 // Constructors
76 // ------------------------------------------------------------------------
77
78 public TmfDataProvider(String name, Class<T> type) {
79 this(name, type, DEFAULT_QUEUE_SIZE);
80 }
81
82 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
83 super(name);
84 fType = type;
85 fQueueSize = queueSize;
9b635e61 86 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
ce785d7d
FC
87
88 fExecutor = new TmfRequestExecutor();
948b0607 89 fSignalDepth = 0;
550d787e 90
948b0607
FC
91 fLogData = Tracer.isEventTraced();
92 fLogError = Tracer.isErrorTraced();
54d55ced 93
948b0607 94 TmfProviderManager.register(fType, this);
9b635e61 95// if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
948b0607
FC
96 }
97
98 public TmfDataProvider(TmfDataProvider<T> other) {
ce785d7d
FC
99 super(other);
100 fType = other.fType;
101 fQueueSize = other.fQueueSize;
9b635e61 102 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
ce785d7d
FC
103
104 fExecutor = new TmfRequestExecutor();
550d787e
FC
105 fSignalDepth = 0;
106
948b0607 107 fLogData = Tracer.isEventTraced();
cb866e08 108 fLogError = Tracer.isErrorTraced();
948b0607
FC
109 }
110
111 @Override
112 public void dispose() {
113 TmfProviderManager.deregister(fType, this);
114 fExecutor.stop();
115 super.dispose();
3d62f8b7 116// if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
948b0607
FC
117 }
118
119 public int getQueueSize() {
120 return fQueueSize;
121 }
122
123 public Class<?> getType() {
124 return fType;
125 }
126
127 // ------------------------------------------------------------------------
128 // ITmfRequestHandler
129 // ------------------------------------------------------------------------
130
131 @Override
132 public void sendRequest(final ITmfDataRequest<T> request) {
133 synchronized (fLock) {
134 if (fSignalDepth > 0) {
135 coalesceDataRequest(request);
136 } else {
137 dispatchRequest(request);
138 }
139 }
140 }
141
142 /**
143 * This method queues the coalesced requests.
144 *
145 * @param thread
146 */
147 @Override
c1c69938 148 public void fireRequest() {
948b0607
FC
149 synchronized (fLock) {
150 if (fRequestPendingCounter > 0) {
151 return;
152 }
153 if (fPendingCoalescedRequests.size() > 0) {
154 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
155 dispatchRequest(request);
156 }
157 fPendingCoalescedRequests.clear();
158 }
159 }
160 }
161
162 /**
163 * Increments/decrements the pending requests counters and fires the request
164 * if necessary (counter == 0). Used for coalescing requests accross
165 * multiple TmfDataProvider.
166 *
167 * @param isIncrement
168 */
c1c69938
FC
169 @Override
170 public void notifyPendingRequest(boolean isIncrement) {
948b0607 171 synchronized (fLock) {
c1c69938
FC
172 if (isIncrement) {
173 if (fSignalDepth > 0) {
174 fRequestPendingCounter++;
175 }
176 } else {
177 if (fRequestPendingCounter > 0) {
178 fRequestPendingCounter--;
179 }
948b0607 180
c1c69938
FC
181 // fire request if all pending requests are received
182 if (fRequestPendingCounter == 0) {
183 fireRequest();
184 }
185 }
186 }
948b0607
FC
187 }
188
189 // ------------------------------------------------------------------------
190 // Coalescing (primitive test...)
191 // ------------------------------------------------------------------------
192
193 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
194
195 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
196 synchronized (fLock) {
197 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(),
198 request.getBlockSize(), request.getExecType());
199 coalescedRequest.addRequest(request);
200 if (Tracer.isRequestTraced()) {
201 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
202 }
203 fPendingCoalescedRequests.add(coalescedRequest);
204 }
205 }
206
207 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
208 synchronized (fLock) {
209 for (TmfCoalescedDataRequest<T> coalescedRequest : fPendingCoalescedRequests) {
210 if (coalescedRequest.isCompatible(request)) {
211 coalescedRequest.addRequest(request);
212 if (Tracer.isRequestTraced()) {
213 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
214 }
215 return;
216 }
217 }
218 newCoalescedDataRequest(request);
219 }
220 }
221
222 // ------------------------------------------------------------------------
223 // Request processing
224 // ------------------------------------------------------------------------
225
226 private void dispatchRequest(final ITmfDataRequest<T> request) {
227 if (request.getExecType() == ExecutionType.FOREGROUND)
228 queueRequest(request);
229 else
230 queueBackgroundRequest(request, request.getBlockSize(), true);
231 }
232
233 protected void queueRequest(final ITmfDataRequest<T> request) {
234
235 if (fExecutor.isShutdown()) {
236 request.cancel();
237 return;
238 }
239
240 final TmfDataProvider<T> provider = this;
241
242 // Process the request
243 TmfThread thread = new TmfThread(request.getExecType()) {
244
245 @Override
246 public void run() {
247
248 if (Tracer.isRequestTraced())
249 Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName()); //$NON-NLS-1$//$NON-NLS-2$
250
251 // Extract the generic information
252 request.start();
253 int nbRequested = request.getNbRequested();
254 int nbRead = 0;
255
256 // Initialize the execution
257 ITmfContext context = armRequest(request);
258 if (context == null) {
259 request.cancel();
260 return;
261 }
262
263 try {
264 // Get the ordered events
265 T data = getNext(context);
266 if (Tracer.isRequestTraced())
267 Tracer.trace("Request #" + request.getRequestId() + " read first event"); //$NON-NLS-1$ //$NON-NLS-2$
268 while (data != null && !isCompleted(request, data, nbRead)) {
269 if (fLogData)
270 Tracer.traceEvent(provider, request, data);
271 request.handleData(data);
272
273 // To avoid an unnecessary read passed the last data
274 // requested
275 if (++nbRead < nbRequested) {
276 data = getNext(context);
277 }
278 }
279 if (Tracer.isRequestTraced())
280 Tracer.trace("Request #" + request.getRequestId() + " finished"); //$NON-NLS-1$//$NON-NLS-2$
281
282 if (request.isCancelled()) {
283 request.cancel();
284 } else {
285 request.done();
286 }
287 } catch (Exception e) {
288 request.fail();
289 }
290
291 // Cleanup
292 context.dispose();
293 }
294 };
295
296 if (Tracer.isRequestTraced())
297 Tracer.traceRequest(request, "queued"); //$NON-NLS-1$
298 fExecutor.execute(thread);
299
300 }
301
302 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean indexing) {
303
304 Thread thread = new Thread() {
305 @Override
306 public void run() {
307 request.start();
308
309 final Integer[] CHUNK_SIZE = new Integer[1];
310 CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
311
312 final Integer[] nbRead = new Integer[1];
313 nbRead[0] = 0;
314
315 final Boolean[] isFinished = new Boolean[1];
316 isFinished[0] = Boolean.FALSE;
317
318 while (!isFinished[0]) {
319
320 TmfDataRequest<T> subRequest = new TmfDataRequest<T>(request.getDataType(), request.getIndex() + nbRead[0], CHUNK_SIZE[0],
321 blockSize, ExecutionType.BACKGROUND) {
322 @Override
323 public void handleData(T data) {
324 super.handleData(data);
325 request.handleData(data);
326 if (getNbRead() > CHUNK_SIZE[0]) {
327 System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
328 }
329 }
330
331 @Override
332 public void handleCompleted() {
333 nbRead[0] += getNbRead();
334 if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
90de83da 335 if (this.isCancelled()) {
948b0607 336 request.cancel();
90de83da
BH
337 } else if (this.isFailed()) {
338 request.fail();
948b0607
FC
339 } else {
340 request.done();
341 }
342 isFinished[0] = Boolean.TRUE;
343 }
344 super.handleCompleted();
345 }
346 };
347
348 if (!isFinished[0]) {
349 queueRequest(subRequest);
350
351 try {
352 subRequest.waitForCompletion();
353 } catch (InterruptedException e) {
354 e.printStackTrace();
355 }
356
357 CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
358 }
359 }
360 }
361 };
362
363 thread.start();
364 }
365
366 /**
367 * Initialize the provider based on the request. The context is provider
368 * specific and will be updated by getNext().
369 *
370 * @param request
371 * @return an application specific context; null if request can't be
372 * serviced
373 */
374 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
375
376 public abstract T getNext(ITmfContext context);
377
378 /**
379 * Checks if the data meets the request completion criteria.
380 *
381 * @param request
382 * @param data
383 * @return
384 */
385 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
386 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
387 }
388
389 // ------------------------------------------------------------------------
390 // Signal handlers
391 // ------------------------------------------------------------------------
392
393 @TmfSignalHandler
394 public void startSynch(TmfStartSynchSignal signal) {
395 synchronized (fLock) {
396 fSignalDepth++;
397 }
398 }
399
400 @TmfSignalHandler
401 public void endSynch(TmfEndSynchSignal signal) {
045df77d 402 synchronized (fLock) {
948b0607
FC
403 fSignalDepth--;
404 if (fSignalDepth == 0) {
405 fireRequest();
406 }
045df77d 407 }
948b0607 408 }
8c8bf09f
ASL
409
410}
This page took 0.049696 seconds and 5 git commands to generate.