8b493dc81439aee745a05b7a295107f50d50ae8b
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / experiment / TmfExperiment.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.core.experiment;
14
15 import java.util.Collections;
16 import java.util.Vector;
17
18 import org.eclipse.core.resources.IFile;
19 import org.eclipse.core.resources.IProject;
20 import org.eclipse.core.resources.IResource;
21 import org.eclipse.core.runtime.IProgressMonitor;
22 import org.eclipse.core.runtime.IStatus;
23 import org.eclipse.core.runtime.Status;
24 import org.eclipse.core.runtime.jobs.Job;
25 import org.eclipse.linuxtools.tmf.core.component.TmfEventProvider;
26 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
27 import org.eclipse.linuxtools.tmf.core.event.ITmfTimestamp;
28 import org.eclipse.linuxtools.tmf.core.event.TmfTimeRange;
29 import org.eclipse.linuxtools.tmf.core.event.TmfTimestamp;
30 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
31 import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
32 import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
33 import org.eclipse.linuxtools.tmf.core.request.TmfEventRequest;
34 import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
35 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentDisposedSignal;
36 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentRangeUpdatedSignal;
37 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentSelectedSignal;
38 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentUpdatedSignal;
39 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
40 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalManager;
41 import org.eclipse.linuxtools.tmf.core.signal.TmfTraceUpdatedSignal;
42 import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
43 import org.eclipse.linuxtools.tmf.core.trace.ITmfLocation;
44 import org.eclipse.linuxtools.tmf.core.trace.ITmfTrace;
45 import org.eclipse.linuxtools.tmf.core.trace.TmfCheckpoint;
46 import org.eclipse.linuxtools.tmf.core.trace.TmfContext;
47
48 /**
49 * <b><u>TmfExperiment</u></b>
50 * <p>
51 * TmfExperiment presents a time-ordered, unified view of a set of TmfTraces that are part of a tracing experiment.
52 * <p>
53 */
54 public class TmfExperiment<T extends ITmfEvent> extends TmfEventProvider<T> implements ITmfTrace<T> {
55
56 // ------------------------------------------------------------------------
57 // Attributes
58 // ------------------------------------------------------------------------
59
60 // The currently selected experiment
61 protected static TmfExperiment<?> fCurrentExperiment = null;
62
63 // The set of traces that constitute the experiment
64 protected ITmfTrace<T>[] fTraces;
65
66 // The total number of events
67 protected long fNbEvents;
68
69 // The experiment time range
70 protected TmfTimeRange fTimeRange;
71
72 // The experiment reference timestamp (default: Zero)
73 protected ITmfTimestamp fEpoch;
74
75 // The experiment index
76 protected Vector<TmfCheckpoint> fCheckpoints = new Vector<TmfCheckpoint>();
77
78 // The current experiment context
79 protected TmfExperimentContext fExperimentContext;
80
81 // Flag to initialize only once
82 private boolean fInitialized = false;
83
84 // The experiment bookmarks file
85 private IFile fBookmarksFile;
86
87 // The properties resource
88 private IResource fResource;
89
90 // ------------------------------------------------------------------------
91 // Constructors
92 // ------------------------------------------------------------------------
93
94 @Override
95 public boolean validate(IProject project, String path) {
96 return true;
97 }
98
99 @Override
100 public void initTrace(String name, String path, Class<T> eventType) {
101 }
102
103 @Override
104 public void initTrace(String name, String path, Class<T> eventType, boolean indexTrace) {
105 if (indexTrace) {
106 initializeStreamingMonitor();
107 }
108 }
109
110 @Override
111 public void initTrace(String name, String path, Class<T> eventType, int cacheSize) {
112 }
113
114 @Override
115 public void initTrace(String name, String path, Class<T> eventType, int cacheSize, boolean indexTrace) {
116 if (indexTrace) {
117 initializeStreamingMonitor();
118 }
119 }
120
/**
 * Creates an experiment over the given traces without pre-indexing it.
 *
 * @param type the event type
 * @param id the experiment id (name)
 * @param traces the traces that constitute the experiment
 * @param epoch the experiment reference timestamp; null selects {@link TmfTimestamp#ZERO}
 * @param indexPageSize the number of events between two index checkpoints
 */
public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, ITmfTimestamp epoch, int indexPageSize) {
    // Forward the caller's epoch instead of silently replacing it with ZERO;
    // a null epoch keeps the historical ZERO default.
    this(type, id, traces, (epoch != null) ? epoch : TmfTimestamp.ZERO, indexPageSize, false);
}
131
/**
 * Creates an experiment over the given traces, optionally indexing it
 * synchronously before the constructor returns.
 *
 * @param type the event type
 * @param id the experiment id (name)
 * @param traces the traces that constitute the experiment
 * @param epoch the experiment reference timestamp
 * @param indexPageSize the number of events between two index checkpoints
 * @param preIndexExperiment true to index the experiment (and wait for
 *            completion) and update its time range right away
 */
public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, ITmfTimestamp epoch, int indexPageSize, boolean preIndexExperiment) {
    super(id, type);

    fTimeRange = TmfTimeRange.NULL_RANGE;
    fIndexPageSize = indexPageSize;
    fEpoch = epoch;
    fTraces = traces;

    if (!preIndexExperiment) {
        return;
    }

    // Blocking indexing, followed by a refresh of the global time range
    indexExperiment(true);
    updateTimeRange();
}
145
/**
 * Minimal constructor for subclasses. It only forwards the id and type to
 * the superclass; the traces, epoch, time range and index page size are
 * not set here.
 *
 * @param id the experiment id (name)
 * @param type the event type
 */
protected TmfExperiment(String id, Class<T> type) {
    super(id, type);
}
149
/**
 * Creates an experiment with the ZERO epoch and the default index page size.
 *
 * @param type the event type
 * @param id the experiment id (name)
 * @param traces the traces that constitute the experiment
 */
public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces) {
    this(type, id, traces, TmfTimestamp.ZERO, DEFAULT_INDEX_PAGE_SIZE);
}
158
/**
 * Creates an experiment with the ZERO epoch and a caller-provided index
 * page size.
 *
 * @param type the event type
 * @param id the experiment id (name)
 * @param traces the traces that constitute the experiment
 * @param indexPageSize the number of events between two index checkpoints
 */
public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, int indexPageSize) {
    this(type, id, traces, TmfTimestamp.ZERO, indexPageSize);
}
168
169 /**
170 * Copy constructor
171 *
172 * @param other
173 */
174 @SuppressWarnings("unchecked")
175 public TmfExperiment(TmfExperiment<T> other) {
176 super(other.getName() + "(copy)", other.fType); //$NON-NLS-1$
177
178 fEpoch = other.fEpoch;
179 fIndexPageSize = other.fIndexPageSize;
180
181 fTraces = new ITmfTrace[other.fTraces.length];
182 for (int trace = 0; trace < other.fTraces.length; trace++) {
183 fTraces[trace] = other.fTraces[trace].copy();
184 }
185
186 fNbEvents = other.fNbEvents;
187 fTimeRange = other.fTimeRange;
188 }
189
190 @Override
191 public TmfExperiment<T> copy() {
192 TmfExperiment<T> experiment = new TmfExperiment<T>(this);
193 TmfSignalManager.deregister(experiment);
194 return experiment;
195 }
196
197 /**
198 * Clears the experiment
199 */
200 @Override
201 @SuppressWarnings("rawtypes")
202 public synchronized void dispose() {
203
204 TmfExperimentDisposedSignal<T> signal = new TmfExperimentDisposedSignal<T>(this, this);
205 broadcast(signal);
206 if (fCurrentExperiment == this) {
207 fCurrentExperiment = null;
208 }
209
210 if (fTraces != null) {
211 for (ITmfTrace trace : fTraces) {
212 trace.dispose();
213 }
214 fTraces = null;
215 }
216 if (fCheckpoints != null) {
217 fCheckpoints.clear();
218 }
219 super.dispose();
220 }
221
222 // ------------------------------------------------------------------------
223 // ITmfTrace
224 // ------------------------------------------------------------------------
225
226 @Override
227 public long getNbEvents() {
228 return fNbEvents;
229 }
230
231 @Override
232 public int getCacheSize() {
233 return fIndexPageSize;
234 }
235
236 @Override
237 public TmfTimeRange getTimeRange() {
238 return fTimeRange;
239 }
240
241 @Override
242 public ITmfTimestamp getStartTime() {
243 return fTimeRange.getStartTime();
244 }
245
246 @Override
247 public ITmfTimestamp getEndTime() {
248 return fTimeRange.getEndTime();
249 }
250
251 public Vector<TmfCheckpoint> getCheckpoints() {
252 return fCheckpoints;
253 }
254
255 // ------------------------------------------------------------------------
256 // Accessors
257 // ------------------------------------------------------------------------
258
259 public static void setCurrentExperiment(TmfExperiment<?> experiment) {
260 if (fCurrentExperiment != null && fCurrentExperiment != experiment) {
261 fCurrentExperiment.dispose();
262 }
263 fCurrentExperiment = experiment;
264 }
265
266 public static TmfExperiment<?> getCurrentExperiment() {
267 return fCurrentExperiment;
268 }
269
270 public ITmfTimestamp getEpoch() {
271 return fEpoch;
272 }
273
274 public ITmfTrace<T>[] getTraces() {
275 return fTraces;
276 }
277
278 /**
279 * Returns the rank of the first event with the requested timestamp. If none, returns the index of the next event
280 * (if any).
281 *
282 * @param timestamp the event timestamp
283 * @return the corresponding event rank
284 */
285 @Override
286 public long getRank(ITmfTimestamp timestamp) {
287 TmfExperimentContext context = seekEvent(timestamp);
288 return context.getRank();
289 }
290
291 /**
292 * Returns the timestamp of the event at the requested index. If none, returns null.
293 *
294 * @param index the event index (rank)
295 * @return the corresponding event timestamp
296 */
297 public ITmfTimestamp getTimestamp(int index) {
298 TmfExperimentContext context = seekEvent(index);
299 ITmfEvent event = getNextEvent(context);
300 return (event != null) ? event.getTimestamp() : null;
301 }
302
303 // ------------------------------------------------------------------------
304 // Operators
305 // ------------------------------------------------------------------------
306
307 /**
308 * Update the global time range
309 */
310 protected void updateTimeRange() {
311 ITmfTimestamp startTime = fTimeRange != TmfTimeRange.NULL_RANGE ? fTimeRange.getStartTime() : TmfTimestamp.BIG_CRUNCH;
312 ITmfTimestamp endTime = fTimeRange != TmfTimeRange.NULL_RANGE ? fTimeRange.getEndTime() : TmfTimestamp.BIG_BANG;
313
314 for (ITmfTrace<T> trace : fTraces) {
315 ITmfTimestamp traceStartTime = trace.getStartTime();
316 if (traceStartTime.compareTo(startTime, true) < 0)
317 startTime = traceStartTime;
318 ITmfTimestamp traceEndTime = trace.getEndTime();
319 if (traceEndTime.compareTo(endTime, true) > 0)
320 endTime = traceEndTime;
321 }
322 fTimeRange = new TmfTimeRange(startTime, endTime);
323 }
324
325 // ------------------------------------------------------------------------
326 // TmfProvider
327 // ------------------------------------------------------------------------
328 @Override
329 public ITmfContext armRequest(ITmfDataRequest<T> request) {
330 // Tracer.trace("Ctx: Arming request - start");
331 ITmfTimestamp timestamp = (request instanceof ITmfEventRequest<?>) ? ((ITmfEventRequest<T>) request).getRange().getStartTime()
332 : null;
333
334 if (TmfTimestamp.BIG_BANG.equals(timestamp) || request.getIndex() > 0) {
335 timestamp = null; // use request index
336 }
337
338 TmfExperimentContext context = null;
339 if (timestamp != null) {
340 // seek by timestamp
341 context = seekEvent(timestamp);
342 ((ITmfEventRequest<T>) request).setStartIndex((int) context.getRank());
343 } else {
344 // Seek by rank
345 if ((fExperimentContext != null) && fExperimentContext.getRank() == request.getIndex()) {
346 // We are already at the right context -> no need to seek
347 context = fExperimentContext;
348 } else {
349 context = seekEvent(request.getIndex());
350 }
351 }
352 // Tracer.trace("Ctx: Arming request - done");
353 return context;
354 }
355
356 @SuppressWarnings("unchecked")
357 @Override
358 public T getNext(ITmfContext context) {
359 if (context instanceof TmfExperimentContext) {
360 return (T) getNextEvent((TmfExperimentContext) context);
361 }
362 return null;
363 }
364
365 // ------------------------------------------------------------------------
366 // ITmfTrace trace positioning
367 // ------------------------------------------------------------------------
368
369 // Returns a brand new context based on the location provided
370 // and initializes the event queues
371 @Override
372 public synchronized TmfExperimentContext seekLocation(ITmfLocation<?> location) {
373 // Validate the location
374 if (location != null && !(location instanceof TmfExperimentLocation)) {
375 return null; // Throw an exception?
376 }
377
378 if (fTraces == null) { // experiment has been disposed
379 return null;
380 }
381
382 // Instantiate the location
383 TmfExperimentLocation expLocation = (location == null) ? new TmfExperimentLocation(new TmfLocationArray(
384 new ITmfLocation<?>[fTraces.length]), new long[fTraces.length]) : (TmfExperimentLocation) location.clone();
385
386 // Create and populate the context's traces contexts
387 TmfExperimentContext context = new TmfExperimentContext(fTraces, new TmfContext[fTraces.length]);
388 // Tracer.trace("Ctx: SeekLocation - start");
389
390 long rank = 0;
391 for (int i = 0; i < fTraces.length; i++) {
392 // Get the relevant trace attributes
393 ITmfLocation<?> traceLocation = expLocation.getLocation().locations[i];
394 long traceRank = expLocation.getRanks()[i];
395
396 // Set the corresponding sub-context
397 context.getContexts()[i] = fTraces[i].seekLocation(traceLocation);
398 context.getContexts()[i].setRank(traceRank);
399 rank += traceRank;
400
401 // Set the trace location and read the corresponding event
402 /* The (TmfContext) cast should be safe since we created 'context'
403 * ourselves higher up. */
404 expLocation.getLocation().locations[i] = ((TmfContext) context.getContexts()[i]).getLocation().clone();
405 context.getEvents()[i] = fTraces[i].getNextEvent(context.getContexts()[i]);
406 }
407
408 // Tracer.trace("Ctx: SeekLocation - done");
409
410 // Finalize context
411 context.setLocation(expLocation);
412 context.setLastTrace(TmfExperimentContext.NO_TRACE);
413 context.setRank(rank);
414
415 fExperimentContext = context;
416
417 return context;
418 }
419
420 /*
421 * (non-Javadoc)
422 *
423 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(org.eclipse.linuxtools .tmf.event.TmfTimestamp)
424 */
425 @Override
426 public synchronized TmfExperimentContext seekEvent(ITmfTimestamp timestamp) {
427
428 // Tracer.trace("Ctx: seekEvent(TS) - start");
429
430 if (timestamp == null) {
431 timestamp = TmfTimestamp.BIG_BANG;
432 }
433
434 // First, find the right checkpoint
435 int index = Collections.binarySearch(fCheckpoints, new TmfCheckpoint(timestamp, null));
436
437 // In the very likely case that the checkpoint was not found, bsearch
438 // returns its negated would-be location (not an offset...). From that
439 // index, we can then position the stream and get the event.
440 if (index < 0) {
441 index = Math.max(0, -(index + 2));
442 }
443
444 // Position the experiment at the checkpoint
445 ITmfLocation<?> location;
446 synchronized (fCheckpoints) {
447 if (fCheckpoints.size() > 0) {
448 if (index >= fCheckpoints.size()) {
449 index = fCheckpoints.size() - 1;
450 }
451 location = fCheckpoints.elementAt(index).getLocation();
452 } else {
453 location = null;
454 }
455 }
456
457 TmfExperimentContext context = seekLocation(location);
458 context.setRank((long) index * fIndexPageSize);
459
460 // And locate the event
461 ITmfEvent event = parseEvent(context);
462 while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) {
463 getNextEvent(context);
464 event = parseEvent(context);
465 }
466
467 if (event == null) {
468 context.setLocation(null);
469 context.setRank(ITmfContext.UNKNOWN_RANK);
470 }
471
472 return context;
473 }
474
475 /*
476 * (non-Javadoc)
477 *
478 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(long)
479 */
480 @Override
481 public synchronized TmfExperimentContext seekEvent(long rank) {
482
483 // Tracer.trace("Ctx: seekEvent(rank) - start");
484
485 // Position the stream at the previous checkpoint
486 int index = (int) rank / fIndexPageSize;
487 ITmfLocation<?> location;
488 synchronized (fCheckpoints) {
489 if (fCheckpoints.size() == 0) {
490 location = null;
491 } else {
492 if (index >= fCheckpoints.size()) {
493 index = fCheckpoints.size() - 1;
494 }
495 location = fCheckpoints.elementAt(index).getLocation();
496 }
497 }
498
499 TmfExperimentContext context = seekLocation(location);
500 context.setRank((long) index * fIndexPageSize);
501
502 // And locate the event
503 ITmfEvent event = parseEvent(context);
504 long pos = context.getRank();
505 while (event != null && pos++ < rank) {
506 getNextEvent(context);
507 event = parseEvent(context);
508 }
509
510 if (event == null) {
511 context.setLocation(null);
512 context.setRank(ITmfContext.UNKNOWN_RANK);
513 }
514
515 return context;
516 }
517
518 @Override
519 public TmfContext seekLocation(double ratio) {
520 TmfContext context = seekEvent((long) (ratio * getNbEvents()));
521 return context;
522 }
523
524 @Override
525 public double getLocationRatio(ITmfLocation<?> location) {
526 if (location instanceof TmfExperimentLocation) {
527 return (double) seekLocation(location).getRank() / getNbEvents();
528 }
529 return 0;
530 }
531
532 @Override
533 public ITmfLocation<?> getCurrentLocation() {
534 if (fExperimentContext != null) {
535 return fExperimentContext.getLocation();
536 }
537 return null;
538 }
539
540 // private void dumpContext(TmfExperimentContext context, boolean isBefore) {
541
542 // TmfContext context0 = context.getContexts()[0];
543 // TmfEvent event0 = context.getEvents()[0];
544 // TmfExperimentLocation location0 = (TmfExperimentLocation) context.getLocation();
545 // long rank0 = context.getRank();
546 // int trace = context.getLastTrace();
547 //
548 // StringBuffer result = new StringBuffer("Ctx: " + (isBefore ? "B " : "A "));
549 //
550 // result.append("[Ctx: fLoc= " + context0.getLocation().toString() + ", fRnk= " + context0.getRank() + "] ");
551 // result.append("[Evt: " + event0.getTimestamp().toString() + "] ");
552 // result.append("[Loc: fLoc= " + location0.getLocation()[0].toString() + ", fRnk= " + location0.getRanks()[0] + "] ");
553 // result.append("[Rnk: " + rank0 + "], [Trc: " + trace + "]");
554 // Tracer.trace(result.toString());
555 // }
556
557 /**
558 * Scan the next events from all traces and return the next one in chronological order.
559 *
560 * @param context the trace context
561 * @return the next event
562 */
563 @SuppressWarnings("unchecked")
564 @Override
565 public synchronized ITmfEvent getNextEvent(ITmfContext context) {
566
567 // Validate the context
568 if (!(context instanceof TmfExperimentContext)) {
569 return null; // Throw an exception?
570 }
571
572 if (!context.equals(fExperimentContext)) {
573 // Tracer.trace("Ctx: Restoring context");
574 fExperimentContext = seekLocation(context.getLocation());
575 }
576
577 TmfExperimentContext expContext = (TmfExperimentContext) context;
578
579 // dumpContext(expContext, true);
580
581 // If an event was consumed previously, get the next one from that trace
582 int lastTrace = expContext.getLastTrace();
583 if (lastTrace != TmfExperimentContext.NO_TRACE) {
584 ITmfContext traceContext = expContext.getContexts()[lastTrace];
585 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
586 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
587 }
588
589 // Scan the candidate events and identify the "next" trace to read from
590 ITmfEvent eventArray[] = expContext.getEvents();
591 if (eventArray == null) {
592 return null;
593 }
594 int trace = TmfExperimentContext.NO_TRACE;
595 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
596 if (eventArray.length == 1) {
597 if (eventArray[0] != null) {
598 timestamp = eventArray[0].getTimestamp();
599 trace = 0;
600 }
601 } else {
602 for (int i = 0; i < eventArray.length; i++) {
603 ITmfEvent event = eventArray[i];
604 if (event != null && event.getTimestamp() != null) {
605 ITmfTimestamp otherTS = event.getTimestamp();
606 if (otherTS.compareTo(timestamp, true) < 0) {
607 trace = i;
608 timestamp = otherTS;
609 }
610 }
611 }
612 }
613 // Update the experiment context and set the "next" event
614 ITmfEvent event = null;
615 if (trace != TmfExperimentContext.NO_TRACE) {
616 updateIndex(expContext, timestamp);
617
618 ITmfContext traceContext = expContext.getContexts()[trace];
619 TmfExperimentLocation expLocation = (TmfExperimentLocation) expContext.getLocation();
620 // expLocation.getLocation()[trace] = traceContext.getLocation().clone();
621 expLocation.getLocation().locations[trace] = (ITmfLocation<? extends Comparable<?>>) traceContext.getLocation().clone();
622
623 // updateIndex(expContext, timestamp);
624
625 expLocation.getRanks()[trace] = traceContext.getRank();
626 expContext.setLastTrace(trace);
627 expContext.updateRank(1);
628 event = expContext.getEvents()[trace];
629 fExperimentContext = expContext;
630 }
631
632 // if (event != null) {
633 // Tracer.trace("Exp: " + (expContext.getRank() - 1) + ": " + event.getTimestamp().toString());
634 // dumpContext(expContext, false);
635 // Tracer.trace("Ctx: Event returned= " + event.getTimestamp().toString());
636 // }
637
638 return event;
639 }
640
641 public synchronized void updateIndex(ITmfContext context, ITmfTimestamp timestamp) {
642 // Build the index as we go along
643 long rank = context.getRank();
644 if (context.isValidRank() && (rank % fIndexPageSize) == 0) {
645 // Determine the table position
646 long position = rank / fIndexPageSize;
647 // Add new entry at proper location (if empty)
648 if (fCheckpoints.size() == position) {
649 ITmfLocation<?> location = context.getLocation().clone();
650 fCheckpoints.add(new TmfCheckpoint(timestamp.clone(), location));
651 // System.out.println(this + "[" + (fCheckpoints.size() - 1) + "] " + timestamp + ", "
652 // + location.toString());
653 }
654 }
655 }
656
657 /*
658 * (non-Javadoc)
659 *
660 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#parseEvent(org.eclipse.linuxtools .tmf.trace.TmfContext)
661 */
662 @Override
663 public ITmfEvent parseEvent(ITmfContext context) {
664
665 // Validate the context
666 if (!(context instanceof TmfExperimentContext)) {
667 return null; // Throw an exception?
668 }
669
670 if (!context.equals(fExperimentContext)) {
671 // Tracer.trace("Ctx: Restoring context");
672 seekLocation(context.getLocation());
673 }
674
675 TmfExperimentContext expContext = (TmfExperimentContext) context;
676
677 // If an event was consumed previously, get the next one from that trace
678 int lastTrace = expContext.getLastTrace();
679 if (lastTrace != TmfExperimentContext.NO_TRACE) {
680 ITmfContext traceContext = expContext.getContexts()[lastTrace];
681 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
682 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
683 fExperimentContext = (TmfExperimentContext) context;
684 }
685
686 // Scan the candidate events and identify the "next" trace to read from
687 int trace = TmfExperimentContext.NO_TRACE;
688 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
689 for (int i = 0; i < expContext.getTraces().length; i++) {
690 ITmfEvent event = expContext.getEvents()[i];
691 if (event != null && event.getTimestamp() != null) {
692 ITmfTimestamp otherTS = event.getTimestamp();
693 if (otherTS.compareTo(timestamp, true) < 0) {
694 trace = i;
695 timestamp = otherTS;
696 }
697 }
698 }
699
700 ITmfEvent event = null;
701 if (trace != TmfExperimentContext.NO_TRACE) {
702 event = expContext.getEvents()[trace];
703 }
704
705 return event;
706 }
707
708 /*
709 * (non-Javadoc)
710 *
711 * @see java.lang.Object#toString()
712 */
713 @Override
714 @SuppressWarnings("nls")
715 public String toString() {
716 return "[TmfExperiment (" + getName() + ")]";
717 }
718
719 // ------------------------------------------------------------------------
720 // Indexing
721 // ------------------------------------------------------------------------
722
723 private synchronized void initializeStreamingMonitor() {
724 if (fInitialized) {
725 return;
726 }
727 fInitialized = true;
728
729 if (getStreamingInterval() == 0) {
730 TmfContext context = seekLocation(null);
731 ITmfEvent event = getNext(context);
732 if (event == null) {
733 return;
734 }
735 TmfTimeRange timeRange = new TmfTimeRange(event.getTimestamp().clone(), TmfTimestamp.BIG_CRUNCH);
736 final TmfExperimentRangeUpdatedSignal signal = new TmfExperimentRangeUpdatedSignal(this, this, timeRange);
737
738 // Broadcast in separate thread to prevent deadlock
739 new Thread() {
740 @Override
741 public void run() {
742 broadcast(signal);
743 }
744 }.start();
745 return;
746 }
747
748 final Thread thread = new Thread("Streaming Monitor for experiment " + getName()) { //$NON-NLS-1$
749 ITmfTimestamp safeTimestamp = null;
750 TmfTimeRange timeRange = null;
751
752 @Override
753 public void run() {
754 while (!fExecutor.isShutdown()) {
755 if (!isIndexingBusy()) {
756 ITmfTimestamp startTimestamp = TmfTimestamp.BIG_CRUNCH;
757 ITmfTimestamp endTimestamp = TmfTimestamp.BIG_BANG;
758 for (ITmfTrace<T> trace : fTraces) {
759 if (trace.getStartTime().compareTo(startTimestamp) < 0) {
760 startTimestamp = trace.getStartTime();
761 }
762 if (trace.getStreamingInterval() != 0 && trace.getEndTime().compareTo(endTimestamp) > 0) {
763 endTimestamp = trace.getEndTime();
764 }
765 }
766 if (safeTimestamp != null && safeTimestamp.compareTo(getTimeRange().getEndTime(), false) > 0) {
767 timeRange = new TmfTimeRange(startTimestamp, safeTimestamp);
768 } else {
769 timeRange = null;
770 }
771 safeTimestamp = endTimestamp;
772 if (timeRange != null) {
773 TmfExperimentRangeUpdatedSignal signal =
774 new TmfExperimentRangeUpdatedSignal(TmfExperiment.this, TmfExperiment.this, timeRange);
775 broadcast(signal);
776 }
777 }
778 try {
779 Thread.sleep(getStreamingInterval());
780 } catch (InterruptedException e) {
781 e.printStackTrace();
782 }
783 }
784 }
785 };
786 thread.start();
787 }
788
789 /* (non-Javadoc)
790 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getStreamingInterval()
791 */
792 @Override
793 public long getStreamingInterval() {
794 long interval = 0;
795 for (ITmfTrace<T> trace : fTraces) {
796 interval = Math.max(interval, trace.getStreamingInterval());
797 }
798 return interval;
799 }
800
801 /*
802 * The experiment holds the globally ordered events of its set of traces. It is expected to provide access to each
803 * individual event by index i.e. it must be possible to request the Nth event of the experiment.
804 *
805 * The purpose of the index is to keep the information needed to rapidly restore the traces contexts at regular
806 * intervals (every INDEX_PAGE_SIZE event).
807 */
808
809 // The index page size
810 private static final int DEFAULT_INDEX_PAGE_SIZE = 5000;
811 protected int fIndexPageSize;
812 protected boolean fIndexing = false;
813 protected TmfTimeRange fIndexingPendingRange = TmfTimeRange.NULL_RANGE;
814
815 private Integer fEndSynchReference;
816
817 // private static BufferedWriter fEventLog = null;
818 // private static BufferedWriter openLogFile(String filename) {
819 // BufferedWriter outfile = null;
820 // try {
821 // outfile = new BufferedWriter(new FileWriter(filename));
822 // } catch (IOException e) {
823 // e.printStackTrace();
824 // }
825 // return outfile;
826 // }
827
828 protected boolean isIndexingBusy() {
829 synchronized (fCheckpoints) {
830 return fIndexing;
831 }
832 }
833
834 protected void indexExperiment(boolean waitForCompletion) {
835 indexExperiment(waitForCompletion, 0, TmfTimeRange.ETERNITY);
836 }
837
838 @SuppressWarnings("unchecked")
839 protected void indexExperiment(boolean waitForCompletion, final int index, final TmfTimeRange timeRange) {
840
841 synchronized (fCheckpoints) {
842 if (fIndexing) {
843 return;
844 }
845 fIndexing = true;
846 }
847
848 final Job job = new Job("Indexing " + getName() + "...") { //$NON-NLS-1$ //$NON-NLS-2$
849 @Override
850 protected IStatus run(IProgressMonitor monitor) {
851 while (!monitor.isCanceled()) {
852 try {
853 Thread.sleep(100);
854 } catch (InterruptedException e) {
855 return Status.OK_STATUS;
856 }
857 }
858 monitor.done();
859 return Status.OK_STATUS;
860 }
861 };
862 job.schedule();
863
864 // fEventLog = openLogFile("TraceEvent.log");
865 // System.out.println(System.currentTimeMillis() + ": Experiment indexing started");
866
867 ITmfEventRequest<ITmfEvent> request = new TmfEventRequest<ITmfEvent>(ITmfEvent.class, timeRange, index, TmfDataRequest.ALL_DATA,
868 fIndexPageSize, ITmfDataRequest.ExecutionType.BACKGROUND) { // PATA FOREGROUND
869
870 // long indexingStart = System.nanoTime();
871
872 ITmfTimestamp startTime = (fTimeRange == TmfTimeRange.NULL_RANGE) ? null : fTimeRange.getStartTime();
873 ITmfTimestamp lastTime = (fTimeRange == TmfTimeRange.NULL_RANGE) ? null : fTimeRange.getEndTime();
874 long initialNbEvents = fNbEvents;
875
876 @Override
877 public void handleStarted() {
878 super.handleStarted();
879 }
880
881 @Override
882 public void handleData(ITmfEvent event) {
883 super.handleData(event);
884 if (event != null) {
885 ITmfTimestamp ts = event.getTimestamp();
886 if (startTime == null)
887 startTime = ts.clone();
888 lastTime = ts.clone();
889 if ((getNbRead() % fIndexPageSize) == 1 && getNbRead() != 1) {
890 updateExperiment();
891 }
892 }
893 }
894
895 @Override
896 public void handleSuccess() {
897 // long indexingEnd = System.nanoTime();
898
899 // if the end time is a real value then it is the streaming safe time stamp
900 // set the last time to the safe time stamp to prevent unnecessary indexing requests
901 if (getRange().getEndTime() != TmfTimestamp.BIG_CRUNCH) {
902 lastTime = getRange().getEndTime();
903 }
904 updateExperiment();
905 // System.out.println(System.currentTimeMillis() + ": Experiment indexing completed");
906
907 // long average = (indexingEnd - indexingStart) / fNbEvents;
908 // System.out.println(getName() + ": start=" + startTime + ", end=" + lastTime + ", elapsed="
909 // + (indexingEnd * 1.0 - indexingStart) / 1000000000);
910 // System.out.println(getName() + ": nbEvents=" + fNbEvents + " (" + (average / 1000) + "."
911 // + (average % 1000) + " us/evt)");
912 super.handleSuccess();
913 }
914
915 @Override
916 public void handleCompleted() {
917 job.cancel();
918 super.handleCompleted();
919 synchronized (fCheckpoints) {
920 fIndexing = false;
921 if (fIndexingPendingRange != TmfTimeRange.NULL_RANGE) {
922 indexExperiment(false, (int) fNbEvents, fIndexingPendingRange);
923 fIndexingPendingRange = TmfTimeRange.NULL_RANGE;
924 }
925 }
926 }
927
928 private void updateExperiment() {
929 int nbRead = getNbRead();
930 if (startTime != null) {
931 fTimeRange = new TmfTimeRange(startTime, lastTime.clone());
932 }
933 if (nbRead != 0) {
934 // updateTimeRange();
935 // updateNbEvents();
936 fNbEvents = initialNbEvents + nbRead;
937 notifyListeners();
938 }
939 }
940 };
941
942 sendRequest((ITmfDataRequest<T>) request);
943 if (waitForCompletion)
944 try {
945 request.waitForCompletion();
946 } catch (InterruptedException e) {
947 e.printStackTrace();
948 }
949 }
950
951 protected void notifyListeners() {
952 broadcast(new TmfExperimentUpdatedSignal(this, this)); // , null));
953 //broadcast(new TmfExperimentRangeUpdatedSignal(this, this, fTimeRange)); // , null));
954 }
955
956 // ------------------------------------------------------------------------
957 // Signal handlers
958 // ------------------------------------------------------------------------
959
960 @TmfSignalHandler
961 public void experimentSelected(TmfExperimentSelectedSignal<T> signal) {
962 TmfExperiment<?> experiment = signal.getExperiment();
963 if (experiment == this) {
964 setCurrentExperiment(experiment);
965 fEndSynchReference = Integer.valueOf(signal.getReference());
966 }
967 }
968
969 @TmfSignalHandler
970 public void endSync(TmfEndSynchSignal signal) {
971 if (fEndSynchReference != null && fEndSynchReference.intValue() == signal.getReference()) {
972 fEndSynchReference = null;
973 initializeStreamingMonitor();
974 }
975
976 }
977
978 @TmfSignalHandler
979 public void experimentUpdated(TmfExperimentUpdatedSignal signal) {
980 }
981
982 @TmfSignalHandler
983 public void experimentRangeUpdated(TmfExperimentRangeUpdatedSignal signal) {
984 if (signal.getExperiment() == this) {
985 indexExperiment(false, (int) fNbEvents, signal.getRange());
986 }
987 }
988
989 @TmfSignalHandler
990 public void traceUpdated(TmfTraceUpdatedSignal signal) {
991 for (ITmfTrace<T> trace : fTraces) {
992 if (trace == signal.getTrace()) {
993 synchronized (fCheckpoints) {
994 if (fIndexing) {
995 if (fIndexingPendingRange == TmfTimeRange.NULL_RANGE) {
996 fIndexingPendingRange = signal.getRange();
997 } else {
998 ITmfTimestamp startTime = fIndexingPendingRange.getStartTime();
999 ITmfTimestamp endTime = fIndexingPendingRange.getEndTime();
1000 if (signal.getRange().getStartTime().compareTo(startTime) < 0) {
1001 startTime = signal.getRange().getStartTime();
1002 }
1003 if (signal.getRange().getEndTime().compareTo(endTime) > 0) {
1004 endTime = signal.getRange().getEndTime();
1005 }
1006 fIndexingPendingRange = new TmfTimeRange(startTime, endTime);
1007 }
1008 return;
1009 }
1010 }
1011 indexExperiment(false, (int) fNbEvents, signal.getRange());
1012 return;
1013 }
1014 }
1015 }
1016
1017 @Override
1018 public String getPath() {
1019 // TODO Auto-generated method stub
1020 return null;
1021 }
1022
1023 /**
1024 * Set the file to be used for bookmarks on this experiment
1025 * @param file the bookmarks file
1026 */
1027 public void setBookmarksFile(IFile file) {
1028 fBookmarksFile = file;
1029 }
1030
1031 /**
1032 * Get the file used for bookmarks on this experiment
1033 * @return the bookmarks file or null if none is set
1034 */
1035 public IFile getBookmarksFile() {
1036 return fBookmarksFile;
1037 }
1038
1039 /* (non-Javadoc)
1040 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#setResource(org.eclipse.core.resources.IResource)
1041 */
1042 @Override
1043 public void setResource(IResource resource) {
1044 fResource = resource;
1045 }
1046
1047 /* (non-Javadoc)
1048 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#getResource()
1049 */
1050 @Override
1051 public IResource getResource() {
1052 return fResource;
1053 }
1054 }
This page took 0.051272 seconds and 4 git commands to generate.