Minor updates
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / experiment / TmfExperiment.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.core.experiment;
14
15 import java.util.Collections;
16 import java.util.Vector;
17
18 import org.eclipse.core.resources.IFile;
19 import org.eclipse.core.resources.IProject;
20 import org.eclipse.core.resources.IResource;
21 import org.eclipse.core.runtime.IProgressMonitor;
22 import org.eclipse.core.runtime.IStatus;
23 import org.eclipse.core.runtime.Status;
24 import org.eclipse.core.runtime.jobs.Job;
25 import org.eclipse.linuxtools.tmf.core.component.TmfEventProvider;
26 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
27 import org.eclipse.linuxtools.tmf.core.event.ITmfTimestamp;
28 import org.eclipse.linuxtools.tmf.core.event.TmfTimeRange;
29 import org.eclipse.linuxtools.tmf.core.event.TmfTimestamp;
30 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
31 import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
32 import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
33 import org.eclipse.linuxtools.tmf.core.request.TmfEventRequest;
34 import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
35 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentDisposedSignal;
36 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentRangeUpdatedSignal;
37 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentSelectedSignal;
38 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentUpdatedSignal;
39 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
40 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalManager;
41 import org.eclipse.linuxtools.tmf.core.signal.TmfTraceUpdatedSignal;
42 import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
43 import org.eclipse.linuxtools.tmf.core.trace.ITmfLocation;
44 import org.eclipse.linuxtools.tmf.core.trace.ITmfTrace;
45 import org.eclipse.linuxtools.tmf.core.trace.TmfCheckpoint;
46 import org.eclipse.linuxtools.tmf.core.trace.TmfContext;
47
48 /**
49 * <b><u>TmfExperiment</u></b>
50 * <p>
51 * TmfExperiment presents a time-ordered, unified view of a set of TmfTraces that are part of a tracing experiment.
52 * <p>
53 */
54 public class TmfExperiment<T extends ITmfEvent> extends TmfEventProvider<T> implements ITmfTrace<T> {
55
56 // ------------------------------------------------------------------------
57 // Attributes
58 // ------------------------------------------------------------------------
59
60 // The currently selected experiment
61 protected static TmfExperiment<?> fCurrentExperiment = null;
62
63 // The set of traces that constitute the experiment
64 protected ITmfTrace<T>[] fTraces;
65
66 // The total number of events
67 protected long fNbEvents;
68
69 // The experiment time range
70 protected TmfTimeRange fTimeRange;
71
72 // The experiment reference timestamp (default: Zero)
73 protected ITmfTimestamp fEpoch;
74
75 // The experiment index
76 protected Vector<TmfCheckpoint> fCheckpoints = new Vector<TmfCheckpoint>();
77
78 // The current experiment context
79 protected TmfExperimentContext fExperimentContext;
80
81 // Flag to initialize only once
82 private boolean fInitialized = false;
83
84 // The experiment bookmarks file
85 private IFile fBookmarksFile;
86
87 // The properties resource
88 private IResource fResource;
89
90 // ------------------------------------------------------------------------
91 // Constructors
92 // ------------------------------------------------------------------------
93
94 @Override
95 public boolean validate(IProject project, String path) {
96 return true;
97 }
98
99 @Override
100 public void initTrace(String name, String path, Class<T> eventType) {
101 }
102
103 @Override
104 public void initTrace(String name, String path, Class<T> eventType, boolean indexTrace) {
105 if (indexTrace) {
106 initializeStreamingMonitor();
107 }
108 }
109
110 @Override
111 public void initTrace(String name, String path, Class<T> eventType, int cacheSize) {
112 }
113
114 @Override
115 public void initTrace(String name, String path, Class<T> eventType, int cacheSize, boolean indexTrace) {
116 if (indexTrace) {
117 initializeStreamingMonitor();
118 }
119 }
120
121 /**
122 * @param type
123 * @param id
124 * @param traces
125 * @param epoch
126 * @param indexPageSize
127 */
128 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, ITmfTimestamp epoch, int indexPageSize) {
129 this(type, id, traces, TmfTimestamp.ZERO, indexPageSize, false);
130 }
131
132 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, ITmfTimestamp epoch, int indexPageSize, boolean preIndexExperiment) {
133 super(id, type);
134
135 fTraces = traces;
136 fEpoch = epoch;
137 fIndexPageSize = indexPageSize;
138 fTimeRange = TmfTimeRange.NULL_RANGE;
139
140 indexExperiment(true);
141 updateTimeRange();
142 }
143
144 protected TmfExperiment(String id, Class<T> type) {
145 super(id, type);
146 }
147
148 /**
149 * @param type
150 * @param id
151 * @param traces
152 */
153 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces) {
154 this(type, id, traces, TmfTimestamp.ZERO, DEFAULT_INDEX_PAGE_SIZE);
155 }
156
157 /**
158 * @param type
159 * @param id
160 * @param traces
161 * @param indexPageSize
162 */
163 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, int indexPageSize) {
164 this(type, id, traces, TmfTimestamp.ZERO, indexPageSize);
165 }
166
167 /**
168 * Copy constructor
169 *
170 * @param other
171 */
172 @SuppressWarnings("unchecked")
173 public TmfExperiment(TmfExperiment<T> other) {
174 super(other.getName() + "(copy)", other.fType); //$NON-NLS-1$
175
176 fEpoch = other.fEpoch;
177 fIndexPageSize = other.fIndexPageSize;
178
179 fTraces = new ITmfTrace[other.fTraces.length];
180 for (int trace = 0; trace < other.fTraces.length; trace++) {
181 fTraces[trace] = other.fTraces[trace].copy();
182 }
183
184 fNbEvents = other.fNbEvents;
185 fTimeRange = other.fTimeRange;
186 }
187
188 @Override
189 public TmfExperiment<T> copy() {
190 TmfExperiment<T> experiment = new TmfExperiment<T>(this);
191 TmfSignalManager.deregister(experiment);
192 return experiment;
193 }
194
195 /**
196 * Clears the experiment
197 */
198 @Override
199 @SuppressWarnings("rawtypes")
200 public synchronized void dispose() {
201
202 TmfExperimentDisposedSignal<T> signal = new TmfExperimentDisposedSignal<T>(this, this);
203 broadcast(signal);
204 if (fCurrentExperiment == this) {
205 fCurrentExperiment = null;
206 }
207
208 if (fTraces != null) {
209 for (ITmfTrace trace : fTraces) {
210 trace.dispose();
211 }
212 fTraces = null;
213 }
214 if (fCheckpoints != null) {
215 fCheckpoints.clear();
216 }
217 super.dispose();
218 }
219
220 // ------------------------------------------------------------------------
221 // ITmfTrace
222 // ------------------------------------------------------------------------
223
224 @Override
225 public long getNbEvents() {
226 return fNbEvents;
227 }
228
229 @Override
230 public int getCacheSize() {
231 return fIndexPageSize;
232 }
233
234 @Override
235 public TmfTimeRange getTimeRange() {
236 return fTimeRange;
237 }
238
239 @Override
240 public ITmfTimestamp getStartTime() {
241 return fTimeRange.getStartTime();
242 }
243
244 @Override
245 public ITmfTimestamp getEndTime() {
246 return fTimeRange.getEndTime();
247 }
248
249 public Vector<TmfCheckpoint> getCheckpoints() {
250 return fCheckpoints;
251 }
252
253 // ------------------------------------------------------------------------
254 // Accessors
255 // ------------------------------------------------------------------------
256
257 public static void setCurrentExperiment(TmfExperiment<?> experiment) {
258 if (fCurrentExperiment != null && fCurrentExperiment != experiment) {
259 fCurrentExperiment.dispose();
260 }
261 fCurrentExperiment = experiment;
262 }
263
264 public static TmfExperiment<?> getCurrentExperiment() {
265 return fCurrentExperiment;
266 }
267
268 public ITmfTimestamp getEpoch() {
269 return fEpoch;
270 }
271
272 public ITmfTrace<T>[] getTraces() {
273 return fTraces;
274 }
275
276 /**
277 * Returns the rank of the first event with the requested timestamp. If none, returns the index of the next event
278 * (if any).
279 *
280 * @param timestamp the event timestamp
281 * @return the corresponding event rank
282 */
283 @Override
284 public long getRank(ITmfTimestamp timestamp) {
285 TmfExperimentContext context = seekEvent(timestamp);
286 return context.getRank();
287 }
288
289 /**
290 * Returns the timestamp of the event at the requested index. If none, returns null.
291 *
292 * @param index the event index (rank)
293 * @return the corresponding event timestamp
294 */
295 public ITmfTimestamp getTimestamp(int index) {
296 TmfExperimentContext context = seekEvent(index);
297 ITmfEvent event = getNextEvent(context);
298 return (event != null) ? event.getTimestamp() : null;
299 }
300
301 // ------------------------------------------------------------------------
302 // Operators
303 // ------------------------------------------------------------------------
304
305 /**
306 * Update the global time range
307 */
308 protected void updateTimeRange() {
309 ITmfTimestamp startTime = fTimeRange != TmfTimeRange.NULL_RANGE ? fTimeRange.getStartTime() : TmfTimestamp.BIG_CRUNCH;
310 ITmfTimestamp endTime = fTimeRange != TmfTimeRange.NULL_RANGE ? fTimeRange.getEndTime() : TmfTimestamp.BIG_BANG;
311
312 for (ITmfTrace<T> trace : fTraces) {
313 ITmfTimestamp traceStartTime = trace.getStartTime();
314 if (traceStartTime.compareTo(startTime, true) < 0)
315 startTime = traceStartTime;
316 ITmfTimestamp traceEndTime = trace.getEndTime();
317 if (traceEndTime.compareTo(endTime, true) > 0)
318 endTime = traceEndTime;
319 }
320 fTimeRange = new TmfTimeRange(startTime, endTime);
321 }
322
323 // ------------------------------------------------------------------------
324 // TmfProvider
325 // ------------------------------------------------------------------------
326 @Override
327 public ITmfContext armRequest(ITmfDataRequest<T> request) {
328 // Tracer.trace("Ctx: Arming request - start");
329 ITmfTimestamp timestamp = (request instanceof ITmfEventRequest<?>) ? ((ITmfEventRequest<T>) request).getRange().getStartTime()
330 : null;
331
332 if (TmfTimestamp.BIG_BANG.equals(timestamp) || request.getIndex() > 0) {
333 timestamp = null; // use request index
334 }
335
336 TmfExperimentContext context = null;
337 if (timestamp != null) {
338 // seek by timestamp
339 context = seekEvent(timestamp);
340 ((ITmfEventRequest<T>) request).setStartIndex((int) context.getRank());
341 } else {
342 // Seek by rank
343 if ((fExperimentContext != null) && fExperimentContext.getRank() == request.getIndex()) {
344 // We are already at the right context -> no need to seek
345 context = fExperimentContext;
346 } else {
347 context = seekEvent(request.getIndex());
348 }
349 }
350 // Tracer.trace("Ctx: Arming request - done");
351 return context;
352 }
353
354 @SuppressWarnings("unchecked")
355 @Override
356 public T getNext(ITmfContext context) {
357 if (context instanceof TmfExperimentContext) {
358 return (T) getNextEvent((TmfExperimentContext) context);
359 }
360 return null;
361 }
362
363 // ------------------------------------------------------------------------
364 // ITmfTrace trace positioning
365 // ------------------------------------------------------------------------
366
367 // Returns a brand new context based on the location provided
368 // and initializes the event queues
369 @Override
370 public synchronized TmfExperimentContext seekLocation(ITmfLocation<?> location) {
371 // Validate the location
372 if (location != null && !(location instanceof TmfExperimentLocation)) {
373 return null; // Throw an exception?
374 }
375
376 if (fTraces == null) { // experiment has been disposed
377 return null;
378 }
379
380 // Instantiate the location
381 TmfExperimentLocation expLocation = (location == null) ? new TmfExperimentLocation(new TmfLocationArray(
382 new ITmfLocation<?>[fTraces.length]), new long[fTraces.length]) : (TmfExperimentLocation) location.clone();
383
384 // Create and populate the context's traces contexts
385 TmfExperimentContext context = new TmfExperimentContext(fTraces, new TmfContext[fTraces.length]);
386 // Tracer.trace("Ctx: SeekLocation - start");
387
388 long rank = 0;
389 for (int i = 0; i < fTraces.length; i++) {
390 // Get the relevant trace attributes
391 ITmfLocation<?> traceLocation = expLocation.getLocation().locations[i];
392 long traceRank = expLocation.getRanks()[i];
393
394 // Set the corresponding sub-context
395 context.getContexts()[i] = fTraces[i].seekLocation(traceLocation);
396 context.getContexts()[i].setRank(traceRank);
397 rank += traceRank;
398
399 // Set the trace location and read the corresponding event
400 /* The (TmfContext) cast should be safe since we created 'context'
401 * ourselves higher up. */
402 expLocation.getLocation().locations[i] = ((TmfContext) context.getContexts()[i]).getLocation().clone();
403 context.getEvents()[i] = fTraces[i].getNextEvent(context.getContexts()[i]);
404 }
405
406 // Tracer.trace("Ctx: SeekLocation - done");
407
408 // Finalize context
409 context.setLocation(expLocation);
410 context.setLastTrace(TmfExperimentContext.NO_TRACE);
411 context.setRank(rank);
412
413 fExperimentContext = context;
414
415 return context;
416 }
417
418 /*
419 * (non-Javadoc)
420 *
421 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(org.eclipse.linuxtools .tmf.event.TmfTimestamp)
422 */
423 @Override
424 public synchronized TmfExperimentContext seekEvent(ITmfTimestamp timestamp) {
425
426 // Tracer.trace("Ctx: seekEvent(TS) - start");
427
428 if (timestamp == null) {
429 timestamp = TmfTimestamp.BIG_BANG;
430 }
431
432 // First, find the right checkpoint
433 int index = Collections.binarySearch(fCheckpoints, new TmfCheckpoint(timestamp, null));
434
435 // In the very likely case that the checkpoint was not found, bsearch
436 // returns its negated would-be location (not an offset...). From that
437 // index, we can then position the stream and get the event.
438 if (index < 0) {
439 index = Math.max(0, -(index + 2));
440 }
441
442 // Position the experiment at the checkpoint
443 ITmfLocation<?> location;
444 synchronized (fCheckpoints) {
445 if (fCheckpoints.size() > 0) {
446 if (index >= fCheckpoints.size()) {
447 index = fCheckpoints.size() - 1;
448 }
449 location = fCheckpoints.elementAt(index).getLocation();
450 } else {
451 location = null;
452 }
453 }
454
455 TmfExperimentContext context = seekLocation(location);
456 context.setRank((long) index * fIndexPageSize);
457
458 // And locate the event
459 ITmfEvent event = parseEvent(context);
460 while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) {
461 getNextEvent(context);
462 event = parseEvent(context);
463 }
464
465 if (event == null) {
466 context.setLocation(null);
467 context.setRank(ITmfContext.UNKNOWN_RANK);
468 }
469
470 return context;
471 }
472
473 /*
474 * (non-Javadoc)
475 *
476 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(long)
477 */
478 @Override
479 public synchronized TmfExperimentContext seekEvent(long rank) {
480
481 // Tracer.trace("Ctx: seekEvent(rank) - start");
482
483 // Position the stream at the previous checkpoint
484 int index = (int) rank / fIndexPageSize;
485 ITmfLocation<?> location;
486 synchronized (fCheckpoints) {
487 if (fCheckpoints.size() == 0) {
488 location = null;
489 } else {
490 if (index >= fCheckpoints.size()) {
491 index = fCheckpoints.size() - 1;
492 }
493 location = fCheckpoints.elementAt(index).getLocation();
494 }
495 }
496
497 TmfExperimentContext context = seekLocation(location);
498 context.setRank((long) index * fIndexPageSize);
499
500 // And locate the event
501 ITmfEvent event = parseEvent(context);
502 long pos = context.getRank();
503 while (event != null && pos++ < rank) {
504 getNextEvent(context);
505 event = parseEvent(context);
506 }
507
508 if (event == null) {
509 context.setLocation(null);
510 context.setRank(ITmfContext.UNKNOWN_RANK);
511 }
512
513 return context;
514 }
515
516 @Override
517 public TmfContext seekLocation(double ratio) {
518 TmfContext context = seekEvent((long) (ratio * getNbEvents()));
519 return context;
520 }
521
522 @Override
523 public double getLocationRatio(ITmfLocation<?> location) {
524 if (location instanceof TmfExperimentLocation) {
525 return (double) seekLocation(location).getRank() / getNbEvents();
526 }
527 return 0;
528 }
529
530 @Override
531 public ITmfLocation<?> getCurrentLocation() {
532 if (fExperimentContext != null) {
533 return fExperimentContext.getLocation();
534 }
535 return null;
536 }
537
538 // private void dumpContext(TmfExperimentContext context, boolean isBefore) {
539
540 // TmfContext context0 = context.getContexts()[0];
541 // TmfEvent event0 = context.getEvents()[0];
542 // TmfExperimentLocation location0 = (TmfExperimentLocation) context.getLocation();
543 // long rank0 = context.getRank();
544 // int trace = context.getLastTrace();
545 //
546 // StringBuffer result = new StringBuffer("Ctx: " + (isBefore ? "B " : "A "));
547 //
548 // result.append("[Ctx: fLoc= " + context0.getLocation().toString() + ", fRnk= " + context0.getRank() + "] ");
549 // result.append("[Evt: " + event0.getTimestamp().toString() + "] ");
550 // result.append("[Loc: fLoc= " + location0.getLocation()[0].toString() + ", fRnk= " + location0.getRanks()[0] + "] ");
551 // result.append("[Rnk: " + rank0 + "], [Trc: " + trace + "]");
552 // Tracer.trace(result.toString());
553 // }
554
555 /**
556 * Scan the next events from all traces and return the next one in chronological order.
557 *
558 * @param context the trace context
559 * @return the next event
560 */
561 @SuppressWarnings("unchecked")
562 @Override
563 public synchronized ITmfEvent getNextEvent(ITmfContext context) {
564
565 // Validate the context
566 if (!(context instanceof TmfExperimentContext)) {
567 return null; // Throw an exception?
568 }
569
570 if (!context.equals(fExperimentContext)) {
571 // Tracer.trace("Ctx: Restoring context");
572 fExperimentContext = seekLocation(context.getLocation());
573 }
574
575 TmfExperimentContext expContext = (TmfExperimentContext) context;
576
577 // dumpContext(expContext, true);
578
579 // If an event was consumed previously, get the next one from that trace
580 int lastTrace = expContext.getLastTrace();
581 if (lastTrace != TmfExperimentContext.NO_TRACE) {
582 ITmfContext traceContext = expContext.getContexts()[lastTrace];
583 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
584 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
585 }
586
587 // Scan the candidate events and identify the "next" trace to read from
588 ITmfEvent eventArray[] = expContext.getEvents();
589 if (eventArray == null) {
590 return null;
591 }
592 int trace = TmfExperimentContext.NO_TRACE;
593 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
594 if (eventArray.length == 1) {
595 if (eventArray[0] != null) {
596 timestamp = eventArray[0].getTimestamp();
597 trace = 0;
598 }
599 } else {
600 for (int i = 0; i < eventArray.length; i++) {
601 ITmfEvent event = eventArray[i];
602 if (event != null && event.getTimestamp() != null) {
603 ITmfTimestamp otherTS = event.getTimestamp();
604 if (otherTS.compareTo(timestamp, true) < 0) {
605 trace = i;
606 timestamp = otherTS;
607 }
608 }
609 }
610 }
611 // Update the experiment context and set the "next" event
612 ITmfEvent event = null;
613 if (trace != TmfExperimentContext.NO_TRACE) {
614 updateIndex(expContext, timestamp);
615
616 ITmfContext traceContext = expContext.getContexts()[trace];
617 TmfExperimentLocation expLocation = (TmfExperimentLocation) expContext.getLocation();
618 // expLocation.getLocation()[trace] = traceContext.getLocation().clone();
619 expLocation.getLocation().locations[trace] = (ITmfLocation<? extends Comparable<?>>) traceContext.getLocation().clone();
620
621 // updateIndex(expContext, timestamp);
622
623 expLocation.getRanks()[trace] = traceContext.getRank();
624 expContext.setLastTrace(trace);
625 expContext.updateRank(1);
626 event = expContext.getEvents()[trace];
627 fExperimentContext = expContext;
628 }
629
630 // if (event != null) {
631 // Tracer.trace("Exp: " + (expContext.getRank() - 1) + ": " + event.getTimestamp().toString());
632 // dumpContext(expContext, false);
633 // Tracer.trace("Ctx: Event returned= " + event.getTimestamp().toString());
634 // }
635
636 return event;
637 }
638
639 public synchronized void updateIndex(ITmfContext context, ITmfTimestamp timestamp) {
640 // Build the index as we go along
641 long rank = context.getRank();
642 if (context.isValidRank() && (rank % fIndexPageSize) == 0) {
643 // Determine the table position
644 long position = rank / fIndexPageSize;
645 // Add new entry at proper location (if empty)
646 if (fCheckpoints.size() == position) {
647 ITmfLocation<?> location = context.getLocation().clone();
648 fCheckpoints.add(new TmfCheckpoint(timestamp.clone(), location));
649 // System.out.println(this + "[" + (fCheckpoints.size() - 1) + "] " + timestamp + ", "
650 // + location.toString());
651 }
652 }
653 }
654
655 /*
656 * (non-Javadoc)
657 *
658 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#parseEvent(org.eclipse.linuxtools .tmf.trace.TmfContext)
659 */
660 @Override
661 public ITmfEvent parseEvent(ITmfContext context) {
662
663 // Validate the context
664 if (!(context instanceof TmfExperimentContext)) {
665 return null; // Throw an exception?
666 }
667
668 if (!context.equals(fExperimentContext)) {
669 // Tracer.trace("Ctx: Restoring context");
670 seekLocation(context.getLocation());
671 }
672
673 TmfExperimentContext expContext = (TmfExperimentContext) context;
674
675 // If an event was consumed previously, get the next one from that trace
676 int lastTrace = expContext.getLastTrace();
677 if (lastTrace != TmfExperimentContext.NO_TRACE) {
678 ITmfContext traceContext = expContext.getContexts()[lastTrace];
679 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
680 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
681 fExperimentContext = (TmfExperimentContext) context;
682 }
683
684 // Scan the candidate events and identify the "next" trace to read from
685 int trace = TmfExperimentContext.NO_TRACE;
686 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
687 for (int i = 0; i < expContext.getTraces().length; i++) {
688 ITmfEvent event = expContext.getEvents()[i];
689 if (event != null && event.getTimestamp() != null) {
690 ITmfTimestamp otherTS = event.getTimestamp();
691 if (otherTS.compareTo(timestamp, true) < 0) {
692 trace = i;
693 timestamp = otherTS;
694 }
695 }
696 }
697
698 ITmfEvent event = null;
699 if (trace != TmfExperimentContext.NO_TRACE) {
700 event = expContext.getEvents()[trace];
701 }
702
703 return event;
704 }
705
706 /*
707 * (non-Javadoc)
708 *
709 * @see java.lang.Object#toString()
710 */
711 @Override
712 @SuppressWarnings("nls")
713 public String toString() {
714 return "[TmfExperiment (" + getName() + ")]";
715 }
716
717 // ------------------------------------------------------------------------
718 // Indexing
719 // ------------------------------------------------------------------------
720
721 private synchronized void initializeStreamingMonitor() {
722 if (fInitialized) {
723 return;
724 }
725 fInitialized = true;
726
727 if (getStreamingInterval() == 0) {
728 TmfContext context = seekLocation(null);
729 ITmfEvent event = getNext(context);
730 if (event == null) {
731 return;
732 }
733 TmfTimeRange timeRange = new TmfTimeRange(event.getTimestamp().clone(), TmfTimestamp.BIG_CRUNCH);
734 final TmfExperimentRangeUpdatedSignal signal = new TmfExperimentRangeUpdatedSignal(this, this, timeRange);
735
736 // Broadcast in separate thread to prevent deadlock
737 new Thread() {
738 @Override
739 public void run() {
740 broadcast(signal);
741 }
742 }.start();
743 return;
744 }
745
746 final Thread thread = new Thread("Streaming Monitor for experiment " + getName()) { //$NON-NLS-1$
747 ITmfTimestamp safeTimestamp = null;
748 TmfTimeRange timeRange = null;
749
750 @Override
751 public void run() {
752 while (!fExecutor.isShutdown()) {
753 if (!isIndexingBusy()) {
754 ITmfTimestamp startTimestamp = TmfTimestamp.BIG_CRUNCH;
755 ITmfTimestamp endTimestamp = TmfTimestamp.BIG_BANG;
756 for (ITmfTrace<T> trace : fTraces) {
757 if (trace.getStartTime().compareTo(startTimestamp) < 0) {
758 startTimestamp = trace.getStartTime();
759 }
760 if (trace.getStreamingInterval() != 0 && trace.getEndTime().compareTo(endTimestamp) > 0) {
761 endTimestamp = trace.getEndTime();
762 }
763 }
764 if (safeTimestamp != null && safeTimestamp.compareTo(getTimeRange().getEndTime(), false) > 0) {
765 timeRange = new TmfTimeRange(startTimestamp, safeTimestamp);
766 } else {
767 timeRange = null;
768 }
769 safeTimestamp = endTimestamp;
770 if (timeRange != null) {
771 TmfExperimentRangeUpdatedSignal signal =
772 new TmfExperimentRangeUpdatedSignal(TmfExperiment.this, TmfExperiment.this, timeRange);
773 broadcast(signal);
774 }
775 }
776 try {
777 Thread.sleep(getStreamingInterval());
778 } catch (InterruptedException e) {
779 e.printStackTrace();
780 }
781 }
782 }
783 };
784 thread.start();
785 }
786
787 /* (non-Javadoc)
788 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getStreamingInterval()
789 */
790 @Override
791 public long getStreamingInterval() {
792 long interval = 0;
793 for (ITmfTrace<T> trace : fTraces) {
794 interval = Math.max(interval, trace.getStreamingInterval());
795 }
796 return interval;
797 }
798
799 /*
800 * The experiment holds the globally ordered events of its set of traces. It is expected to provide access to each
801 * individual event by index i.e. it must be possible to request the Nth event of the experiment.
802 *
803 * The purpose of the index is to keep the information needed to rapidly restore the traces contexts at regular
804 * intervals (every INDEX_PAGE_SIZE event).
805 */
806
807 // The index page size
808 private static final int DEFAULT_INDEX_PAGE_SIZE = 5000;
809 protected int fIndexPageSize;
810 protected boolean fIndexing = false;
811 protected TmfTimeRange fIndexingPendingRange = TmfTimeRange.NULL_RANGE;
812
813 private Integer fEndSynchReference;
814
815 // private static BufferedWriter fEventLog = null;
816 // private static BufferedWriter openLogFile(String filename) {
817 // BufferedWriter outfile = null;
818 // try {
819 // outfile = new BufferedWriter(new FileWriter(filename));
820 // } catch (IOException e) {
821 // e.printStackTrace();
822 // }
823 // return outfile;
824 // }
825
826 protected boolean isIndexingBusy() {
827 synchronized (fCheckpoints) {
828 return fIndexing;
829 }
830 }
831
832 protected void indexExperiment(boolean waitForCompletion) {
833 indexExperiment(waitForCompletion, 0, TmfTimeRange.ETERNITY);
834 }
835
836 @SuppressWarnings("unchecked")
837 protected void indexExperiment(boolean waitForCompletion, final int index, final TmfTimeRange timeRange) {
838
839 synchronized (fCheckpoints) {
840 if (fIndexing) {
841 return;
842 }
843 fIndexing = true;
844 }
845
846 final Job job = new Job("Indexing " + getName() + "...") { //$NON-NLS-1$ //$NON-NLS-2$
847 @Override
848 protected IStatus run(IProgressMonitor monitor) {
849 while (!monitor.isCanceled()) {
850 try {
851 Thread.sleep(100);
852 } catch (InterruptedException e) {
853 return Status.OK_STATUS;
854 }
855 }
856 monitor.done();
857 return Status.OK_STATUS;
858 }
859 };
860 job.schedule();
861
862 // fEventLog = openLogFile("TraceEvent.log");
863 // System.out.println(System.currentTimeMillis() + ": Experiment indexing started");
864
865 ITmfEventRequest<ITmfEvent> request = new TmfEventRequest<ITmfEvent>(ITmfEvent.class, timeRange, index, TmfDataRequest.ALL_DATA,
866 fIndexPageSize, ITmfDataRequest.ExecutionType.BACKGROUND) { // PATA FOREGROUND
867
868 // long indexingStart = System.nanoTime();
869
870 ITmfTimestamp startTime = (fTimeRange == TmfTimeRange.NULL_RANGE) ? null : fTimeRange.getStartTime();
871 ITmfTimestamp lastTime = (fTimeRange == TmfTimeRange.NULL_RANGE) ? null : fTimeRange.getEndTime();
872 long initialNbEvents = fNbEvents;
873
874 @Override
875 public void handleStarted() {
876 super.handleStarted();
877 }
878
879 @Override
880 public void handleData(ITmfEvent event) {
881 super.handleData(event);
882 if (event != null) {
883 ITmfTimestamp ts = event.getTimestamp();
884 if (startTime == null)
885 startTime = ts.clone();
886 lastTime = ts.clone();
887 if ((getNbRead() % fIndexPageSize) == 1 && getNbRead() != 1) {
888 updateExperiment();
889 }
890 }
891 }
892
893 @Override
894 public void handleSuccess() {
895 // long indexingEnd = System.nanoTime();
896
897 // if the end time is a real value then it is the streaming safe time stamp
898 // set the last time to the safe time stamp to prevent unnecessary indexing requests
899 if (getRange().getEndTime() != TmfTimestamp.BIG_CRUNCH) {
900 lastTime = getRange().getEndTime();
901 }
902 updateExperiment();
903 // System.out.println(System.currentTimeMillis() + ": Experiment indexing completed");
904
905 // long average = (indexingEnd - indexingStart) / fNbEvents;
906 // System.out.println(getName() + ": start=" + startTime + ", end=" + lastTime + ", elapsed="
907 // + (indexingEnd * 1.0 - indexingStart) / 1000000000);
908 // System.out.println(getName() + ": nbEvents=" + fNbEvents + " (" + (average / 1000) + "."
909 // + (average % 1000) + " us/evt)");
910 super.handleSuccess();
911 }
912
913 @Override
914 public void handleCompleted() {
915 job.cancel();
916 super.handleCompleted();
917 synchronized (fCheckpoints) {
918 fIndexing = false;
919 if (fIndexingPendingRange != TmfTimeRange.NULL_RANGE) {
920 indexExperiment(false, (int) fNbEvents, fIndexingPendingRange);
921 fIndexingPendingRange = TmfTimeRange.NULL_RANGE;
922 }
923 }
924 }
925
926 private void updateExperiment() {
927 int nbRead = getNbRead();
928 if (startTime != null) {
929 fTimeRange = new TmfTimeRange(startTime, lastTime.clone());
930 }
931 if (nbRead != 0) {
932 // updateTimeRange();
933 // updateNbEvents();
934 fNbEvents = initialNbEvents + nbRead;
935 notifyListeners();
936 }
937 }
938 };
939
940 sendRequest((ITmfDataRequest<T>) request);
941 if (waitForCompletion)
942 try {
943 request.waitForCompletion();
944 } catch (InterruptedException e) {
945 e.printStackTrace();
946 }
947 }
948
949 protected void notifyListeners() {
950 broadcast(new TmfExperimentUpdatedSignal(this, this)); // , null));
951 //broadcast(new TmfExperimentRangeUpdatedSignal(this, this, fTimeRange)); // , null));
952 }
953
954 // ------------------------------------------------------------------------
955 // Signal handlers
956 // ------------------------------------------------------------------------
957
958 @TmfSignalHandler
959 public void experimentSelected(TmfExperimentSelectedSignal<T> signal) {
960 TmfExperiment<?> experiment = signal.getExperiment();
961 if (experiment == this) {
962 setCurrentExperiment(experiment);
963 fEndSynchReference = Integer.valueOf(signal.getReference());
964 }
965 }
966
967 @TmfSignalHandler
968 public void endSync(TmfEndSynchSignal signal) {
969 if (fEndSynchReference != null && fEndSynchReference.intValue() == signal.getReference()) {
970 fEndSynchReference = null;
971 initializeStreamingMonitor();
972 }
973
974 }
975
976 @TmfSignalHandler
977 public void experimentUpdated(TmfExperimentUpdatedSignal signal) {
978 }
979
980 @TmfSignalHandler
981 public void experimentRangeUpdated(TmfExperimentRangeUpdatedSignal signal) {
982 if (signal.getExperiment() == this) {
983 indexExperiment(false, (int) fNbEvents, signal.getRange());
984 }
985 }
986
987 @TmfSignalHandler
988 public void traceUpdated(TmfTraceUpdatedSignal signal) {
989 for (ITmfTrace<T> trace : fTraces) {
990 if (trace == signal.getTrace()) {
991 synchronized (fCheckpoints) {
992 if (fIndexing) {
993 if (fIndexingPendingRange == TmfTimeRange.NULL_RANGE) {
994 fIndexingPendingRange = signal.getRange();
995 } else {
996 ITmfTimestamp startTime = fIndexingPendingRange.getStartTime();
997 ITmfTimestamp endTime = fIndexingPendingRange.getEndTime();
998 if (signal.getRange().getStartTime().compareTo(startTime) < 0) {
999 startTime = signal.getRange().getStartTime();
1000 }
1001 if (signal.getRange().getEndTime().compareTo(endTime) > 0) {
1002 endTime = signal.getRange().getEndTime();
1003 }
1004 fIndexingPendingRange = new TmfTimeRange(startTime, endTime);
1005 }
1006 return;
1007 }
1008 }
1009 indexExperiment(false, (int) fNbEvents, signal.getRange());
1010 return;
1011 }
1012 }
1013 }
1014
1015 @Override
1016 public String getPath() {
1017 // TODO Auto-generated method stub
1018 return null;
1019 }
1020
1021 /**
1022 * Set the file to be used for bookmarks on this experiment
1023 * @param file the bookmarks file
1024 */
1025 public void setBookmarksFile(IFile file) {
1026 fBookmarksFile = file;
1027 }
1028
1029 /**
1030 * Get the file used for bookmarks on this experiment
1031 * @return the bookmarks file or null if none is set
1032 */
1033 public IFile getBookmarksFile() {
1034 return fBookmarksFile;
1035 }
1036
1037 /* (non-Javadoc)
1038 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#setResource(org.eclipse.core.resources.IResource)
1039 */
1040 @Override
1041 public void setResource(IResource resource) {
1042 fResource = resource;
1043 }
1044
1045 /* (non-Javadoc)
1046 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#getResource()
1047 */
1048 @Override
1049 public IResource getResource() {
1050 return fResource;
1051 }
1052 }
This page took 0.054364 seconds and 6 git commands to generate.