Replace location by context in checkpoint indexer
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
6c13869b 13package org.eclipse.linuxtools.tmf.core.component;
8c8bf09f 14
8c8bf09f
ASL
15import java.util.Vector;
16import java.util.concurrent.BlockingQueue;
17import java.util.concurrent.LinkedBlockingQueue;
9b635e61 18import java.util.concurrent.SynchronousQueue;
8c8bf09f 19
4918b8f2 20import org.eclipse.linuxtools.internal.tmf.core.Tracer;
8fd82db5
FC
21import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager;
22import org.eclipse.linuxtools.internal.tmf.core.component.TmfThread;
23import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedDataRequest;
24import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
72f1e62a 25import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
6c13869b 26import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
4c564a2d 27import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
6c13869b 28import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
6c13869b
FC
29import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
30import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
31import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
32import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
8c8bf09f
ASL
33
34/**
8fd82db5 35 * An abstract base class that implements ITmfDataProvider.
8c8bf09f 36 * <p>
8fd82db5
FC
37 * This abstract class implements the housekeeping methods to register/
38 * de-register the event provider and to handle generically the event requests.
8c8bf09f 39 * <p>
8fd82db5
FC
40 * The concrete class can either re-implement processRequest() entirely or just
41 * implement the hooks (initializeContext() and getNext()).
8c8bf09f
ASL
42 * <p>
43 * TODO: Add support for providing multiple data types.
8fd82db5
FC
44 *
45 * @version 1.0
46 * @author Francois Chouinard
8c8bf09f 47 */
72f1e62a 48public abstract class TmfDataProvider<T extends ITmfEvent> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f 49
948b0607
FC
50 // ------------------------------------------------------------------------
51 // Constants
52 // ------------------------------------------------------------------------
550d787e 53
00641a97
FC
54 public static final int DEFAULT_BLOCK_SIZE = 50000;
55 public static final int DEFAULT_QUEUE_SIZE = 1000;
56
948b0607 57 // ------------------------------------------------------------------------
12c155f5 58 // Attributes
948b0607 59 // ------------------------------------------------------------------------
8c8bf09f 60
12c155f5
FC
61 protected Class<T> fType;
62 protected boolean fLogData;
63 protected boolean fLogError;
f6b14ce2 64
00641a97 65 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
12c155f5
FC
66 protected BlockingQueue<T> fDataQueue;
67 protected TmfRequestExecutor fExecutor;
948b0607
FC
68
69 private int fSignalDepth = 0;
045df77d 70 private final Object fLock = new Object();
951d134a 71
c1c69938
FC
72 private int fRequestPendingCounter = 0;
73
948b0607
FC
74 // ------------------------------------------------------------------------
75 // Constructors
76 // ------------------------------------------------------------------------
77
/**
 * Default constructor. Sets the queue size to its default value and
 * creates a bounded data queue of that size and a request executor.
 * The provider is not registered until {@code init()} is called.
 */
public TmfDataProvider() {
    super();
    this.fQueueSize = DEFAULT_QUEUE_SIZE;
    this.fDataQueue = new LinkedBlockingQueue<T>(this.fQueueSize);
    this.fExecutor = new TmfRequestExecutor();
}
84
3791b5df 85 public void init(String name, Class<T> type) {
12c155f5 86 super.init(name);
3791b5df 87 fType = type;
12c155f5
FC
88 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
89
90 fExecutor = new TmfRequestExecutor();
91 fSignalDepth = 0;
92
93 fLogData = Tracer.isEventTraced();
94 fLogError = Tracer.isErrorTraced();
95
96 TmfProviderManager.register(fType, this);
97 }
98
/**
 * Constructor with an explicit queue size.
 *
 * @param name the name of the provider
 * @param type the type of data handled by the provider
 * @param queueSize the size of the data queue; it is set before
 *            {@code init()} runs so that the queue is created with it
 */
protected TmfDataProvider(String name, Class<T> type, int queueSize) {
    this();
    this.fQueueSize = queueSize;
    this.init(name, type);
}
104
105 public TmfDataProvider(TmfDataProvider<T> other) {
00641a97
FC
106 this();
107 init(other.getName(), other.fType);
108 }
550d787e 109
00641a97
FC
/**
 * Standard constructor. Delegates to the three-argument constructor with
 * the default queue size.
 *
 * @param name the name of the provider
 * @param type the type of data handled by the provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
113
114 @Override
115 public void dispose() {
116 TmfProviderManager.deregister(fType, this);
117 fExecutor.stop();
118 super.dispose();
12c155f5 119 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
948b0607
FC
120 }
121
00641a97
FC
122 // ------------------------------------------------------------------------
123 // Accessors
124 // ------------------------------------------------------------------------
125
948b0607
FC
126 public int getQueueSize() {
127 return fQueueSize;
128 }
129
130 public Class<?> getType() {
131 return fType;
132 }
133
134 // ------------------------------------------------------------------------
135 // ITmfRequestHandler
136 // ------------------------------------------------------------------------
137
138 @Override
139 public void sendRequest(final ITmfDataRequest<T> request) {
140 synchronized (fLock) {
141 if (fSignalDepth > 0) {
142 coalesceDataRequest(request);
143 } else {
144 dispatchRequest(request);
145 }
146 }
147 }
148
149 /**
150 * This method queues the coalesced requests.
948b0607
FC
151 */
152 @Override
c1c69938 153 public void fireRequest() {
948b0607
FC
154 synchronized (fLock) {
155 if (fRequestPendingCounter > 0) {
156 return;
157 }
158 if (fPendingCoalescedRequests.size() > 0) {
159 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
160 dispatchRequest(request);
161 }
162 fPendingCoalescedRequests.clear();
163 }
164 }
165 }
166
167 /**
12c155f5
FC
168 * Increments/decrements the pending requests counters and fires the request if necessary (counter == 0). Used for
169 * coalescing requests accross multiple TmfDataProvider.
948b0607
FC
170 *
171 * @param isIncrement
172 */
c1c69938
FC
173 @Override
174 public void notifyPendingRequest(boolean isIncrement) {
948b0607 175 synchronized (fLock) {
c1c69938
FC
176 if (isIncrement) {
177 if (fSignalDepth > 0) {
178 fRequestPendingCounter++;
179 }
180 } else {
181 if (fRequestPendingCounter > 0) {
182 fRequestPendingCounter--;
183 }
948b0607 184
c1c69938
FC
185 // fire request if all pending requests are received
186 if (fRequestPendingCounter == 0) {
187 fireRequest();
188 }
189 }
190 }
948b0607
FC
191 }
192
193 // ------------------------------------------------------------------------
194 // Coalescing (primitive test...)
195 // ------------------------------------------------------------------------
196
197 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
198
199 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
200 synchronized (fLock) {
1b70b6dc 201 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(request.getDataType(), request.getIndex(),
12c155f5 202 request.getNbRequested(), request.getBlockSize(), request.getExecType());
948b0607
FC
203 coalescedRequest.addRequest(request);
204 if (Tracer.isRequestTraced()) {
90891c08
FC
205 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
206 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
207 }
208 fPendingCoalescedRequests.add(coalescedRequest);
209 }
210 }
211
212 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
213 synchronized (fLock) {
214 for (TmfCoalescedDataRequest<T> coalescedRequest : fPendingCoalescedRequests) {
215 if (coalescedRequest.isCompatible(request)) {
216 coalescedRequest.addRequest(request);
217 if (Tracer.isRequestTraced()) {
90891c08
FC
218 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
219 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
220 }
221 return;
222 }
223 }
224 newCoalescedDataRequest(request);
225 }
226 }
227
228 // ------------------------------------------------------------------------
229 // Request processing
230 // ------------------------------------------------------------------------
231
232 private void dispatchRequest(final ITmfDataRequest<T> request) {
233 if (request.getExecType() == ExecutionType.FOREGROUND)
234 queueRequest(request);
235 else
236 queueBackgroundRequest(request, request.getBlockSize(), true);
237 }
238
239 protected void queueRequest(final ITmfDataRequest<T> request) {
240
241 if (fExecutor.isShutdown()) {
242 request.cancel();
243 return;
244 }
245
246 final TmfDataProvider<T> provider = this;
247
248 // Process the request
249 TmfThread thread = new TmfThread(request.getExecType()) {
475743b7 250
948b0607
FC
251 @Override
252 public void run() {
253
4cf201de 254 if (Tracer.isRequestTraced()) {
90891c08 255 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
4cf201de 256 }
948b0607
FC
257
258 // Extract the generic information
259 request.start();
260 int nbRequested = request.getNbRequested();
261 int nbRead = 0;
262
263 // Initialize the execution
264 ITmfContext context = armRequest(request);
265 if (context == null) {
266 request.cancel();
267 return;
268 }
269
270 try {
271 // Get the ordered events
272 T data = getNext(context);
273 if (Tracer.isRequestTraced())
90891c08 274 Tracer.traceRequest(request, "read first event"); //$NON-NLS-1$
948b0607 275 while (data != null && !isCompleted(request, data, nbRead)) {
408e65d2
FC
276 if (fLogData) {
277 Tracer.traceEvent(provider, request, data);
278 }
1b70b6dc
PT
279 if (request.getDataType().isInstance(data)) {
280 request.handleData(data);
281 }
948b0607
FC
282
283 // To avoid an unnecessary read passed the last data
284 // requested
285 if (++nbRead < nbRequested) {
286 data = getNext(context);
287 }
288 }
289 if (Tracer.isRequestTraced())
90891c08 290 Tracer.traceRequest(request, "COMPLETED"); //$NON-NLS-1$
948b0607
FC
291
292 if (request.isCancelled()) {
293 request.cancel();
294 } else {
295 request.done();
296 }
297 } catch (Exception e) {
298 request.fail();
299 }
300
301 // Cleanup
302 context.dispose();
303 }
475743b7
FC
304
305 @Override
306 public void cancel() {
8a0edc79
FC
307 if (!request.isCompleted()) {
308 request.cancel();
475743b7
FC
309 }
310 }
948b0607
FC
311 };
312
313 if (Tracer.isRequestTraced())
90891c08 314 Tracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
948b0607
FC
315 fExecutor.execute(thread);
316
317 }
318
319 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean indexing) {
320
4cf201de
FC
321 final TmfDataProvider<T> provider = this;
322
948b0607
FC
323 Thread thread = new Thread() {
324 @Override
325 public void run() {
4cf201de
FC
326
327 if (Tracer.isRequestTraced()) {
328 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
329 }
330
948b0607
FC
331 request.start();
332
333 final Integer[] CHUNK_SIZE = new Integer[1];
334 CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
335
336 final Integer[] nbRead = new Integer[1];
337 nbRead[0] = 0;
338
339 final Boolean[] isFinished = new Boolean[1];
340 isFinished[0] = Boolean.FALSE;
341
342 while (!isFinished[0]) {
343
12c155f5
FC
344 TmfDataRequest<T> subRequest = new TmfDataRequest<T>(request.getDataType(), request.getIndex()
345 + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) {
948b0607
FC
346 @Override
347 public void handleData(T data) {
348 super.handleData(data);
1b70b6dc
PT
349 if (request.getDataType().isInstance(data)) {
350 request.handleData(data);
351 }
948b0607
FC
352 if (getNbRead() > CHUNK_SIZE[0]) {
353 System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
354 }
355 }
356
357 @Override
358 public void handleCompleted() {
359 nbRead[0] += getNbRead();
360 if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
90de83da 361 if (this.isCancelled()) {
948b0607 362 request.cancel();
90de83da 363 } else if (this.isFailed()) {
12c155f5 364 request.fail();
948b0607
FC
365 } else {
366 request.done();
367 }
368 isFinished[0] = Boolean.TRUE;
369 }
370 super.handleCompleted();
371 }
372 };
373
374 if (!isFinished[0]) {
375 queueRequest(subRequest);
376
377 try {
378 subRequest.waitForCompletion();
379 } catch (InterruptedException e) {
380 e.printStackTrace();
381 }
382
383 CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
384 }
385 }
386 }
387 };
388
389 thread.start();
390 }
391
392 /**
d337369a
FC
393 * Initialize the provider based on the request. The context is provider
394 * specific and will be updated by getNext().
948b0607
FC
395 *
396 * @param request
12c155f5 397 * @return an application specific context; null if request can't be serviced
948b0607 398 */
9e0640dc 399 protected abstract ITmfContext armRequest(ITmfDataRequest<T> request);
948b0607 400
c32744d6
FC
401// /**
402// * Return the next event based on the context supplied. The context
403// * will be updated for the subsequent read.
404// *
405// * @param context the trace read context (updated)
406// * @return the event referred to by context
407// */
408// public abstract T getNext(ITmfContext context);
948b0607
FC
409
410 /**
411 * Checks if the data meets the request completion criteria.
412 *
0d9a6d76
FC
413 * @param request the request
414 * @param data the data to verify
415 * @param nbRead the number of events read so far
416 * @return true if completion criteria is met
948b0607
FC
417 */
418 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
b6be1c3e 419 return request.isCompleted() || nbRead >= request.getNbRequested();
948b0607
FC
420 }
421
422 // ------------------------------------------------------------------------
423 // Signal handlers
424 // ------------------------------------------------------------------------
425
426 @TmfSignalHandler
427 public void startSynch(TmfStartSynchSignal signal) {
428 synchronized (fLock) {
429 fSignalDepth++;
430 }
431 }
432
433 @TmfSignalHandler
434 public void endSynch(TmfEndSynchSignal signal) {
045df77d 435 synchronized (fLock) {
948b0607
FC
436 fSignalDepth--;
437 if (fSignalDepth == 0) {
438 fireRequest();
439 }
045df77d 440 }
948b0607 441 }
8c8bf09f
ASL
442
443}
This page took 0.062173 seconds and 5 git commands to generate.