Use project-specific Save Actions settings
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
6c13869b 13package org.eclipse.linuxtools.tmf.core.component;
8c8bf09f 14
8c8bf09f
ASL
15import java.util.Vector;
16import java.util.concurrent.BlockingQueue;
17import java.util.concurrent.LinkedBlockingQueue;
9b635e61 18import java.util.concurrent.SynchronousQueue;
8c8bf09f 19
4918b8f2 20import org.eclipse.linuxtools.internal.tmf.core.Tracer;
8fd82db5
FC
21import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager;
22import org.eclipse.linuxtools.internal.tmf.core.component.TmfThread;
23import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedDataRequest;
24import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
72f1e62a 25import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
6c13869b 26import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
4c564a2d 27import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
6c13869b 28import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
6c13869b
FC
29import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
30import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
31import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
32import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
8c8bf09f
ASL
33
34/**
8fd82db5 35 * An abstract base class that implements ITmfDataProvider.
8c8bf09f 36 * <p>
8fd82db5
FC
37 * This abstract class implements the housekeeping methods to register/
38 * de-register the event provider and to handle generically the event requests.
8c8bf09f 39 * <p>
8fd82db5
FC
40 * The concrete class can either re-implement processRequest() entirely or just
41 * implement the hooks (initializeContext() and getNext()).
8c8bf09f
ASL
42 * <p>
43 * TODO: Add support for providing multiple data types.
8fd82db5
FC
44 *
45 * @version 1.0
46 * @author Francois Chouinard
8c8bf09f 47 */
72f1e62a 48public abstract class TmfDataProvider<T extends ITmfEvent> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f 49
948b0607
FC
50 // ------------------------------------------------------------------------
51 // Constants
52 // ------------------------------------------------------------------------
550d787e 53
063f0d27 54 /** Default amount of events per request "chunk" */
00641a97 55 public static final int DEFAULT_BLOCK_SIZE = 50000;
063f0d27
AM
56
57 /** Default size of the queue */
00641a97
FC
58 public static final int DEFAULT_QUEUE_SIZE = 1000;
59
948b0607 60 // ------------------------------------------------------------------------
12c155f5 61 // Attributes
948b0607 62 // ------------------------------------------------------------------------
8c8bf09f 63
12c155f5
FC
64 protected Class<T> fType;
65 protected boolean fLogData;
66 protected boolean fLogError;
f6b14ce2 67
00641a97 68 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
12c155f5
FC
69 protected BlockingQueue<T> fDataQueue;
70 protected TmfRequestExecutor fExecutor;
948b0607
FC
71
72 private int fSignalDepth = 0;
045df77d 73 private final Object fLock = new Object();
951d134a 74
c1c69938
FC
75 private int fRequestPendingCounter = 0;
76
948b0607
FC
77 // ------------------------------------------------------------------------
78 // Constructors
79 // ------------------------------------------------------------------------
80
063f0d27
AM
81 /**
82 * Default constructor
83 */
12c155f5 84 public TmfDataProvider() {
00641a97 85 super();
12c155f5
FC
86 fQueueSize = DEFAULT_QUEUE_SIZE;
87 fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
88 fExecutor = new TmfRequestExecutor();
89 }
90
063f0d27
AM
91 /**
92 * Initialize this data provider
93 *
94 * @param name
95 * Name of the provider
96 * @param type
97 * The type of events that will be handled
98 */
3791b5df 99 public void init(String name, Class<T> type) {
12c155f5 100 super.init(name);
3791b5df 101 fType = type;
12c155f5
FC
102 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
103
104 fExecutor = new TmfRequestExecutor();
105 fSignalDepth = 0;
106
107 fLogData = Tracer.isEventTraced();
108 fLogError = Tracer.isErrorTraced();
109
110 TmfProviderManager.register(fType, this);
111 }
112
/**
 * Constructor with an explicit queue size. Instantiate and initialize at
 * the same time.
 *
 * @param name
 *            Name of the provider
 * @param type
 *            The type of events that will be handled
 * @param queueSize
 *            Size of the data queue built by {@link #init(String, Class)}
 */
protected TmfDataProvider(String name, Class<T> type, int queueSize) {
    this();
    // Must be set before init(): init() builds the queue from fQueueSize
    this.fQueueSize = queueSize;
    this.init(name, type);
}
118
063f0d27
AM
119 /**
120 * Copy constructor
121 *
122 * @param other
123 * The other object to copy
124 */
948b0607 125 public TmfDataProvider(TmfDataProvider<T> other) {
00641a97
FC
126 this();
127 init(other.getName(), other.fType);
128 }
550d787e 129
063f0d27
AM
/**
 * Standard constructor. Instantiates and initializes the provider in one
 * step, using {@link #DEFAULT_QUEUE_SIZE} for the data queue.
 *
 * @param name
 *            Name of the provider
 * @param type
 *            The type of events that will be handled
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
141
142 @Override
143 public void dispose() {
144 TmfProviderManager.deregister(fType, this);
145 fExecutor.stop();
146 super.dispose();
12c155f5 147 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
948b0607
FC
148 }
149
00641a97
FC
150 // ------------------------------------------------------------------------
151 // Accessors
152 // ------------------------------------------------------------------------
153
063f0d27
AM
154 /**
155 * Get the queue size of this provider
156 *
157 * @return The size of the queue
158 */
948b0607
FC
159 public int getQueueSize() {
160 return fQueueSize;
161 }
162
063f0d27
AM
163 /**
164 * Get the event type this provider handles
165 *
166 * @return The type of ITmfEvent
167 */
948b0607
FC
168 public Class<?> getType() {
169 return fType;
170 }
171
172 // ------------------------------------------------------------------------
173 // ITmfRequestHandler
174 // ------------------------------------------------------------------------
175
176 @Override
177 public void sendRequest(final ITmfDataRequest<T> request) {
178 synchronized (fLock) {
179 if (fSignalDepth > 0) {
180 coalesceDataRequest(request);
181 } else {
182 dispatchRequest(request);
183 }
184 }
185 }
186
948b0607 187 @Override
c1c69938 188 public void fireRequest() {
948b0607
FC
189 synchronized (fLock) {
190 if (fRequestPendingCounter > 0) {
191 return;
192 }
193 if (fPendingCoalescedRequests.size() > 0) {
194 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
195 dispatchRequest(request);
196 }
197 fPendingCoalescedRequests.clear();
198 }
199 }
200 }
201
202 /**
063f0d27
AM
203 * Increments/decrements the pending requests counters and fires the request
204 * if necessary (counter == 0). Used for coalescing requests accross
205 * multiple TmfDataProvider.
206 *
948b0607
FC
207 * @param isIncrement
208 */
c1c69938
FC
209 @Override
210 public void notifyPendingRequest(boolean isIncrement) {
948b0607 211 synchronized (fLock) {
c1c69938
FC
212 if (isIncrement) {
213 if (fSignalDepth > 0) {
214 fRequestPendingCounter++;
215 }
216 } else {
217 if (fRequestPendingCounter > 0) {
218 fRequestPendingCounter--;
219 }
948b0607 220
c1c69938
FC
221 // fire request if all pending requests are received
222 if (fRequestPendingCounter == 0) {
223 fireRequest();
224 }
225 }
226 }
948b0607
FC
227 }
228
229 // ------------------------------------------------------------------------
230 // Coalescing (primitive test...)
231 // ------------------------------------------------------------------------
232
233 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
234
235 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
236 synchronized (fLock) {
1b70b6dc 237 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(request.getDataType(), request.getIndex(),
12c155f5 238 request.getNbRequested(), request.getBlockSize(), request.getExecType());
948b0607
FC
239 coalescedRequest.addRequest(request);
240 if (Tracer.isRequestTraced()) {
90891c08
FC
241 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
242 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
243 }
244 fPendingCoalescedRequests.add(coalescedRequest);
245 }
246 }
247
248 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
249 synchronized (fLock) {
250 for (TmfCoalescedDataRequest<T> coalescedRequest : fPendingCoalescedRequests) {
251 if (coalescedRequest.isCompatible(request)) {
252 coalescedRequest.addRequest(request);
253 if (Tracer.isRequestTraced()) {
90891c08
FC
254 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
255 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
256 }
257 return;
258 }
259 }
260 newCoalescedDataRequest(request);
261 }
262 }
263
264 // ------------------------------------------------------------------------
265 // Request processing
266 // ------------------------------------------------------------------------
267
268 private void dispatchRequest(final ITmfDataRequest<T> request) {
269 if (request.getExecType() == ExecutionType.FOREGROUND)
270 queueRequest(request);
271 else
272 queueBackgroundRequest(request, request.getBlockSize(), true);
273 }
274
275 protected void queueRequest(final ITmfDataRequest<T> request) {
276
277 if (fExecutor.isShutdown()) {
278 request.cancel();
279 return;
280 }
281
282 final TmfDataProvider<T> provider = this;
283
284 // Process the request
285 TmfThread thread = new TmfThread(request.getExecType()) {
475743b7 286
948b0607
FC
287 @Override
288 public void run() {
289
4cf201de 290 if (Tracer.isRequestTraced()) {
90891c08 291 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
4cf201de 292 }
948b0607
FC
293
294 // Extract the generic information
295 request.start();
296 int nbRequested = request.getNbRequested();
297 int nbRead = 0;
298
299 // Initialize the execution
300 ITmfContext context = armRequest(request);
301 if (context == null) {
302 request.cancel();
303 return;
304 }
305
306 try {
307 // Get the ordered events
308 T data = getNext(context);
309 if (Tracer.isRequestTraced())
90891c08 310 Tracer.traceRequest(request, "read first event"); //$NON-NLS-1$
948b0607 311 while (data != null && !isCompleted(request, data, nbRead)) {
408e65d2
FC
312 if (fLogData) {
313 Tracer.traceEvent(provider, request, data);
314 }
1b70b6dc
PT
315 if (request.getDataType().isInstance(data)) {
316 request.handleData(data);
317 }
948b0607
FC
318
319 // To avoid an unnecessary read passed the last data
320 // requested
321 if (++nbRead < nbRequested) {
322 data = getNext(context);
323 }
324 }
325 if (Tracer.isRequestTraced())
90891c08 326 Tracer.traceRequest(request, "COMPLETED"); //$NON-NLS-1$
948b0607
FC
327
328 if (request.isCancelled()) {
329 request.cancel();
330 } else {
331 request.done();
332 }
333 } catch (Exception e) {
334 request.fail();
335 }
336
337 // Cleanup
338 context.dispose();
339 }
475743b7
FC
340
341 @Override
342 public void cancel() {
8a0edc79
FC
343 if (!request.isCompleted()) {
344 request.cancel();
475743b7
FC
345 }
346 }
948b0607
FC
347 };
348
349 if (Tracer.isRequestTraced())
90891c08 350 Tracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
948b0607
FC
351 fExecutor.execute(thread);
352
353 }
354
355 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean indexing) {
356
4cf201de
FC
357 final TmfDataProvider<T> provider = this;
358
948b0607
FC
359 Thread thread = new Thread() {
360 @Override
361 public void run() {
4cf201de
FC
362
363 if (Tracer.isRequestTraced()) {
364 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
365 }
366
948b0607
FC
367 request.start();
368
369 final Integer[] CHUNK_SIZE = new Integer[1];
370 CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
371
372 final Integer[] nbRead = new Integer[1];
373 nbRead[0] = 0;
374
375 final Boolean[] isFinished = new Boolean[1];
376 isFinished[0] = Boolean.FALSE;
377
378 while (!isFinished[0]) {
379
12c155f5
FC
380 TmfDataRequest<T> subRequest = new TmfDataRequest<T>(request.getDataType(), request.getIndex()
381 + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) {
11a2fdf0
PT
382
383 @Override
384 public synchronized boolean isCompleted() {
385 return super.isCompleted() || request.isCompleted();
386 }
387
948b0607
FC
388 @Override
389 public void handleData(T data) {
390 super.handleData(data);
1b70b6dc
PT
391 if (request.getDataType().isInstance(data)) {
392 request.handleData(data);
393 }
948b0607
FC
394 if (getNbRead() > CHUNK_SIZE[0]) {
395 System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
396 }
397 }
398
399 @Override
400 public void handleCompleted() {
401 nbRead[0] += getNbRead();
402 if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
90de83da 403 if (this.isCancelled()) {
948b0607 404 request.cancel();
90de83da 405 } else if (this.isFailed()) {
12c155f5 406 request.fail();
948b0607
FC
407 } else {
408 request.done();
409 }
410 isFinished[0] = Boolean.TRUE;
411 }
412 super.handleCompleted();
413 }
414 };
415
416 if (!isFinished[0]) {
417 queueRequest(subRequest);
418
419 try {
420 subRequest.waitForCompletion();
11a2fdf0
PT
421 if (request.isCompleted()) {
422 isFinished[0] = Boolean.TRUE;
423 }
948b0607
FC
424 } catch (InterruptedException e) {
425 e.printStackTrace();
426 }
427
428 CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
429 }
430 }
431 }
432 };
433
434 thread.start();
435 }
436
437 /**
d337369a
FC
438 * Initialize the provider based on the request. The context is provider
439 * specific and will be updated by getNext().
948b0607
FC
440 *
441 * @param request
12c155f5 442 * @return an application specific context; null if request can't be serviced
948b0607 443 */
9e0640dc 444 protected abstract ITmfContext armRequest(ITmfDataRequest<T> request);
948b0607 445
c32744d6
FC
446// /**
447// * Return the next event based on the context supplied. The context
448// * will be updated for the subsequent read.
449// *
450// * @param context the trace read context (updated)
451// * @return the event referred to by context
452// */
453// public abstract T getNext(ITmfContext context);
948b0607
FC
454
455 /**
456 * Checks if the data meets the request completion criteria.
457 *
0d9a6d76
FC
458 * @param request the request
459 * @param data the data to verify
460 * @param nbRead the number of events read so far
461 * @return true if completion criteria is met
948b0607
FC
462 */
463 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
b6be1c3e 464 return request.isCompleted() || nbRead >= request.getNbRequested();
948b0607
FC
465 }
466
467 // ------------------------------------------------------------------------
468 // Signal handlers
469 // ------------------------------------------------------------------------
470
063f0d27
AM
471 /**
472 * Handler for the start synch signal
473 *
474 * @param signal
475 * Incoming signal
476 */
948b0607
FC
477 @TmfSignalHandler
478 public void startSynch(TmfStartSynchSignal signal) {
479 synchronized (fLock) {
480 fSignalDepth++;
481 }
482 }
483
063f0d27
AM
484 /**
485 * Handler for the end synch signal
486 *
487 * @param signal
488 * Incoming signal
489 */
948b0607
FC
490 @TmfSignalHandler
491 public void endSynch(TmfEndSynchSignal signal) {
045df77d 492 synchronized (fLock) {
948b0607
FC
493 fSignalDepth--;
494 if (fSignalDepth == 0) {
495 fireRequest();
496 }
045df77d 497 }
948b0607 498 }
8c8bf09f
ASL
499
500}
This page took 0.062541 seconds and 5 git commands to generate.