Internalize some TMF APIs
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
6c13869b 13package org.eclipse.linuxtools.tmf.core.component;
8c8bf09f 14
8c8bf09f
ASL
15import java.util.Vector;
16import java.util.concurrent.BlockingQueue;
17import java.util.concurrent.LinkedBlockingQueue;
9b635e61 18import java.util.concurrent.SynchronousQueue;
8c8bf09f 19
4918b8f2 20import org.eclipse.linuxtools.internal.tmf.core.Tracer;
72f1e62a 21import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
6c13869b 22import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
4c564a2d 23import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
6c13869b
FC
24import org.eclipse.linuxtools.tmf.core.request.TmfCoalescedDataRequest;
25import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
26import org.eclipse.linuxtools.tmf.core.request.TmfRequestExecutor;
6c13869b
FC
27import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
28import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
29import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
30import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
8c8bf09f
ASL
31
32/**
33 * <b><u>TmfProvider</u></b>
34 * <p>
35 * The TmfProvider<T> is a provider for a data of type <T>.
36 * <p>
12c155f5
FC
37 * This abstract class implements the housekeeking methods to register/ deregister the event provider and to handle
38 * generically the event requests.
8c8bf09f 39 * <p>
12c155f5
FC
40 * The concrete class can either re-implement processRequest() entirely or just implement the hooks (initializeContext()
41 * and getNext()).
8c8bf09f
ASL
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 */
72f1e62a 45public abstract class TmfDataProvider<T extends ITmfEvent> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f 46
948b0607
FC
47 // ------------------------------------------------------------------------
48 // Constants
49 // ------------------------------------------------------------------------
550d787e 50
00641a97
FC
51 public static final int DEFAULT_BLOCK_SIZE = 50000;
52 public static final int DEFAULT_QUEUE_SIZE = 1000;
53
948b0607 54 // ------------------------------------------------------------------------
12c155f5 55 // Attributes
948b0607 56 // ------------------------------------------------------------------------
8c8bf09f 57
12c155f5
FC
58 protected Class<T> fType;
59 protected boolean fLogData;
60 protected boolean fLogError;
f6b14ce2 61
00641a97 62 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
12c155f5
FC
63 protected BlockingQueue<T> fDataQueue;
64 protected TmfRequestExecutor fExecutor;
948b0607
FC
65
66 private int fSignalDepth = 0;
045df77d 67 private final Object fLock = new Object();
951d134a 68
c1c69938
FC
69 private int fRequestPendingCounter = 0;
70
948b0607
FC
71 // ------------------------------------------------------------------------
72 // Constructors
73 // ------------------------------------------------------------------------
74
12c155f5 75 public TmfDataProvider() {
00641a97 76 super();
12c155f5
FC
77 fQueueSize = DEFAULT_QUEUE_SIZE;
78 fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
79 fExecutor = new TmfRequestExecutor();
80 }
81
82 public void init(String name, Class<T> dataType) {
83 super.init(name);
84 fType = dataType;
12c155f5
FC
85 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
86
87 fExecutor = new TmfRequestExecutor();
88 fSignalDepth = 0;
89
90 fLogData = Tracer.isEventTraced();
91 fLogError = Tracer.isErrorTraced();
92
93 TmfProviderManager.register(fType, this);
94 }
95
948b0607 96 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
00641a97 97 this();
948b0607 98 fQueueSize = queueSize;
00641a97 99 init(name, type);
948b0607
FC
100 }
101
102 public TmfDataProvider(TmfDataProvider<T> other) {
00641a97
FC
103 this();
104 init(other.getName(), other.fType);
105 }
550d787e 106
00641a97
FC
107 public TmfDataProvider(String name, Class<T> type) {
108 this(name, type, DEFAULT_QUEUE_SIZE);
948b0607
FC
109 }
110
111 @Override
112 public void dispose() {
113 TmfProviderManager.deregister(fType, this);
114 fExecutor.stop();
115 super.dispose();
12c155f5 116 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
948b0607
FC
117 }
118
00641a97
FC
119 // ------------------------------------------------------------------------
120 // Accessors
121 // ------------------------------------------------------------------------
122
948b0607
FC
123 public int getQueueSize() {
124 return fQueueSize;
125 }
126
127 public Class<?> getType() {
128 return fType;
129 }
130
131 // ------------------------------------------------------------------------
132 // ITmfRequestHandler
133 // ------------------------------------------------------------------------
134
135 @Override
136 public void sendRequest(final ITmfDataRequest<T> request) {
137 synchronized (fLock) {
138 if (fSignalDepth > 0) {
139 coalesceDataRequest(request);
140 } else {
141 dispatchRequest(request);
142 }
143 }
144 }
145
146 /**
147 * This method queues the coalesced requests.
948b0607
FC
148 */
149 @Override
c1c69938 150 public void fireRequest() {
948b0607
FC
151 synchronized (fLock) {
152 if (fRequestPendingCounter > 0) {
153 return;
154 }
155 if (fPendingCoalescedRequests.size() > 0) {
156 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
157 dispatchRequest(request);
158 }
159 fPendingCoalescedRequests.clear();
160 }
161 }
162 }
163
164 /**
12c155f5
FC
165 * Increments/decrements the pending requests counters and fires the request if necessary (counter == 0). Used for
166 * coalescing requests accross multiple TmfDataProvider.
948b0607
FC
167 *
168 * @param isIncrement
169 */
c1c69938
FC
170 @Override
171 public void notifyPendingRequest(boolean isIncrement) {
948b0607 172 synchronized (fLock) {
c1c69938
FC
173 if (isIncrement) {
174 if (fSignalDepth > 0) {
175 fRequestPendingCounter++;
176 }
177 } else {
178 if (fRequestPendingCounter > 0) {
179 fRequestPendingCounter--;
180 }
948b0607 181
c1c69938
FC
182 // fire request if all pending requests are received
183 if (fRequestPendingCounter == 0) {
184 fireRequest();
185 }
186 }
187 }
948b0607
FC
188 }
189
190 // ------------------------------------------------------------------------
191 // Coalescing (primitive test...)
192 // ------------------------------------------------------------------------
193
194 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
195
196 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
197 synchronized (fLock) {
1b70b6dc 198 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(request.getDataType(), request.getIndex(),
12c155f5 199 request.getNbRequested(), request.getBlockSize(), request.getExecType());
948b0607
FC
200 coalescedRequest.addRequest(request);
201 if (Tracer.isRequestTraced()) {
202 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
203 }
204 fPendingCoalescedRequests.add(coalescedRequest);
205 }
206 }
207
208 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
209 synchronized (fLock) {
210 for (TmfCoalescedDataRequest<T> coalescedRequest : fPendingCoalescedRequests) {
211 if (coalescedRequest.isCompatible(request)) {
212 coalescedRequest.addRequest(request);
213 if (Tracer.isRequestTraced()) {
214 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
215 }
216 return;
217 }
218 }
219 newCoalescedDataRequest(request);
220 }
221 }
222
223 // ------------------------------------------------------------------------
224 // Request processing
225 // ------------------------------------------------------------------------
226
227 private void dispatchRequest(final ITmfDataRequest<T> request) {
228 if (request.getExecType() == ExecutionType.FOREGROUND)
229 queueRequest(request);
230 else
231 queueBackgroundRequest(request, request.getBlockSize(), true);
232 }
233
234 protected void queueRequest(final ITmfDataRequest<T> request) {
235
236 if (fExecutor.isShutdown()) {
237 request.cancel();
238 return;
239 }
240
241 final TmfDataProvider<T> provider = this;
242
243 // Process the request
244 TmfThread thread = new TmfThread(request.getExecType()) {
475743b7 245
948b0607
FC
246 @Override
247 public void run() {
248
249 if (Tracer.isRequestTraced())
250 Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName()); //$NON-NLS-1$//$NON-NLS-2$
251
252 // Extract the generic information
253 request.start();
254 int nbRequested = request.getNbRequested();
255 int nbRead = 0;
256
257 // Initialize the execution
258 ITmfContext context = armRequest(request);
259 if (context == null) {
260 request.cancel();
261 return;
262 }
263
264 try {
265 // Get the ordered events
266 T data = getNext(context);
267 if (Tracer.isRequestTraced())
268 Tracer.trace("Request #" + request.getRequestId() + " read first event"); //$NON-NLS-1$ //$NON-NLS-2$
269 while (data != null && !isCompleted(request, data, nbRead)) {
270 if (fLogData)
271 Tracer.traceEvent(provider, request, data);
1b70b6dc
PT
272 if (request.getDataType().isInstance(data)) {
273 request.handleData(data);
274 }
948b0607
FC
275
276 // To avoid an unnecessary read passed the last data
277 // requested
278 if (++nbRead < nbRequested) {
279 data = getNext(context);
280 }
281 }
282 if (Tracer.isRequestTraced())
283 Tracer.trace("Request #" + request.getRequestId() + " finished"); //$NON-NLS-1$//$NON-NLS-2$
284
285 if (request.isCancelled()) {
286 request.cancel();
287 } else {
288 request.done();
289 }
290 } catch (Exception e) {
291 request.fail();
292 }
293
294 // Cleanup
295 context.dispose();
296 }
475743b7
FC
297
298 @Override
299 public void cancel() {
8a0edc79
FC
300 if (!request.isCompleted()) {
301 request.cancel();
475743b7
FC
302 }
303 }
948b0607
FC
304 };
305
306 if (Tracer.isRequestTraced())
307 Tracer.traceRequest(request, "queued"); //$NON-NLS-1$
308 fExecutor.execute(thread);
309
310 }
311
312 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean indexing) {
313
314 Thread thread = new Thread() {
315 @Override
316 public void run() {
317 request.start();
318
319 final Integer[] CHUNK_SIZE = new Integer[1];
320 CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
321
322 final Integer[] nbRead = new Integer[1];
323 nbRead[0] = 0;
324
325 final Boolean[] isFinished = new Boolean[1];
326 isFinished[0] = Boolean.FALSE;
327
328 while (!isFinished[0]) {
329
12c155f5
FC
330 TmfDataRequest<T> subRequest = new TmfDataRequest<T>(request.getDataType(), request.getIndex()
331 + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) {
948b0607
FC
332 @Override
333 public void handleData(T data) {
334 super.handleData(data);
1b70b6dc
PT
335 if (request.getDataType().isInstance(data)) {
336 request.handleData(data);
337 }
948b0607
FC
338 if (getNbRead() > CHUNK_SIZE[0]) {
339 System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
340 }
341 }
342
343 @Override
344 public void handleCompleted() {
345 nbRead[0] += getNbRead();
346 if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
90de83da 347 if (this.isCancelled()) {
948b0607 348 request.cancel();
90de83da 349 } else if (this.isFailed()) {
12c155f5 350 request.fail();
948b0607
FC
351 } else {
352 request.done();
353 }
354 isFinished[0] = Boolean.TRUE;
355 }
356 super.handleCompleted();
357 }
358 };
359
360 if (!isFinished[0]) {
361 queueRequest(subRequest);
362
363 try {
364 subRequest.waitForCompletion();
365 } catch (InterruptedException e) {
366 e.printStackTrace();
367 }
368
369 CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
370 }
371 }
372 }
373 };
374
375 thread.start();
376 }
377
378 /**
12c155f5 379 * Initialize the provider based on the request. The context is provider specific and will be updated by getNext().
948b0607
FC
380 *
381 * @param request
12c155f5 382 * @return an application specific context; null if request can't be serviced
948b0607
FC
383 */
384 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
385
386 public abstract T getNext(ITmfContext context);
387
388 /**
389 * Checks if the data meets the request completion criteria.
390 *
0d9a6d76
FC
391 * @param request the request
392 * @param data the data to verify
393 * @param nbRead the number of events read so far
394 * @return true if completion criteria is met
948b0607
FC
395 */
396 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
b6be1c3e 397 return request.isCompleted() || nbRead >= request.getNbRequested();
948b0607
FC
398 }
399
400 // ------------------------------------------------------------------------
401 // Signal handlers
402 // ------------------------------------------------------------------------
403
404 @TmfSignalHandler
405 public void startSynch(TmfStartSynchSignal signal) {
406 synchronized (fLock) {
407 fSignalDepth++;
408 }
409 }
410
411 @TmfSignalHandler
412 public void endSynch(TmfEndSynchSignal signal) {
045df77d 413 synchronized (fLock) {
948b0607
FC
414 fSignalDepth--;
415 if (fSignalDepth == 0) {
416 fireRequest();
417 }
045df77d 418 }
948b0607 419 }
8c8bf09f
ASL
420
421}
This page took 0.056331 seconds and 5 git commands to generate.