1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.linuxtools
.tmf
.experiment
;
15 import java
.util
.Collections
;
16 import java
.util
.Vector
;
18 import org
.eclipse
.linuxtools
.tmf
.component
.TmfEventProvider
;
19 import org
.eclipse
.linuxtools
.tmf
.event
.TmfEvent
;
20 import org
.eclipse
.linuxtools
.tmf
.event
.TmfTimeRange
;
21 import org
.eclipse
.linuxtools
.tmf
.event
.TmfTimestamp
;
22 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
;
23 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
.ExecutionType
;
24 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfEventRequest
;
25 import org
.eclipse
.linuxtools
.tmf
.request
.TmfDataRequest
;
26 import org
.eclipse
.linuxtools
.tmf
.request
.TmfEventRequest
;
27 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfExperimentSelectedSignal
;
28 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfExperimentUpdatedSignal
;
29 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfSignalHandler
;
30 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfSignalManager
;
31 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfTraceUpdatedSignal
;
32 import org
.eclipse
.linuxtools
.tmf
.trace
.ITmfContext
;
33 import org
.eclipse
.linuxtools
.tmf
.trace
.ITmfLocation
;
34 import org
.eclipse
.linuxtools
.tmf
.trace
.ITmfTrace
;
35 import org
.eclipse
.linuxtools
.tmf
.trace
.TmfCheckpoint
;
36 import org
.eclipse
.linuxtools
.tmf
.trace
.TmfContext
;
39 * <b><u>TmfExperiment</u></b>
41 * TmfExperiment presents a time-ordered, unified view of a set of TmfTraces
42 * that are part of a tracing experiment.
45 public class TmfExperiment
<T
extends TmfEvent
> extends TmfEventProvider
<T
> implements ITmfTrace
{
47 // ------------------------------------------------------------------------
49 // ------------------------------------------------------------------------
51 // The currently selected experiment
52 protected static TmfExperiment
<?
> fCurrentExperiment
= null;
54 // The set of traces that constitute the experiment
55 protected ITmfTrace
[] fTraces
;
57 // The total number of events
58 protected long fNbEvents
;
60 // The experiment time range
61 protected TmfTimeRange fTimeRange
;
63 // The experiment reference timestamp (default: Zero)
64 protected TmfTimestamp fEpoch
;
66 // The experiment index
67 protected Vector
<TmfCheckpoint
> fCheckpoints
= new Vector
<TmfCheckpoint
>();
69 // The current experiment context
70 protected TmfExperimentContext fExperimentContext
;
72 // ------------------------------------------------------------------------
74 // ------------------------------------------------------------------------
81 * @param indexPageSize
/**
 * Creates an experiment with an explicit reference timestamp and index
 * page size. The experiment is not pre-indexed at construction time.
 *
 * @param type the class of the events handled by this experiment
 * @param id the experiment identifier
 * @param traces the traces that constitute the experiment
 * @param epoch the experiment reference timestamp
 * @param indexPageSize the index page size (stored in fIndexPageSize)
 */
public TmfExperiment(Class<T> type, String id, ITmfTrace[] traces, TmfTimestamp epoch, int indexPageSize) {
    // Forward the caller's epoch. The previous code always forwarded
    // TmfTimestamp.Zero here, silently discarding the argument.
    this(type, id, traces, epoch, indexPageSize, false);
}
87 public TmfExperiment(Class
<T
> type
, String id
, ITmfTrace
[] traces
, TmfTimestamp epoch
, int indexPageSize
, boolean preIndexExperiment
) {
92 fIndexPageSize
= indexPageSize
;
94 if (preIndexExperiment
) indexExperiment(true);
99 protected TmfExperiment(String id
, Class
<T
> type
) {
/**
 * Creates an experiment with the default reference timestamp
 * (TmfTimestamp.Zero) and the default index page size
 * (DEFAULT_INDEX_PAGE_SIZE). The experiment is not pre-indexed.
 *
 * @param type the class of the events handled by this experiment
 * @param id the experiment identifier
 * @param traces the traces that constitute the experiment
 */
public TmfExperiment(Class<T> type, String id, ITmfTrace[] traces) {
    // Goes straight to the most specific constructor instead of hopping
    // through the five-argument overload; the forwarded values are the same.
    this(type, id, traces, TmfTimestamp.Zero, DEFAULT_INDEX_PAGE_SIZE, false);
}
116 * @param indexPageSize
/**
 * Creates an experiment with the default reference timestamp
 * (TmfTimestamp.Zero) and a caller-supplied index page size. The
 * experiment is not pre-indexed.
 *
 * @param type the class of the events handled by this experiment
 * @param id the experiment identifier
 * @param traces the traces that constitute the experiment
 * @param indexPageSize the index page size (stored in fIndexPageSize)
 */
public TmfExperiment(Class<T> type, String id, ITmfTrace[] traces, int indexPageSize) {
    // Goes straight to the most specific constructor instead of hopping
    // through the five-argument overload; the forwarded values are the same.
    this(type, id, traces, TmfTimestamp.Zero, indexPageSize, false);
}
126 public TmfExperiment(TmfExperiment
<T
> other
) {
127 super(other
.getName() + "(clone)", other
.fType
); //$NON-NLS-1$
129 fEpoch
= other
.fEpoch
;
130 fIndexPageSize
= other
.fIndexPageSize
;
132 fTraces
= new ITmfTrace
[other
.fTraces
.length
];
133 for (int trace
= 0; trace
< other
.fTraces
.length
; trace
++) {
134 fTraces
[trace
] = other
.fTraces
[trace
].createTraceCopy();
137 fNbEvents
= other
.fNbEvents
;
138 fTimeRange
= other
.fTimeRange
;
142 public TmfExperiment
<T
> createTraceCopy() {
143 TmfExperiment
<T
> experiment
= new TmfExperiment
<T
>(this);
144 TmfSignalManager
.deregister(experiment
);
149 * Clears the experiment
152 public synchronized void dispose() {
153 if (fTraces
!= null) {
154 for (ITmfTrace trace
: fTraces
) {
159 if (fCheckpoints
!= null) {
160 fCheckpoints
.clear();
165 // ------------------------------------------------------------------------
167 // ------------------------------------------------------------------------
170 public String
getPath() {
175 public long getNbEvents() {
180 public int getCacheSize() {
181 return fIndexPageSize
;
185 public TmfTimeRange
getTimeRange() {
190 public TmfTimestamp
getStartTime() {
191 return fTimeRange
.getStartTime();
195 public TmfTimestamp
getEndTime() {
196 return fTimeRange
.getEndTime();
199 public Vector
<TmfCheckpoint
> getCheckpoints() {
203 // ------------------------------------------------------------------------
205 // ------------------------------------------------------------------------
207 public static void setCurrentExperiment(TmfExperiment
<?
> experiment
) {
208 fCurrentExperiment
= experiment
;
211 public static TmfExperiment
<?
> getCurrentExperiment() {
212 return fCurrentExperiment
;
215 public TmfTimestamp
getEpoch() {
219 public ITmfTrace
[] getTraces() {
224 * Returns the rank of the first event with the requested timestamp.
225 * If none, returns the index of the next event (if any).
231 public long getRank(TmfTimestamp timestamp
) {
232 TmfExperimentContext context
= seekEvent(timestamp
);
233 return context
.getRank();
237 * Returns the timestamp of the event at the requested index.
238 * If none, returns null.
243 public TmfTimestamp
getTimestamp(int index
) {
244 TmfExperimentContext context
= seekEvent(index
);
245 TmfEvent event
= getNextEvent(context
);
246 return (event
!= null) ? event
.getTimestamp() : null;
249 // ------------------------------------------------------------------------
251 // ------------------------------------------------------------------------
254 * Update the total number of events
256 private void updateNbEvents() {
258 for (ITmfTrace trace
: fTraces
) {
259 nbEvents
+= trace
.getNbEvents();
261 fNbEvents
= nbEvents
;
265 * Update the global time range
267 private void updateTimeRange() {
268 TmfTimestamp startTime
= fTimeRange
!= null ? fTimeRange
.getStartTime() : TmfTimestamp
.BigCrunch
;
269 TmfTimestamp endTime
= fTimeRange
!= null ? fTimeRange
.getEndTime() : TmfTimestamp
.BigBang
;
271 for (ITmfTrace trace
: fTraces
) {
272 TmfTimestamp traceStartTime
= trace
.getStartTime();
273 if (traceStartTime
.compareTo(startTime
, true) < 0)
274 startTime
= traceStartTime
;
275 TmfTimestamp traceEndTime
= trace
.getEndTime();
276 if (traceEndTime
.compareTo(endTime
, true) > 0)
277 endTime
= traceEndTime
;
279 fTimeRange
= new TmfTimeRange(startTime
, endTime
);
282 // ------------------------------------------------------------------------
284 // ------------------------------------------------------------------------
287 public ITmfContext
armRequest(ITmfDataRequest
<T
> request
) {
288 // Tracer.trace("Ctx: Arming request - start");
289 TmfTimestamp timestamp
= (request
instanceof ITmfEventRequest
<?
>) ?
290 ((ITmfEventRequest
<T
>) request
).getRange().getStartTime() : null;
291 TmfExperimentContext context
= (timestamp
!= null) ?
292 seekEvent(timestamp
) : seekEvent(request
.getIndex());
293 // Tracer.trace("Ctx: Arming request - done");
297 @SuppressWarnings("unchecked")
299 public T
getNext(ITmfContext context
) {
300 if (context
instanceof TmfExperimentContext
) {
301 return (T
) getNextEvent((TmfExperimentContext
) context
);
306 // ------------------------------------------------------------------------
307 // ITmfTrace trace positioning
308 // ------------------------------------------------------------------------
310 // Returns a brand new context based on the location provided
311 // and initializes the event queues
313 public synchronized TmfExperimentContext
seekLocation(ITmfLocation
<?
> location
) {
315 // Validate the location
316 if (location
!= null && !(location
instanceof TmfExperimentLocation
)) {
317 return null; // Throw an exception?
320 // Instantiate the location
321 TmfExperimentLocation expLocation
= (location
== null)
322 ?
new TmfExperimentLocation(new ITmfLocation
<?
>[fTraces
.length
], new long[fTraces
.length
])
323 : (TmfExperimentLocation
) location
.clone();
325 // Create and populate the context's traces contexts
326 TmfExperimentContext context
= new TmfExperimentContext(fTraces
, new TmfContext
[fTraces
.length
]);
327 // Tracer.trace("Ctx: SeekLocation - start");
330 for (int i
= 0; i
< fTraces
.length
; i
++) {
331 // Get the relevant trace attributes
332 ITmfLocation
<?
> traceLocation
= expLocation
.getLocation()[i
];
333 long traceRank
= expLocation
.getRanks()[i
];
335 // Set the corresponding sub-context
336 context
.getContexts()[i
] = fTraces
[i
].seekLocation(traceLocation
);
337 context
.getContexts()[i
].setRank(traceRank
);
340 // Set the trace location and read the corresponding event
341 expLocation
.getLocation()[i
] = context
.getContexts()[i
].getLocation();
342 context
.getEvents()[i
] = fTraces
[i
].getNextEvent(context
.getContexts()[i
]);
345 // Tracer.trace("Ctx: SeekLocation - done");
348 context
.setLocation(expLocation
);
349 context
.setLastTrace(TmfExperimentContext
.NO_TRACE
);
350 context
.setRank(rank
);
352 fExperimentContext
= context
;
358 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(org.eclipse.linuxtools.tmf.event.TmfTimestamp)
361 public synchronized TmfExperimentContext
seekEvent(TmfTimestamp timestamp
) {
363 // Tracer.trace("Ctx: seekEvent(TS) - start");
365 if (timestamp
== null) {
366 timestamp
= TmfTimestamp
.BigBang
;
369 // First, find the right checkpoint
370 int index
= Collections
.binarySearch(fCheckpoints
, new TmfCheckpoint(timestamp
, null));
372 // In the very likely case that the checkpoint was not found, bsearch
373 // returns its negated would-be location (not an offset...). From that
374 // index, we can then position the stream and get the event.
376 index
= Math
.max(0, -(index
+ 2));
379 // Position the experiment at the checkpoint
380 ITmfLocation
<?
> location
;
381 synchronized (fCheckpoints
) {
382 if (fCheckpoints
.size() > 0) {
383 if (index
>= fCheckpoints
.size()) {
384 index
= fCheckpoints
.size() - 1;
386 location
= fCheckpoints
.elementAt(index
).getLocation();
393 TmfExperimentContext context
= seekLocation(location
);
394 context
.setRank((long) index
* fIndexPageSize
);
396 // And locate the event
397 TmfEvent event
= parseEvent(context
);
398 while (event
!= null && event
.getTimestamp().compareTo(timestamp
, false) < 0) {
399 getNextEvent(context
);
400 event
= parseEvent(context
);
404 context
.setLocation(null);
405 context
.setRank(ITmfContext
.UNKNOWN_RANK
);
412 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(long)
// Seeks by rank: picks the checkpoint of the index page that holds the
// requested rank, restores a context from that checkpoint's location, then
// walks forward event by event until the requested rank is reached or no
// event is left.
// NOTE(review): some original lines of this method are not visible in this
// view; the comments below only describe the statements that are shown.
415 public synchronized TmfExperimentContext
seekEvent(long rank
) {
417 // Tracer.trace("Ctx: seekEvent(rank) - start");
419 // Position the stream at the previous checkpoint
// NOTE(review): the cast binds tighter than the division, so rank is
// narrowed to int BEFORE being divided. A rank above Integer.MAX_VALUE
// would yield a wrong page index. Probably intended:
// (int) (rank / fIndexPageSize) - confirm and fix.
420 int index
= (int) rank
/ fIndexPageSize
;
421 ITmfLocation
<?
> location
;
// The checkpoint table is read under its own monitor
422 synchronized (fCheckpoints
) {
423 if (fCheckpoints
.size() == 0) {
// Clamp to the last available checkpoint when the requested page has
// not been indexed yet
427 if (index
>= fCheckpoints
.size()) {
428 index
= fCheckpoints
.size() - 1;
430 location
= fCheckpoints
.elementAt(index
).getLocation();
// Restore a context at the checkpoint; its rank is the first rank of that
// index page
434 TmfExperimentContext context
= seekLocation(location
);
435 context
.setRank((long) index
* fIndexPageSize
);
437 // And locate the event
438 TmfEvent event
= parseEvent(context
);
439 long pos
= context
.getRank();
// parseEvent() peeks at the next event, getNextEvent() consumes it; stop
// once the requested rank is reached or there is nothing left to read
440 while (event
!= null && pos
++ < rank
) {
441 getNextEvent(context
);
442 event
= parseEvent(context
);
446 context
.setLocation(null);
447 context
.setRank(ITmfContext
.UNKNOWN_RANK
);
454 * Scan the next events from all traces and return the next one
455 * in chronological order.
461 // private void dumpContext(TmfExperimentContext context, boolean isBefore) {
463 // TmfContext context0 = context.getContexts()[0];
464 // TmfEvent event0 = context.getEvents()[0];
465 // TmfExperimentLocation location0 = (TmfExperimentLocation) context.getLocation();
466 // long rank0 = context.getRank();
467 // int trace = context.getLastTrace();
469 // StringBuffer result = new StringBuffer("Ctx: " + (isBefore ? "B " : "A "));
471 // result.append("[Ctx: fLoc= " + context0.getLocation().toString() + ", fRnk= " + context0.getRank() + "] ");
472 // result.append("[Evt: " + event0.getTimestamp().toString() + "] ");
473 // result.append("[Loc: fLoc= " + location0.getLocation()[0].toString() + ", fRnk= " + location0.getRanks()[0] + "] ");
474 // result.append("[Rnk: " + rank0 + "], [Trc: " + trace + "]");
475 // Tracer.trace(result.toString());
479 public synchronized TmfEvent
getNextEvent(TmfContext context
) {
481 // Validate the context
482 if (!(context
instanceof TmfExperimentContext
)) {
483 return null; // Throw an exception?
486 if (!context
.equals(fExperimentContext
)) {
487 // Tracer.trace("Ctx: Restoring context");
488 seekLocation(context
.getLocation());
491 TmfExperimentContext expContext
= (TmfExperimentContext
) context
;
493 // dumpContext(expContext, true);
495 // If an event was consumed previously, get the next one from that trace
496 int lastTrace
= expContext
.getLastTrace();
497 if (lastTrace
!= TmfExperimentContext
.NO_TRACE
) {
498 TmfContext traceContext
= expContext
.getContexts()[lastTrace
];
499 expContext
.getEvents()[lastTrace
] = expContext
.getTraces()[lastTrace
].getNextEvent(traceContext
);
500 expContext
.setLastTrace(TmfExperimentContext
.NO_TRACE
);
503 // Scan the candidate events and identify the "next" trace to read from
504 int trace
= TmfExperimentContext
.NO_TRACE
;
505 TmfTimestamp timestamp
= TmfTimestamp
.BigCrunch
;
506 for (int i
= 0; i
< expContext
.getTraces().length
; i
++) {
507 TmfEvent event
= expContext
.getEvents()[i
];
508 if (event
!= null && event
.getTimestamp() != null) {
509 TmfTimestamp otherTS
= event
.getTimestamp();
510 if (otherTS
.compareTo(timestamp
, true) < 0) {
517 // Update the experiment context and set the "next" event
518 TmfEvent event
= null;
519 if (trace
!= TmfExperimentContext
.NO_TRACE
) {
520 updateIndex(expContext
, timestamp
);
522 TmfContext traceContext
= expContext
.getContexts()[trace
];
523 TmfExperimentLocation expLocation
= (TmfExperimentLocation
) expContext
.getLocation();
524 // expLocation.getLocation()[trace] = traceContext.getLocation().clone();
525 expLocation
.getLocation()[trace
] = traceContext
.getLocation();
527 // updateIndex(expContext, timestamp);
529 expLocation
.getRanks()[trace
] = traceContext
.getRank();
530 expContext
.setLastTrace(trace
);
531 expContext
.updateRank(1);
532 event
= expContext
.getEvents()[trace
];
535 // if (event != null) {
536 // Tracer.trace("Exp: " + (expContext.getRank() - 1) + ": " + event.getTimestamp().toString());
537 // dumpContext(expContext, false);
538 // Tracer.trace("Ctx: Event returned= " + event.getTimestamp().toString());
544 public synchronized void updateIndex(ITmfContext context
, TmfTimestamp timestamp
) {
545 // Build the index as we go along
546 long rank
= context
.getRank();
547 if (context
.isValidRank() && (rank
% fIndexPageSize
) == 0) {
548 // Determine the table position
549 long position
= rank
/ fIndexPageSize
;
550 // Add new entry at proper location (if empty)
551 if (fCheckpoints
.size() == position
) {
552 ITmfLocation
<?
> location
= context
.getLocation().clone();
553 fCheckpoints
.add(new TmfCheckpoint(timestamp
.clone(), location
));
554 // System.out.println(this + "[" + (fCheckpoints.size() - 1) + "] " + timestamp + ", " + location.toString());
560 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#parseEvent(org.eclipse.linuxtools.tmf.trace.TmfContext)
563 public TmfEvent
parseEvent(TmfContext context
) {
565 // Validate the context
566 if (!(context
instanceof TmfExperimentContext
)) {
567 return null; // Throw an exception?
570 if (!context
.equals(fExperimentContext
)) {
571 // Tracer.trace("Ctx: Restoring context");
572 seekLocation(context
.getLocation());
575 TmfExperimentContext expContext
= (TmfExperimentContext
) context
;
577 // If an event was consumed previously, get the next one from that trace
578 int lastTrace
= expContext
.getLastTrace();
579 if (lastTrace
!= TmfExperimentContext
.NO_TRACE
) {
580 TmfContext traceContext
= expContext
.getContexts()[lastTrace
];
581 expContext
.getEvents()[lastTrace
] = expContext
.getTraces()[lastTrace
].getNextEvent(traceContext
);
582 expContext
.setLastTrace(TmfExperimentContext
.NO_TRACE
);
583 fExperimentContext
= (TmfExperimentContext
) context
;
586 // Scan the candidate events and identify the "next" trace to read from
587 int trace
= TmfExperimentContext
.NO_TRACE
;
588 TmfTimestamp timestamp
= TmfTimestamp
.BigCrunch
;
589 for (int i
= 0; i
< expContext
.getTraces().length
; i
++) {
590 TmfEvent event
= expContext
.getEvents()[i
];
591 if (event
!= null && event
.getTimestamp() != null) {
592 TmfTimestamp otherTS
= event
.getTimestamp();
593 if (otherTS
.compareTo(timestamp
, true) < 0) {
600 TmfEvent event
= null;
601 if (trace
!= TmfExperimentContext
.NO_TRACE
) {
602 event
= expContext
.getEvents()[trace
];
609 * @see java.lang.Object#toString()
612 @SuppressWarnings("nls")
613 public String
toString() {
614 return "[TmfExperiment (" + getName() + ")]";
617 // ------------------------------------------------------------------------
619 // ------------------------------------------------------------------------
622 * The experiment holds the globally ordered events of its set of traces.
623 * It is expected to provide access to each individual event by index i.e.
624 * it must be possible to request the Nth event of the experiment.
626 * The purpose of the index is to keep the information needed to rapidly
627 * restore the traces contexts at regular intervals (every INDEX_PAGE_SIZE
631 // The index page size
632 private static final int DEFAULT_INDEX_PAGE_SIZE
= 5000;
633 protected int fIndexPageSize
;
635 // private static BufferedWriter fEventLog = null;
636 // private static BufferedWriter openLogFile(String filename) {
637 // BufferedWriter outfile = null;
639 // outfile = new BufferedWriter(new FileWriter(filename));
640 // } catch (IOException e) {
641 // e.printStackTrace();
646 @SuppressWarnings("unchecked")
647 private void indexExperiment(boolean waitForCompletion
) {
649 fCheckpoints
.clear();
651 // fEventLog = openLogFile("TraceEvent.log");
652 // System.out.println(System.currentTimeMillis() + ": Experiment indexing started");
654 ITmfEventRequest
<TmfEvent
> request
= new TmfEventRequest
<TmfEvent
>(TmfEvent
.class, TmfTimeRange
.Eternity
,
655 TmfDataRequest
.ALL_DATA
, 1, ITmfDataRequest
.ExecutionType
.BACKGROUND
) {
657 // long indexingStart = System.nanoTime();
659 TmfTimestamp startTime
= null;
660 TmfTimestamp lastTime
= null;
663 public void handleData(TmfEvent event
) {
664 super.handleData(event
);
666 TmfTimestamp ts
= event
.getTimestamp();
667 if (startTime
== null)
668 startTime
= new TmfTimestamp(ts
);
669 lastTime
= new TmfTimestamp(ts
);
671 if ((getNbRead() % fIndexPageSize
) == 0) {
678 public void handleSuccess() {
679 // long indexingEnd = System.nanoTime();
682 // System.out.println(System.currentTimeMillis() + ": Experiment indexing completed");
684 // long average = (indexingEnd - indexingStart) / fNbEvents;
685 // System.out.println(getName() + ": start=" + startTime + ", end=" + lastTime + ", elapsed=" + (indexingEnd * 1.0 - indexingStart) / 1000000000);
686 // System.out.println(getName() + ": nbEvents=" + fNbEvents + " (" + (average / 1000) + "." + (average % 1000) + " us/evt)");
689 private void updateExperiment() {
690 int nbRead
= getNbRead();
692 // updateTimeRange();
694 fTimeRange
= new TmfTimeRange(startTime
, new TmfTimestamp(lastTime
));
701 sendRequest((ITmfDataRequest
<T
>) request
);
702 if (waitForCompletion
)
704 request
.waitForCompletion();
705 } catch (InterruptedException e
) {
710 protected void notifyListeners() {
711 broadcast(new TmfExperimentUpdatedSignal(this, this)); // , null));
714 // ------------------------------------------------------------------------
716 // ------------------------------------------------------------------------
719 public void experimentSelected(TmfExperimentSelectedSignal
<T
> signal
) {
720 TmfExperiment
<?
> experiment
= signal
.getExperiment();
721 if (experiment
== this) {
722 setCurrentExperiment(experiment
);
723 indexExperiment(false);
731 public void experimentUpdated(TmfExperimentUpdatedSignal signal
) {
735 public void traceUpdated(TmfTraceUpdatedSignal signal
) {
736 // TODO: Incremental index update
741 broadcast(new TmfExperimentUpdatedSignal(this, this)); // , signal.getTrace()));
744 // ------------------------------------------------------------------------
746 // ------------------------------------------------------------------------
749 protected void queueBackgroundRequest(final ITmfDataRequest
<T
> request
, final int blockSize
, final boolean indexing
) {
751 // TODO: Handle the data requests also...
752 if (!(request
instanceof ITmfEventRequest
<?
>)) {
753 super.queueRequest(request
);
756 final ITmfEventRequest
<T
> eventRequest
= (ITmfEventRequest
<T
>) request
;
758 Thread thread
= new Thread() {
762 // final long requestStart = System.nanoTime();
764 final Integer
[] CHUNK_SIZE
= new Integer
[1];
765 CHUNK_SIZE
[0] = blockSize
+ ((indexing
) ?
1 : 0);
767 final Integer
[] nbRead
= new Integer
[1];
770 // final TmfTimestamp[] timestamp = new TmfTimestamp[1];
771 // timestamp[0] = new TmfTimestamp(eventRequest.getRange().getStartTime());
772 // final TmfTimestamp endTS = eventRequest.getRange().getEndTime();
774 final Boolean
[] isFinished
= new Boolean
[1];
775 isFinished
[0] = Boolean
.FALSE
;
777 while (!isFinished
[0]) {
779 // TmfEventRequest<T> subRequest = new TmfEventRequest<T>(eventRequest.getDataType(), new TmfTimeRange(timestamp[0], endTS), CHUNK_SIZE[0], eventRequest.getBlockize(), ExecutionType.BACKGROUND)
780 // TmfDataRequest<T> subRequest = new TmfDataRequest<T>(eventRequest.getDataType(), nbRead[0], CHUNK_SIZE[0], eventRequest.getBlockize(), ExecutionType.BACKGROUND)
781 TmfDataRequest
<T
> subRequest
= new TmfDataRequest
<T
>(eventRequest
.getDataType(), nbRead
[0], CHUNK_SIZE
[0], ExecutionType
.BACKGROUND
)
784 public void handleData(T data
) {
785 super.handleData(data
);
786 eventRequest
.handleData(data
);
787 if (getNbRead() == CHUNK_SIZE
[0]) {
788 nbRead
[0] += getNbRead();
790 if (getNbRead() > CHUNK_SIZE
[0]) {
791 System
.out
.println("ERROR - Read too many events");
796 public void handleCompleted() {
797 // System.out.println("Request completed at: " + timestamp[0]);
798 if (getNbRead() < CHUNK_SIZE
[0]) {
800 eventRequest
.cancel();
805 isFinished
[0] = Boolean
.TRUE
;
806 nbRead
[0] += getNbRead();
807 // System.out.println("fNbRead=" + getNbRead() + " total=" + nbRead[0]);
809 super.handleCompleted();
813 if (!isFinished
[0]) {
814 queueRequest(subRequest
);
817 subRequest
.waitForCompletion();
818 // System.out.println("Finished at " + timestamp[0]);
819 } catch (InterruptedException e
) {
823 // TmfTimestamp newTS = new TmfTimestamp(timestamp[0].getValue() + 1, timestamp[0].getScale(), timestamp[0].getPrecision());
824 // timestamp[0] = newTS;
825 CHUNK_SIZE
[0] = blockSize
;
826 // System.out.println("New timestamp: " + timestamp[0]);
829 // final long requestEnded = System.nanoTime();
830 // System.out.println("Background request completed. Elapsed= " + (requestEnded * 1.0 - requestStart) / 1000000000);