Replace location by context in checkpoint indexer
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfDataProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.core.component;
14
15 import java.util.Vector;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.SynchronousQueue;
19
20 import org.eclipse.linuxtools.internal.tmf.core.Tracer;
21 import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager;
22 import org.eclipse.linuxtools.internal.tmf.core.component.TmfThread;
23 import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedDataRequest;
24 import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
25 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
26 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
27 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
28 import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
29 import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
30 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
31 import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
32 import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
33
34 /**
35 * An abstract base class that implements ITmfDataProvider.
36 * <p>
37 * This abstract class implements the housekeeping methods to register/
38 * de-register the event provider and to handle generically the event requests.
39 * <p>
40 * The concrete class can either re-implement processRequest() entirely or just
41 * implement the hooks (initializeContext() and getNext()).
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 *
45 * @version 1.0
46 * @author Francois Chouinard
47 */
48 public abstract class TmfDataProvider<T extends ITmfEvent> extends TmfComponent implements ITmfDataProvider<T> {
49
50 // ------------------------------------------------------------------------
51 // Constants
52 // ------------------------------------------------------------------------
53
54 public static final int DEFAULT_BLOCK_SIZE = 50000;
55 public static final int DEFAULT_QUEUE_SIZE = 1000;
56
57 // ------------------------------------------------------------------------
58 // Attributes
59 // ------------------------------------------------------------------------
60
61 protected Class<T> fType;
62 protected boolean fLogData;
63 protected boolean fLogError;
64
65 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
66 protected BlockingQueue<T> fDataQueue;
67 protected TmfRequestExecutor fExecutor;
68
69 private int fSignalDepth = 0;
70 private final Object fLock = new Object();
71
72 private int fRequestPendingCounter = 0;
73
74 // ------------------------------------------------------------------------
75 // Constructors
76 // ------------------------------------------------------------------------
77
78 public TmfDataProvider() {
79 super();
80 fQueueSize = DEFAULT_QUEUE_SIZE;
81 fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
82 fExecutor = new TmfRequestExecutor();
83 }
84
85 public void init(String name, Class<T> type) {
86 super.init(name);
87 fType = type;
88 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
89
90 fExecutor = new TmfRequestExecutor();
91 fSignalDepth = 0;
92
93 fLogData = Tracer.isEventTraced();
94 fLogError = Tracer.isErrorTraced();
95
96 TmfProviderManager.register(fType, this);
97 }
98
99 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
100 this();
101 fQueueSize = queueSize;
102 init(name, type);
103 }
104
105 public TmfDataProvider(TmfDataProvider<T> other) {
106 this();
107 init(other.getName(), other.fType);
108 }
109
110 public TmfDataProvider(String name, Class<T> type) {
111 this(name, type, DEFAULT_QUEUE_SIZE);
112 }
113
114 @Override
115 public void dispose() {
116 TmfProviderManager.deregister(fType, this);
117 fExecutor.stop();
118 super.dispose();
119 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
120 }
121
122 // ------------------------------------------------------------------------
123 // Accessors
124 // ------------------------------------------------------------------------
125
126 public int getQueueSize() {
127 return fQueueSize;
128 }
129
130 public Class<?> getType() {
131 return fType;
132 }
133
134 // ------------------------------------------------------------------------
135 // ITmfRequestHandler
136 // ------------------------------------------------------------------------
137
138 @Override
139 public void sendRequest(final ITmfDataRequest<T> request) {
140 synchronized (fLock) {
141 if (fSignalDepth > 0) {
142 coalesceDataRequest(request);
143 } else {
144 dispatchRequest(request);
145 }
146 }
147 }
148
149 /**
150 * This method queues the coalesced requests.
151 */
152 @Override
153 public void fireRequest() {
154 synchronized (fLock) {
155 if (fRequestPendingCounter > 0) {
156 return;
157 }
158 if (fPendingCoalescedRequests.size() > 0) {
159 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
160 dispatchRequest(request);
161 }
162 fPendingCoalescedRequests.clear();
163 }
164 }
165 }
166
167 /**
168 * Increments/decrements the pending requests counters and fires the request if necessary (counter == 0). Used for
169 * coalescing requests accross multiple TmfDataProvider.
170 *
171 * @param isIncrement
172 */
173 @Override
174 public void notifyPendingRequest(boolean isIncrement) {
175 synchronized (fLock) {
176 if (isIncrement) {
177 if (fSignalDepth > 0) {
178 fRequestPendingCounter++;
179 }
180 } else {
181 if (fRequestPendingCounter > 0) {
182 fRequestPendingCounter--;
183 }
184
185 // fire request if all pending requests are received
186 if (fRequestPendingCounter == 0) {
187 fireRequest();
188 }
189 }
190 }
191 }
192
193 // ------------------------------------------------------------------------
194 // Coalescing (primitive test...)
195 // ------------------------------------------------------------------------
196
197 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
198
199 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
200 synchronized (fLock) {
201 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(request.getDataType(), request.getIndex(),
202 request.getNbRequested(), request.getBlockSize(), request.getExecType());
203 coalescedRequest.addRequest(request);
204 if (Tracer.isRequestTraced()) {
205 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
206 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
207 }
208 fPendingCoalescedRequests.add(coalescedRequest);
209 }
210 }
211
212 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
213 synchronized (fLock) {
214 for (TmfCoalescedDataRequest<T> coalescedRequest : fPendingCoalescedRequests) {
215 if (coalescedRequest.isCompatible(request)) {
216 coalescedRequest.addRequest(request);
217 if (Tracer.isRequestTraced()) {
218 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
219 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
220 }
221 return;
222 }
223 }
224 newCoalescedDataRequest(request);
225 }
226 }
227
228 // ------------------------------------------------------------------------
229 // Request processing
230 // ------------------------------------------------------------------------
231
232 private void dispatchRequest(final ITmfDataRequest<T> request) {
233 if (request.getExecType() == ExecutionType.FOREGROUND)
234 queueRequest(request);
235 else
236 queueBackgroundRequest(request, request.getBlockSize(), true);
237 }
238
239 protected void queueRequest(final ITmfDataRequest<T> request) {
240
241 if (fExecutor.isShutdown()) {
242 request.cancel();
243 return;
244 }
245
246 final TmfDataProvider<T> provider = this;
247
248 // Process the request
249 TmfThread thread = new TmfThread(request.getExecType()) {
250
251 @Override
252 public void run() {
253
254 if (Tracer.isRequestTraced()) {
255 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
256 }
257
258 // Extract the generic information
259 request.start();
260 int nbRequested = request.getNbRequested();
261 int nbRead = 0;
262
263 // Initialize the execution
264 ITmfContext context = armRequest(request);
265 if (context == null) {
266 request.cancel();
267 return;
268 }
269
270 try {
271 // Get the ordered events
272 T data = getNext(context);
273 if (Tracer.isRequestTraced())
274 Tracer.traceRequest(request, "read first event"); //$NON-NLS-1$
275 while (data != null && !isCompleted(request, data, nbRead)) {
276 if (fLogData) {
277 Tracer.traceEvent(provider, request, data);
278 }
279 if (request.getDataType().isInstance(data)) {
280 request.handleData(data);
281 }
282
283 // To avoid an unnecessary read passed the last data
284 // requested
285 if (++nbRead < nbRequested) {
286 data = getNext(context);
287 }
288 }
289 if (Tracer.isRequestTraced())
290 Tracer.traceRequest(request, "COMPLETED"); //$NON-NLS-1$
291
292 if (request.isCancelled()) {
293 request.cancel();
294 } else {
295 request.done();
296 }
297 } catch (Exception e) {
298 request.fail();
299 }
300
301 // Cleanup
302 context.dispose();
303 }
304
305 @Override
306 public void cancel() {
307 if (!request.isCompleted()) {
308 request.cancel();
309 }
310 }
311 };
312
313 if (Tracer.isRequestTraced())
314 Tracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
315 fExecutor.execute(thread);
316
317 }
318
319 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean indexing) {
320
321 final TmfDataProvider<T> provider = this;
322
323 Thread thread = new Thread() {
324 @Override
325 public void run() {
326
327 if (Tracer.isRequestTraced()) {
328 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
329 }
330
331 request.start();
332
333 final Integer[] CHUNK_SIZE = new Integer[1];
334 CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
335
336 final Integer[] nbRead = new Integer[1];
337 nbRead[0] = 0;
338
339 final Boolean[] isFinished = new Boolean[1];
340 isFinished[0] = Boolean.FALSE;
341
342 while (!isFinished[0]) {
343
344 TmfDataRequest<T> subRequest = new TmfDataRequest<T>(request.getDataType(), request.getIndex()
345 + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) {
346 @Override
347 public void handleData(T data) {
348 super.handleData(data);
349 if (request.getDataType().isInstance(data)) {
350 request.handleData(data);
351 }
352 if (getNbRead() > CHUNK_SIZE[0]) {
353 System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
354 }
355 }
356
357 @Override
358 public void handleCompleted() {
359 nbRead[0] += getNbRead();
360 if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
361 if (this.isCancelled()) {
362 request.cancel();
363 } else if (this.isFailed()) {
364 request.fail();
365 } else {
366 request.done();
367 }
368 isFinished[0] = Boolean.TRUE;
369 }
370 super.handleCompleted();
371 }
372 };
373
374 if (!isFinished[0]) {
375 queueRequest(subRequest);
376
377 try {
378 subRequest.waitForCompletion();
379 } catch (InterruptedException e) {
380 e.printStackTrace();
381 }
382
383 CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
384 }
385 }
386 }
387 };
388
389 thread.start();
390 }
391
392 /**
393 * Initialize the provider based on the request. The context is provider
394 * specific and will be updated by getNext().
395 *
396 * @param request
397 * @return an application specific context; null if request can't be serviced
398 */
399 protected abstract ITmfContext armRequest(ITmfDataRequest<T> request);
400
401 // /**
402 // * Return the next event based on the context supplied. The context
403 // * will be updated for the subsequent read.
404 // *
405 // * @param context the trace read context (updated)
406 // * @return the event referred to by context
407 // */
408 // public abstract T getNext(ITmfContext context);
409
410 /**
411 * Checks if the data meets the request completion criteria.
412 *
413 * @param request the request
414 * @param data the data to verify
415 * @param nbRead the number of events read so far
416 * @return true if completion criteria is met
417 */
418 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
419 return request.isCompleted() || nbRead >= request.getNbRequested();
420 }
421
422 // ------------------------------------------------------------------------
423 // Signal handlers
424 // ------------------------------------------------------------------------
425
426 @TmfSignalHandler
427 public void startSynch(TmfStartSynchSignal signal) {
428 synchronized (fLock) {
429 fSignalDepth++;
430 }
431 }
432
433 @TmfSignalHandler
434 public void endSynch(TmfEndSynchSignal signal) {
435 synchronized (fLock) {
436 fSignalDepth--;
437 if (fSignalDepth == 0) {
438 fireRequest();
439 }
440 }
441 }
442
443 }
This page took 0.040244 seconds and 5 git commands to generate.