Merge branch 'master' into lttng-kepler
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / request / TmfDataRequest.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.core.request;
14
15 import java.util.concurrent.CountDownLatch;
16
17 import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer;
18 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
19
20 /**
21 * TmfDataRequests are used to obtain blocks of contiguous data from a data provider. Open ranges can be used,
22 * especially for continuous streaming.
23 * <p>
24 * The request is processed asynchronously by a TmfProvider and, as blocks of data become available, handleData() is
25 * invoked synchronously for each block. Upon return, the data instances go out of scope and become eligible for gc. It
26 * is is thus the responsibility of the requester to either clone or keep a reference to the data it wishes to track
27 * specifically.
28 * <p>
29 * This data block approach is used to avoid busting the heap for very large trace files. The block size is
30 * configurable.
31 * <p>
32 * The TmfProvider indicates that the request is completed by calling done(). The request can be canceled at any time
33 * with cancel().
34 * <p>
35 * Typical usage:
36 *
37 * <pre>
38 * <code><i>TmfTimeWindow range = new TmfTimewindow(...);
39 * TmfDataRequest&lt;DataType[]&gt; request = new TmfDataRequest&lt;DataType[]&gt;(DataType.class, 0, NB_EVENTS, BLOCK_SIZE) {
40 * public void handleData() {
41 * DataType[] data = request.getData();
42 * for (DataType e : data) {
43 * // do something
44 * }
45 * }
46 * public void handleSuccess() {
47 * // do something
48 * }
49 * }
50 * public void handleFailure() {
51 * // do something
52 * }
53 * }
54 * public void handleCancel() {
55 * // do something
56 * }
57 * }
58 * };
59 * fProcessor.process(request, true);
60 * </i></code>
61 * </pre>
62 *
63 * TODO: Consider decoupling from "time range", "rank", etc and for the more generic notion of "criteria". This would
64 * allow to extend for "time range", etc instead of providing specialized constructors. This also means removing the
65 * criteria info from the data structure (with the possible exception of fNbRequested). The nice thing about it is that
66 * it would prepare us well for the coming generation of analysis tools.
67 *
68 * TODO: Implement request failures (codes, etc...)
69 *
70 * @version 1.0
71 * @author Francois Chouinard
72 */
73 public abstract class TmfDataRequest implements ITmfDataRequest {
74
75 // ------------------------------------------------------------------------
76 // Constants
77 // ------------------------------------------------------------------------
78
79 /** The default maximum number of events per chunk */
80 public static final int DEFAULT_BLOCK_SIZE = 1000;
81
82 /** The request count for all the events */
83 public static final int ALL_DATA = Integer.MAX_VALUE;
84
85 private static int fRequestNumber = 0;
86
87 // ------------------------------------------------------------------------
88 // Attributes
89 // ------------------------------------------------------------------------
90
91 private final Class<? extends ITmfEvent> fDataType;
92 private final ExecutionType fExecType;
93 private final int fRequestId; // A unique request ID
94 protected long fIndex; // The index (rank) of the requested event
95 protected int fNbRequested; // The number of requested events (ALL_DATA for all)
96 private final int fBlockSize; // The block size (for BG requests)
97 private int fNbRead; // The number of reads so far
98
99 private final CountDownLatch startedLatch = new CountDownLatch(1);
100 private final CountDownLatch completedLatch = new CountDownLatch(1);
101 private boolean fRequestRunning;
102 private boolean fRequestCompleted;
103 private boolean fRequestFailed;
104 private boolean fRequestCanceled;
105
106 // ------------------------------------------------------------------------
107 // Constructors
108 // ------------------------------------------------------------------------
109
110 /**
111 * Resets the request counter (used for testing)
112 */
113 public static void reset() {
114 fRequestNumber = 0;
115 }
116
117 /**
118 * Request all the events of a given type (high priority)
119 * Events are returned in blocks of the default size (DEFAULT_BLOCK_SIZE).
120 *
121 * @param dataType the requested data type
122 */
123 public TmfDataRequest(Class<? extends ITmfEvent> dataType) {
124 this(dataType, 0, ALL_DATA, DEFAULT_BLOCK_SIZE, ExecutionType.FOREGROUND);
125 }
126
127 /**
128 * Request all the events of a given type (given priority)
129 * Events are returned in blocks of the default size (DEFAULT_BLOCK_SIZE).
130 *
131 * @param dataType the requested data type
132 * @param priority the requested execution priority
133 */
134 public TmfDataRequest(Class<? extends ITmfEvent> dataType, ExecutionType priority) {
135 this(dataType, 0, ALL_DATA, DEFAULT_BLOCK_SIZE, priority);
136 }
137
138 /**
139 * Request all the events of a given type from the given index (high priority)
140 * Events are returned in blocks of the default size (DEFAULT_BLOCK_SIZE).
141 *
142 * @param dataType the requested data type
143 * @param index the index of the first event to retrieve
144 */
145 public TmfDataRequest(Class<? extends ITmfEvent> dataType, long index) {
146 this(dataType, index, ALL_DATA, DEFAULT_BLOCK_SIZE, ExecutionType.FOREGROUND);
147 }
148
149 /**
150 * Request all the events of a given type from the given index (given priority)
151 * Events are returned in blocks of the default size (DEFAULT_BLOCK_SIZE).
152 *
153 * @param dataType the requested data type
154 * @param index the index of the first event to retrieve
155 * @param priority the requested execution priority
156 */
157 public TmfDataRequest(Class<? extends ITmfEvent> dataType, long index, ExecutionType priority) {
158 this(dataType, index, ALL_DATA, DEFAULT_BLOCK_SIZE, priority);
159 }
160
161 /**
162 * Request 'n' events of a given type from the given index (high priority)
163 * Events are returned in blocks of the default size (DEFAULT_BLOCK_SIZE).
164 *
165 * @param dataType the requested data type
166 * @param index the index of the first event to retrieve
167 * @param nbRequested the number of events requested
168 */
169 public TmfDataRequest(Class<? extends ITmfEvent> dataType, long index, int nbRequested) {
170 this(dataType, index, nbRequested, DEFAULT_BLOCK_SIZE, ExecutionType.FOREGROUND);
171 }
172
173 /**
174 * Request 'n' events of a given type from the given index (given priority)
175 * Events are returned in blocks of the default size (DEFAULT_BLOCK_SIZE).
176 *
177 * @param dataType the requested data type
178 * @param index the index of the first event to retrieve
179 * @param nbRequested the number of events requested
180 * @param priority the requested execution priority
181 */
182 public TmfDataRequest(Class<? extends ITmfEvent> dataType, long index, int nbRequested, ExecutionType priority) {
183 this(dataType, index, nbRequested, DEFAULT_BLOCK_SIZE, priority);
184 }
185
186 /**
187 * Request 'n' events of a given type from the given index (high priority).
188 * Events are returned in blocks of the given size.
189 *
190 * @param dataType the requested data type
191 * @param index the index of the first event to retrieve
192 * @param nbRequested the number of events requested
193 * @param blockSize the number of events per block
194 */
195 public TmfDataRequest(Class<? extends ITmfEvent> dataType, long index, int nbRequested, int blockSize) {
196 this(dataType, index, nbRequested, blockSize, ExecutionType.FOREGROUND);
197 }
198
199 /**
200 * Request 'n' events of a given type from the given index (given priority).
201 * Events are returned in blocks of the given size.
202 *
203 * @param dataType the requested data type
204 * @param index the index of the first event to retrieve
205 * @param nbRequested the number of events requested
206 * @param blockSize the number of events per block
207 * @param priority the requested execution priority
208 */
209 public TmfDataRequest(Class<? extends ITmfEvent> dataType, long index, int nbRequested, int blockSize, ExecutionType priority) {
210 fRequestId = fRequestNumber++;
211 fDataType = dataType;
212 fIndex = index;
213 fNbRequested = nbRequested;
214 fBlockSize = blockSize;
215 fExecType = priority;
216 fNbRead = 0;
217
218 fRequestRunning = false;
219 fRequestCompleted = false;
220 fRequestFailed = false;
221 fRequestCanceled = false;
222
223 if (!(this instanceof ITmfEventRequest) && TmfCoreTracer.isRequestTraced()) {
224 String type = getClass().getName();
225 type = type.substring(type.lastIndexOf('.') + 1);
226 @SuppressWarnings("nls")
227 String message = "CREATED "
228 + (getExecType() == ITmfDataRequest.ExecutionType.BACKGROUND ? "(BG)" : "(FG)")
229 + " Type=" + type + " Index=" + getIndex() + " NbReq=" + getNbRequested()
230 + " DataType=" + getDataType().getSimpleName();
231 TmfCoreTracer.traceRequest(this, message);
232 }
233 }
234
235 /**
236 * Copy constructor
237 */
238 @SuppressWarnings("unused")
239 private TmfDataRequest(TmfDataRequest other) {
240 this(null, 0, ALL_DATA, DEFAULT_BLOCK_SIZE);
241 }
242
243 // ------------------------------------------------------------------------
244 // Accessors
245 // ------------------------------------------------------------------------
246
247 /**
248 * @return the request ID
249 */
250 @Override
251 public int getRequestId() {
252 return fRequestId;
253 }
254
255 /**
256 * @return the index of the first event requested
257 */
258 @Override
259 public long getIndex() {
260 return fIndex;
261 }
262
263 /**
264 * @return the execution type (priority)
265 */
266 @Override
267 public ExecutionType getExecType() {
268 return fExecType;
269 }
270
271 /**
272 * @return the number of requested events (ALL_DATA = all)
273 */
274 @Override
275 public int getNbRequested() {
276 return fNbRequested;
277 }
278
279 /**
280 * @return the block size (for BG requests)
281 */
282 @Override
283 public int getBlockSize() {
284 return fBlockSize;
285 }
286
287 /**
288 * @return the number of events read so far
289 */
290 @Override
291 public synchronized int getNbRead() {
292 return fNbRead;
293 }
294
295 /**
296 * @return indicates if the request is currently running
297 */
298 @Override
299 public synchronized boolean isRunning() {
300 return fRequestRunning;
301 }
302
303 /**
304 * @return indicates if the request is completed
305 */
306 @Override
307 public synchronized boolean isCompleted() {
308 return fRequestCompleted;
309 }
310
311 /**
312 * @return indicates if the request has failed
313 */
314 @Override
315 public synchronized boolean isFailed() {
316 return fRequestFailed;
317 }
318
319 /**
320 * @return indicates if the request is canceled
321 */
322 @Override
323 public synchronized boolean isCancelled() {
324 return fRequestCanceled;
325 }
326
327 /**
328 * @return the requested data type
329 */
330 @Override
331 public Class<? extends ITmfEvent> getDataType() {
332 return fDataType;
333 }
334
335 // ------------------------------------------------------------------------
336 // Setters
337 // ------------------------------------------------------------------------
338
339 /**
340 * this method is called by the event provider to set the index corresponding to the time range start time
341 *
342 * @param index
343 * the start time index
344 */
345 protected void setIndex(int index) {
346 fIndex = index;
347 }
348
349 // ------------------------------------------------------------------------
350 // Operators
351 // ------------------------------------------------------------------------
352
353 /**
354 * Handle incoming data, one event at a time i.e. this method is invoked
355 * for every data item obtained by the request.
356 *
357 * - Data items are received in the order they appear in the stream
358 * - Called by the request processor, in its execution thread, every time
359 * a block of data becomes available.
360 * - Request processor performs a synchronous call to handleData() i.e.
361 * its execution threads holds until handleData() returns.
362 * - Original data items are disposed of on return i.e. keep a reference
363 * (or a copy) if some persistence is needed between invocations.
364 * - When there is no more data, done() is called.
365 *
366 * @param data a piece of data
367 */
368 @Override
369 public void handleData(ITmfEvent data) {
370 if (data != null) {
371 fNbRead++;
372 }
373 }
374
375 @Override
376 public void handleStarted() {
377 if (TmfCoreTracer.isRequestTraced()) {
378 TmfCoreTracer.traceRequest(this, "STARTED"); //$NON-NLS-1$
379 }
380 }
381
382 /**
383 * Handle the completion of the request. It is called when there is no
384 * more data available either because:
385 * - the request completed normally
386 * - the request failed
387 * - the request was canceled
388 *
389 * As a convenience, handleXXXX methods are provided. They are meant to be
390 * overridden by the application if it needs to handle these conditions.
391 */
392 @Override
393 public void handleCompleted() {
394 boolean requestFailed = false;
395 boolean requestCanceled = false;
396 synchronized (this) {
397 requestFailed = fRequestFailed;
398 requestCanceled = fRequestCanceled;
399 }
400
401 if (requestFailed) {
402 handleFailure();
403 } else if (requestCanceled) {
404 handleCancel();
405 } else {
406 handleSuccess();
407 }
408 if (TmfCoreTracer.isRequestTraced()) {
409 TmfCoreTracer.traceRequest(this, "COMPLETED (" + fNbRead + " events read)"); //$NON-NLS-1$ //$NON-NLS-2$
410 }
411 }
412
413 @Override
414 public void handleSuccess() {
415 if (TmfCoreTracer.isRequestTraced()) {
416 TmfCoreTracer.traceRequest(this, "SUCCEEDED"); //$NON-NLS-1$
417 }
418 }
419
420 @Override
421 public void handleFailure() {
422 if (TmfCoreTracer.isRequestTraced()) {
423 TmfCoreTracer.traceRequest(this, "FAILED"); //$NON-NLS-1$
424 }
425 }
426
427 @Override
428 public void handleCancel() {
429 if (TmfCoreTracer.isRequestTraced()) {
430 TmfCoreTracer.traceRequest(this, "CANCELLED"); //$NON-NLS-1$
431 }
432 }
433
434 /**
435 * To suspend the client thread until the request starts (or is canceled).
436 *
437 * @throws InterruptedException
438 * If the thread was interrupted while waiting
439 */
440 public void waitForStart() throws InterruptedException {
441 while (!fRequestRunning) {
442 startedLatch.await();
443 }
444 }
445
446 /**
447 * To suspend the client thread until the request completes (or is
448 * canceled).
449 *
450 * @throws InterruptedException
451 * If the thread was interrupted while waiting
452 */
453 @Override
454 public void waitForCompletion() throws InterruptedException {
455 while (!fRequestCompleted) {
456 completedLatch.await();
457 }
458 }
459
460 /**
461 * Called by the request processor upon starting to service the request.
462 */
463 @Override
464 public void start() {
465 synchronized (this) {
466 fRequestRunning = true;
467 }
468 handleStarted();
469 startedLatch.countDown();
470 }
471
472 /**
473 * Called by the request processor upon completion.
474 */
475 @Override
476 public void done() {
477 synchronized (this) {
478 if (!fRequestCompleted) {
479 fRequestRunning = false;
480 fRequestCompleted = true;
481 } else {
482 return;
483 }
484 }
485 try {
486 handleCompleted();
487 } finally {
488 completedLatch.countDown();
489 }
490 }
491
492 /**
493 * Called by the request processor upon failure.
494 */
495 @Override
496 public void fail() {
497 synchronized (this) {
498 fRequestFailed = true;
499 }
500 done();
501 }
502
503 /**
504 * Called by the request processor upon cancellation.
505 */
506 @Override
507 public void cancel() {
508 synchronized (this) {
509 fRequestCanceled = true;
510 }
511 done();
512 }
513
514 // ------------------------------------------------------------------------
515 // Object
516 // ------------------------------------------------------------------------
517
518 @Override
519 // All requests have a unique id
520 public int hashCode() {
521 return getRequestId();
522 }
523
524 @Override
525 public boolean equals(Object other) {
526 if (other instanceof TmfDataRequest) {
527 TmfDataRequest request = (TmfDataRequest) other;
528 return (request.fDataType == fDataType) && (request.fIndex == fIndex)
529 && (request.fNbRequested == fNbRequested);
530 }
531 return false;
532 }
533
534 @Override
535 @SuppressWarnings("nls")
536 public String toString() {
537 return "[TmfDataRequest(" + fRequestId + "," + fDataType.getSimpleName() + "," + fIndex + "," + fNbRequested
538 + "," + getBlockSize() + ")]";
539 }
540 }
This page took 0.041233 seconds and 5 git commands to generate.