Contribute CNF based TMF project handling
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.component;
14
15 import java.util.Vector;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.SynchronousQueue;
19
20 import org.eclipse.linuxtools.tmf.Tracer;
21 import org.eclipse.linuxtools.tmf.event.TmfData;
22 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
23 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType;
24 import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
25 import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
26 import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
27 import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
28 import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
29 import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
30 import org.eclipse.linuxtools.tmf.trace.ITmfContext;
31
32 /**
33 * <b><u>TmfDataProvider</u></b>
34 * <p>
35 * The TmfDataProvider<T> is a provider for data of type <T>.
36 * <p>
37 * This abstract class implements the housekeeping methods to register/deregister the event provider and to handle
38 * generically the event requests.
39 * <p>
40 * The concrete class can either re-implement queueRequest() entirely or just implement the hooks (armRequest()
41 * and getNext()).
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 */
45 public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
46
47 // ------------------------------------------------------------------------
48 // Constants
49 // ------------------------------------------------------------------------
50
51 // ------------------------------------------------------------------------
52 // Attributes
53 // ------------------------------------------------------------------------
54
55 protected Class<T> fType;
56 protected boolean fLogData;
57 protected boolean fLogError;
58
59 public static final int DEFAULT_BLOCK_SIZE = 50000;
60 public static final int DEFAULT_QUEUE_SIZE = 1000;
61
62 protected int fQueueSize;
63 protected BlockingQueue<T> fDataQueue;
64 protected TmfRequestExecutor fExecutor;
65
66 private int fSignalDepth = 0;
67 private final Object fLock = new Object();
68
69 private int fRequestPendingCounter = 0;
70
71 // ------------------------------------------------------------------------
72 // Constructors
73 // ------------------------------------------------------------------------
74
/**
 * Creates a provider that uses {@link #DEFAULT_QUEUE_SIZE} for its data queue and owns a new request executor.
 * The data type is not assigned by this constructor; {@link #init(String, Class)} assigns it.
 */
public TmfDataProvider() {
    this.fQueueSize = DEFAULT_QUEUE_SIZE;
    this.fExecutor = new TmfRequestExecutor();
    this.fDataQueue = new LinkedBlockingQueue<T>(this.fQueueSize);
}
80
81 public void init(String name, Class<T> dataType) {
82 super.init(name);
83 fType = dataType;
84 fQueueSize = DEFAULT_QUEUE_SIZE;
85 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
86
87 fExecutor = new TmfRequestExecutor();
88 fSignalDepth = 0;
89
90 fLogData = Tracer.isEventTraced();
91 fLogError = Tracer.isErrorTraced();
92
93 TmfProviderManager.register(fType, this);
94 }
95
/**
 * Creates a named provider for the given data type, using {@link #DEFAULT_QUEUE_SIZE} as queue size.
 *
 * @param name the component name
 * @param type the type of data served by this provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
99
/**
 * Creates a named provider for the given data type with an explicit queue size, then registers it with the
 * {@link TmfProviderManager}.
 *
 * @param name the component name
 * @param type the type of data served by this provider
 * @param queueSize the data queue capacity; a value of 1 or less selects a {@link SynchronousQueue}
 */
protected TmfDataProvider(String name, Class<T> type, int queueSize) {
    super(name);

    fType = type;
    fQueueSize = queueSize;
    if (fQueueSize > 1) {
        fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
    } else {
        fDataQueue = new SynchronousQueue<T>();
    }

    fExecutor = new TmfRequestExecutor();
    fSignalDepth = 0;

    fLogData = Tracer.isEventTraced();
    fLogError = Tracer.isErrorTraced();

    TmfProviderManager.register(fType, this);
    // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
}
115
/**
 * Copy constructor. Takes over the data type and queue size of the other provider but builds its own data queue
 * and request executor. Unlike the other parameterized constructors, it does not register the new instance with
 * the {@link TmfProviderManager}.
 *
 * @param other the provider to copy the settings from
 */
public TmfDataProvider(TmfDataProvider<T> other) {
    super(other);

    fType = other.fType;
    fQueueSize = other.fQueueSize;
    if (fQueueSize > 1) {
        fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
    } else {
        fDataQueue = new SynchronousQueue<T>();
    }

    fExecutor = new TmfRequestExecutor();
    fSignalDepth = 0;

    fLogData = Tracer.isEventTraced();
    fLogError = Tracer.isErrorTraced();
}
128
129 @Override
130 public void dispose() {
131 TmfProviderManager.deregister(fType, this);
132 fExecutor.stop();
133 super.dispose();
134 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
135 }
136
137 public int getQueueSize() {
138 return fQueueSize;
139 }
140
141 public Class<?> getType() {
142 return fType;
143 }
144
145 // ------------------------------------------------------------------------
146 // ITmfRequestHandler
147 // ------------------------------------------------------------------------
148
149 @Override
150 public void sendRequest(final ITmfDataRequest<T> request) {
151 synchronized (fLock) {
152 if (fSignalDepth > 0) {
153 coalesceDataRequest(request);
154 } else {
155 dispatchRequest(request);
156 }
157 }
158 }
159
160 /**
161 * This method queues the coalesced requests.
162 *
163 * @param thread
164 */
165 @Override
166 public void fireRequest() {
167 synchronized (fLock) {
168 if (fRequestPendingCounter > 0) {
169 return;
170 }
171 if (fPendingCoalescedRequests.size() > 0) {
172 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
173 dispatchRequest(request);
174 }
175 fPendingCoalescedRequests.clear();
176 }
177 }
178 }
179
180 /**
181 * Increments/decrements the pending requests counters and fires the request if necessary (counter == 0). Used for
182 * coalescing requests accross multiple TmfDataProvider.
183 *
184 * @param isIncrement
185 */
186 @Override
187 public void notifyPendingRequest(boolean isIncrement) {
188 synchronized (fLock) {
189 if (isIncrement) {
190 if (fSignalDepth > 0) {
191 fRequestPendingCounter++;
192 }
193 } else {
194 if (fRequestPendingCounter > 0) {
195 fRequestPendingCounter--;
196 }
197
198 // fire request if all pending requests are received
199 if (fRequestPendingCounter == 0) {
200 fireRequest();
201 }
202 }
203 }
204 }
205
206 // ------------------------------------------------------------------------
207 // Coalescing (primitive test...)
208 // ------------------------------------------------------------------------
209
210 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
211
212 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
213 synchronized (fLock) {
214 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(fType, request.getIndex(),
215 request.getNbRequested(), request.getBlockSize(), request.getExecType());
216 coalescedRequest.addRequest(request);
217 if (Tracer.isRequestTraced()) {
218 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
219 }
220 fPendingCoalescedRequests.add(coalescedRequest);
221 }
222 }
223
224 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
225 synchronized (fLock) {
226 for (TmfCoalescedDataRequest<T> coalescedRequest : fPendingCoalescedRequests) {
227 if (coalescedRequest.isCompatible(request)) {
228 coalescedRequest.addRequest(request);
229 if (Tracer.isRequestTraced()) {
230 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
231 }
232 return;
233 }
234 }
235 newCoalescedDataRequest(request);
236 }
237 }
238
239 // ------------------------------------------------------------------------
240 // Request processing
241 // ------------------------------------------------------------------------
242
243 private void dispatchRequest(final ITmfDataRequest<T> request) {
244 if (request.getExecType() == ExecutionType.FOREGROUND)
245 queueRequest(request);
246 else
247 queueBackgroundRequest(request, request.getBlockSize(), true);
248 }
249
250 protected void queueRequest(final ITmfDataRequest<T> request) {
251
252 if (fExecutor.isShutdown()) {
253 request.cancel();
254 return;
255 }
256
257 final TmfDataProvider<T> provider = this;
258
259 // Process the request
260 TmfThread thread = new TmfThread(request.getExecType()) {
261
262 @Override
263 public void run() {
264
265 if (Tracer.isRequestTraced())
266 Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName()); //$NON-NLS-1$//$NON-NLS-2$
267
268 // Extract the generic information
269 request.start();
270 int nbRequested = request.getNbRequested();
271 int nbRead = 0;
272
273 // Initialize the execution
274 ITmfContext context = armRequest(request);
275 if (context == null) {
276 request.cancel();
277 return;
278 }
279
280 try {
281 // Get the ordered events
282 T data = getNext(context);
283 if (Tracer.isRequestTraced())
284 Tracer.trace("Request #" + request.getRequestId() + " read first event"); //$NON-NLS-1$ //$NON-NLS-2$
285 while (data != null && !isCompleted(request, data, nbRead)) {
286 if (fLogData)
287 Tracer.traceEvent(provider, request, data);
288 request.handleData(data);
289
290 // To avoid an unnecessary read passed the last data
291 // requested
292 if (++nbRead < nbRequested) {
293 data = getNext(context);
294 }
295 }
296 if (Tracer.isRequestTraced())
297 Tracer.trace("Request #" + request.getRequestId() + " finished"); //$NON-NLS-1$//$NON-NLS-2$
298
299 if (request.isCancelled()) {
300 request.cancel();
301 } else {
302 request.done();
303 }
304 } catch (Exception e) {
305 request.fail();
306 }
307
308 // Cleanup
309 context.dispose();
310 }
311 };
312
313 if (Tracer.isRequestTraced())
314 Tracer.traceRequest(request, "queued"); //$NON-NLS-1$
315 fExecutor.execute(thread);
316
317 }
318
319 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean indexing) {
320
321 Thread thread = new Thread() {
322 @Override
323 public void run() {
324 request.start();
325
326 final Integer[] CHUNK_SIZE = new Integer[1];
327 CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
328
329 final Integer[] nbRead = new Integer[1];
330 nbRead[0] = 0;
331
332 final Boolean[] isFinished = new Boolean[1];
333 isFinished[0] = Boolean.FALSE;
334
335 while (!isFinished[0]) {
336
337 TmfDataRequest<T> subRequest = new TmfDataRequest<T>(request.getDataType(), request.getIndex()
338 + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) {
339 @Override
340 public void handleData(T data) {
341 super.handleData(data);
342 request.handleData(data);
343 if (getNbRead() > CHUNK_SIZE[0]) {
344 System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
345 }
346 }
347
348 @Override
349 public void handleCompleted() {
350 nbRead[0] += getNbRead();
351 if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
352 if (this.isCancelled()) {
353 request.cancel();
354 } else if (this.isFailed()) {
355 request.fail();
356 } else {
357 request.done();
358 }
359 isFinished[0] = Boolean.TRUE;
360 }
361 super.handleCompleted();
362 }
363 };
364
365 if (!isFinished[0]) {
366 queueRequest(subRequest);
367
368 try {
369 subRequest.waitForCompletion();
370 } catch (InterruptedException e) {
371 e.printStackTrace();
372 }
373
374 CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
375 }
376 }
377 }
378 };
379
380 thread.start();
381 }
382
/**
 * Initialize the provider based on the request. The context is provider specific and will be updated by getNext().
 * <p>
 * queueRequest() calls this hook once per request, before the first call to getNext(), and cancels the request
 * when it returns null. The returned context is disposed of once the request has been processed.
 *
 * @param request the request about to be serviced
 * @return an application specific context; null if request can't be serviced
 */
public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
390
/**
 * Returns the next piece of data for the given context. queueRequest() calls this hook repeatedly with the
 * context obtained from armRequest() and stops reading as soon as it returns null.
 *
 * @param context the context returned by armRequest()
 * @return the next data, or null when there is none left
 */
public abstract T getNext(ITmfContext context);
392
393 /**
394 * Checks if the data meets the request completion criteria.
395 *
396 * @param request
397 * @param data
398 * @return
399 */
400 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
401 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
402 }
403
404 // ------------------------------------------------------------------------
405 // Signal handlers
406 // ------------------------------------------------------------------------
407
408 @TmfSignalHandler
409 public void startSynch(TmfStartSynchSignal signal) {
410 synchronized (fLock) {
411 fSignalDepth++;
412 }
413 }
414
415 @TmfSignalHandler
416 public void endSynch(TmfEndSynchSignal signal) {
417 synchronized (fLock) {
418 fSignalDepth--;
419 if (fSignalDepth == 0) {
420 fireRequest();
421 }
422 }
423 }
424
425 }
This page took 0.040176 seconds and 5 git commands to generate.