tmf: Generalization of the statistics view
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
0283f7ff 3 *
8c8bf09f
ASL
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
0283f7ff 8 *
8c8bf09f
ASL
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
6c13869b 13package org.eclipse.linuxtools.tmf.core.component;
8c8bf09f 14
8c8bf09f
ASL
15import java.util.Vector;
16import java.util.concurrent.BlockingQueue;
17import java.util.concurrent.LinkedBlockingQueue;
9b635e61 18import java.util.concurrent.SynchronousQueue;
8c8bf09f 19
4918b8f2 20import org.eclipse.linuxtools.internal.tmf.core.Tracer;
8fd82db5
FC
21import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager;
22import org.eclipse.linuxtools.internal.tmf.core.component.TmfThread;
23import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedDataRequest;
24import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
72f1e62a 25import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
6c13869b 26import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
4c564a2d 27import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
6c13869b 28import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
6c13869b
FC
29import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
30import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
31import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
32import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
8c8bf09f
ASL
33
34/**
8fd82db5 35 * An abstract base class that implements ITmfDataProvider.
8c8bf09f 36 * <p>
8fd82db5
FC
37 * This abstract class implements the housekeeping methods to register/
38 * de-register the event provider and to handle generically the event requests.
8c8bf09f 39 * <p>
8fd82db5
FC
40 * The concrete class can either re-implement processRequest() entirely or just
41 * implement the hooks (initializeContext() and getNext()).
8c8bf09f
ASL
42 * <p>
43 * TODO: Add support for providing multiple data types.
0283f7ff 44 *
8fd82db5
FC
45 * @version 1.0
46 * @author Francois Chouinard
8c8bf09f 47 */
6256d8ad 48public abstract class TmfDataProvider extends TmfComponent implements ITmfDataProvider {
8c8bf09f 49
948b0607
FC
50 // ------------------------------------------------------------------------
51 // Constants
52 // ------------------------------------------------------------------------
550d787e 53
063f0d27 54 /** Default amount of events per request "chunk" */
00641a97 55 public static final int DEFAULT_BLOCK_SIZE = 50000;
063f0d27
AM
56
57 /** Default size of the queue */
00641a97
FC
58 public static final int DEFAULT_QUEUE_SIZE = 1000;
59
948b0607 60 // ------------------------------------------------------------------------
12c155f5 61 // Attributes
948b0607 62 // ------------------------------------------------------------------------
8c8bf09f 63
6256d8ad 64 protected Class<? extends ITmfEvent> fType;
12c155f5
FC
65 protected boolean fLogData;
66 protected boolean fLogError;
f6b14ce2 67
00641a97 68 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
6256d8ad 69 protected BlockingQueue<ITmfEvent> fDataQueue;
12c155f5 70 protected TmfRequestExecutor fExecutor;
948b0607
FC
71
72 private int fSignalDepth = 0;
045df77d 73 private final Object fLock = new Object();
951d134a 74
c1c69938
FC
75 private int fRequestPendingCounter = 0;
76
948b0607
FC
77 // ------------------------------------------------------------------------
78 // Constructors
79 // ------------------------------------------------------------------------
80
063f0d27
AM
81 /**
82 * Default constructor
83 */
12c155f5 84 public TmfDataProvider() {
00641a97 85 super();
12c155f5 86 fQueueSize = DEFAULT_QUEUE_SIZE;
6256d8ad 87 fDataQueue = new LinkedBlockingQueue<ITmfEvent>(fQueueSize);
12c155f5
FC
88 fExecutor = new TmfRequestExecutor();
89 }
90
063f0d27
AM
91 /**
92 * Initialize this data provider
93 *
94 * @param name
95 * Name of the provider
96 * @param type
97 * The type of events that will be handled
98 */
6256d8ad 99 public void init(String name, Class<? extends ITmfEvent> type) {
12c155f5 100 super.init(name);
3791b5df 101 fType = type;
6256d8ad 102 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<ITmfEvent>(fQueueSize) : new SynchronousQueue<ITmfEvent>();
12c155f5
FC
103
104 fExecutor = new TmfRequestExecutor();
105 fSignalDepth = 0;
106
107 fLogData = Tracer.isEventTraced();
108 fLogError = Tracer.isErrorTraced();
109
110 TmfProviderManager.register(fType, this);
111 }
112
6256d8ad 113 protected TmfDataProvider(String name, Class<? extends ITmfEvent> type, int queueSize) {
00641a97 114 this();
948b0607 115 fQueueSize = queueSize;
00641a97 116 init(name, type);
948b0607
FC
117 }
118
063f0d27
AM
119 /**
120 * Copy constructor
121 *
122 * @param other
123 * The other object to copy
124 */
6256d8ad 125 public TmfDataProvider(TmfDataProvider other) {
00641a97
FC
126 this();
127 init(other.getName(), other.fType);
128 }
550d787e 129
063f0d27
AM
130 /**
131 * Standard constructor. Instantiate and initialize at the same time.
132 *
133 * @param name
134 * Name of the provider
135 * @param type
136 * The type of events that will be handled
137 */
6256d8ad 138 public TmfDataProvider(String name, Class<? extends ITmfEvent> type) {
00641a97 139 this(name, type, DEFAULT_QUEUE_SIZE);
948b0607
FC
140 }
141
142 @Override
143 public void dispose() {
144 TmfProviderManager.deregister(fType, this);
145 fExecutor.stop();
146 super.dispose();
12c155f5 147 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
948b0607
FC
148 }
149
00641a97
FC
150 // ------------------------------------------------------------------------
151 // Accessors
152 // ------------------------------------------------------------------------
153
063f0d27
AM
154 /**
155 * Get the queue size of this provider
156 *
157 * @return The size of the queue
158 */
948b0607
FC
159 public int getQueueSize() {
160 return fQueueSize;
161 }
162
063f0d27
AM
163 /**
164 * Get the event type this provider handles
165 *
166 * @return The type of ITmfEvent
167 */
948b0607
FC
168 public Class<?> getType() {
169 return fType;
170 }
171
172 // ------------------------------------------------------------------------
173 // ITmfRequestHandler
174 // ------------------------------------------------------------------------
175
176 @Override
6256d8ad 177 public void sendRequest(final ITmfDataRequest request) {
948b0607
FC
178 synchronized (fLock) {
179 if (fSignalDepth > 0) {
180 coalesceDataRequest(request);
181 } else {
182 dispatchRequest(request);
183 }
184 }
185 }
186
948b0607 187 @Override
c1c69938 188 public void fireRequest() {
948b0607
FC
189 synchronized (fLock) {
190 if (fRequestPendingCounter > 0) {
191 return;
192 }
193 if (fPendingCoalescedRequests.size() > 0) {
6256d8ad 194 for (TmfDataRequest request : fPendingCoalescedRequests) {
948b0607
FC
195 dispatchRequest(request);
196 }
197 fPendingCoalescedRequests.clear();
198 }
199 }
200 }
201
202 /**
063f0d27 203 * Increments/decrements the pending requests counters and fires the request
0283f7ff
FC
204 * if necessary (counter == 0). Used for coalescing requests across multiple
205 * TmfDataProvider's.
063f0d27 206 *
948b0607 207 * @param isIncrement
0283f7ff
FC
208 * Should we increment (true) or decrement (false) the pending
209 * counter
948b0607 210 */
c1c69938
FC
211 @Override
212 public void notifyPendingRequest(boolean isIncrement) {
948b0607 213 synchronized (fLock) {
c1c69938
FC
214 if (isIncrement) {
215 if (fSignalDepth > 0) {
216 fRequestPendingCounter++;
217 }
218 } else {
219 if (fRequestPendingCounter > 0) {
220 fRequestPendingCounter--;
221 }
948b0607 222
c1c69938
FC
223 // fire request if all pending requests are received
224 if (fRequestPendingCounter == 0) {
225 fireRequest();
226 }
227 }
228 }
948b0607
FC
229 }
230
231 // ------------------------------------------------------------------------
232 // Coalescing (primitive test...)
233 // ------------------------------------------------------------------------
234
6256d8ad 235 protected Vector<TmfCoalescedDataRequest> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest>();
948b0607 236
6256d8ad 237 protected void newCoalescedDataRequest(ITmfDataRequest request) {
948b0607 238 synchronized (fLock) {
6256d8ad 239 TmfCoalescedDataRequest coalescedRequest = new TmfCoalescedDataRequest(request.getDataType(), request.getIndex(),
12c155f5 240 request.getNbRequested(), request.getBlockSize(), request.getExecType());
948b0607
FC
241 coalescedRequest.addRequest(request);
242 if (Tracer.isRequestTraced()) {
90891c08
FC
243 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
244 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
245 }
246 fPendingCoalescedRequests.add(coalescedRequest);
247 }
248 }
249
6256d8ad 250 protected void coalesceDataRequest(ITmfDataRequest request) {
948b0607 251 synchronized (fLock) {
6256d8ad 252 for (TmfCoalescedDataRequest coalescedRequest : fPendingCoalescedRequests) {
948b0607
FC
253 if (coalescedRequest.isCompatible(request)) {
254 coalescedRequest.addRequest(request);
255 if (Tracer.isRequestTraced()) {
90891c08
FC
256 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
257 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
258 }
259 return;
260 }
261 }
262 newCoalescedDataRequest(request);
263 }
264 }
265
266 // ------------------------------------------------------------------------
267 // Request processing
268 // ------------------------------------------------------------------------
269
6256d8ad 270 private void dispatchRequest(final ITmfDataRequest request) {
0283f7ff 271 if (request.getExecType() == ExecutionType.FOREGROUND) {
948b0607 272 queueRequest(request);
0283f7ff 273 } else {
948b0607 274 queueBackgroundRequest(request, request.getBlockSize(), true);
0283f7ff 275 }
948b0607
FC
276 }
277
6256d8ad 278 protected void queueRequest(final ITmfDataRequest request) {
948b0607
FC
279
280 if (fExecutor.isShutdown()) {
281 request.cancel();
282 return;
283 }
284
6256d8ad 285 final TmfDataProvider provider = this;
948b0607
FC
286
287 // Process the request
288 TmfThread thread = new TmfThread(request.getExecType()) {
0283f7ff 289
948b0607
FC
290 @Override
291 public void run() {
292
4cf201de 293 if (Tracer.isRequestTraced()) {
90891c08 294 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
4cf201de 295 }
948b0607
FC
296
297 // Extract the generic information
298 request.start();
299 int nbRequested = request.getNbRequested();
300 int nbRead = 0;
301
302 // Initialize the execution
303 ITmfContext context = armRequest(request);
304 if (context == null) {
305 request.cancel();
306 return;
307 }
308
309 try {
310 // Get the ordered events
6256d8ad
AM
311 ITmfEvent data = getNext(context);
312 if (Tracer.isRequestTraced()) {
90891c08 313 Tracer.traceRequest(request, "read first event"); //$NON-NLS-1$
0283f7ff 314 }
948b0607 315 while (data != null && !isCompleted(request, data, nbRead)) {
0283f7ff 316 if (fLogData) {
408e65d2
FC
317 Tracer.traceEvent(provider, request, data);
318 }
1b70b6dc
PT
319 if (request.getDataType().isInstance(data)) {
320 request.handleData(data);
321 }
948b0607
FC
322
323 // To avoid an unnecessary read passed the last data
324 // requested
325 if (++nbRead < nbRequested) {
326 data = getNext(context);
327 }
328 }
6256d8ad 329 if (Tracer.isRequestTraced()) {
90891c08 330 Tracer.traceRequest(request, "COMPLETED"); //$NON-NLS-1$
0283f7ff 331 }
948b0607
FC
332
333 if (request.isCancelled()) {
334 request.cancel();
335 } else {
336 request.done();
337 }
338 } catch (Exception e) {
339 request.fail();
340 }
341
342 // Cleanup
343 context.dispose();
344 }
475743b7
FC
345
346 @Override
347 public void cancel() {
8a0edc79
FC
348 if (!request.isCompleted()) {
349 request.cancel();
475743b7
FC
350 }
351 }
948b0607
FC
352 };
353
6256d8ad 354 if (Tracer.isRequestTraced()) {
90891c08 355 Tracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
0283f7ff 356 }
948b0607
FC
357 fExecutor.execute(thread);
358
359 }
360
6256d8ad 361 protected void queueBackgroundRequest(final ITmfDataRequest request, final int blockSize, final boolean indexing) {
948b0607 362
6256d8ad 363 final TmfDataProvider provider = this;
4cf201de 364
948b0607
FC
365 Thread thread = new Thread() {
366 @Override
367 public void run() {
4cf201de
FC
368
369 if (Tracer.isRequestTraced()) {
370 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
371 }
372
948b0607
FC
373 request.start();
374
375 final Integer[] CHUNK_SIZE = new Integer[1];
376 CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
377
378 final Integer[] nbRead = new Integer[1];
379 nbRead[0] = 0;
380
381 final Boolean[] isFinished = new Boolean[1];
382 isFinished[0] = Boolean.FALSE;
383
384 while (!isFinished[0]) {
385
6256d8ad 386 TmfDataRequest subRequest = new TmfDataRequest(request.getDataType(), request.getIndex()
12c155f5 387 + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) {
11a2fdf0
PT
388
389 @Override
390 public synchronized boolean isCompleted() {
391 return super.isCompleted() || request.isCompleted();
392 }
393
948b0607 394 @Override
6256d8ad 395 public void handleData(ITmfEvent data) {
948b0607 396 super.handleData(data);
1b70b6dc
PT
397 if (request.getDataType().isInstance(data)) {
398 request.handleData(data);
399 }
948b0607
FC
400 if (getNbRead() > CHUNK_SIZE[0]) {
401 System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
402 }
403 }
404
405 @Override
406 public void handleCompleted() {
407 nbRead[0] += getNbRead();
408 if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
90de83da 409 if (this.isCancelled()) {
948b0607 410 request.cancel();
90de83da 411 } else if (this.isFailed()) {
12c155f5 412 request.fail();
948b0607
FC
413 } else {
414 request.done();
415 }
416 isFinished[0] = Boolean.TRUE;
417 }
418 super.handleCompleted();
419 }
420 };
421
422 if (!isFinished[0]) {
423 queueRequest(subRequest);
424
425 try {
426 subRequest.waitForCompletion();
11a2fdf0
PT
427 if (request.isCompleted()) {
428 isFinished[0] = Boolean.TRUE;
429 }
948b0607
FC
430 } catch (InterruptedException e) {
431 e.printStackTrace();
432 }
433
434 CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
435 }
436 }
437 }
438 };
439
440 thread.start();
441 }
442
443 /**
d337369a
FC
444 * Initialize the provider based on the request. The context is provider
445 * specific and will be updated by getNext().
0283f7ff 446 *
948b0607 447 * @param request
12c155f5 448 * @return an application specific context; null if request can't be serviced
948b0607 449 */
6256d8ad 450 protected abstract ITmfContext armRequest(ITmfDataRequest request);
948b0607 451
c32744d6
FC
452// /**
453// * Return the next event based on the context supplied. The context
454// * will be updated for the subsequent read.
0283f7ff 455// *
c32744d6
FC
456// * @param context the trace read context (updated)
457// * @return the event referred to by context
458// */
459// public abstract T getNext(ITmfContext context);
948b0607
FC
460
461 /**
462 * Checks if the data meets the request completion criteria.
0283f7ff 463 *
0d9a6d76
FC
464 * @param request the request
465 * @param data the data to verify
466 * @param nbRead the number of events read so far
467 * @return true if completion criteria is met
948b0607 468 */
6256d8ad 469 public boolean isCompleted(ITmfDataRequest request, ITmfEvent data, int nbRead) {
b6be1c3e 470 return request.isCompleted() || nbRead >= request.getNbRequested();
948b0607
FC
471 }
472
473 // ------------------------------------------------------------------------
474 // Signal handlers
475 // ------------------------------------------------------------------------
476
063f0d27
AM
477 /**
478 * Handler for the start synch signal
479 *
480 * @param signal
481 * Incoming signal
482 */
948b0607
FC
483 @TmfSignalHandler
484 public void startSynch(TmfStartSynchSignal signal) {
485 synchronized (fLock) {
486 fSignalDepth++;
487 }
488 }
489
063f0d27
AM
490 /**
491 * Handler for the end synch signal
492 *
493 * @param signal
494 * Incoming signal
495 */
948b0607
FC
496 @TmfSignalHandler
497 public void endSynch(TmfEndSynchSignal signal) {
045df77d 498 synchronized (fLock) {
948b0607
FC
499 fSignalDepth--;
500 if (fSignalDepth == 0) {
501 fireRequest();
502 }
045df77d 503 }
948b0607 504 }
8c8bf09f
ASL
505
506}
This page took 0.068499 seconds and 5 git commands to generate.