Merge branch 'master' into lttng_2_0_control_dev
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / experiment / TmfExperiment.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.core.experiment;
14
15 import java.util.Collections;
16 import java.util.Vector;
17
18 import org.eclipse.core.resources.IFile;
19 import org.eclipse.core.resources.IProject;
20 import org.eclipse.core.resources.IResource;
21 import org.eclipse.core.runtime.IProgressMonitor;
22 import org.eclipse.core.runtime.IStatus;
23 import org.eclipse.core.runtime.Status;
24 import org.eclipse.core.runtime.jobs.Job;
25 import org.eclipse.linuxtools.tmf.core.component.TmfEventProvider;
26 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
27 import org.eclipse.linuxtools.tmf.core.event.ITmfTimestamp;
28 import org.eclipse.linuxtools.tmf.core.event.TmfTimeRange;
29 import org.eclipse.linuxtools.tmf.core.event.TmfTimestamp;
30 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
31 import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
32 import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
33 import org.eclipse.linuxtools.tmf.core.request.TmfEventRequest;
34 import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
35 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentDisposedSignal;
36 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentRangeUpdatedSignal;
37 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentSelectedSignal;
38 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentUpdatedSignal;
39 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
40 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalManager;
41 import org.eclipse.linuxtools.tmf.core.signal.TmfTraceUpdatedSignal;
42 import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
43 import org.eclipse.linuxtools.tmf.core.trace.ITmfLocation;
44 import org.eclipse.linuxtools.tmf.core.trace.ITmfTrace;
45 import org.eclipse.linuxtools.tmf.core.trace.TmfCheckpoint;
46 import org.eclipse.linuxtools.tmf.core.trace.TmfContext;
47
48 /**
49 * <b><u>TmfExperiment</u></b>
50 * <p>
51 * TmfExperiment presents a time-ordered, unified view of a set of TmfTraces that are part of a tracing experiment.
52 * <p>
53 */
54 public class TmfExperiment<T extends ITmfEvent> extends TmfEventProvider<T> implements ITmfTrace<T> {
55
56 // ------------------------------------------------------------------------
57 // Attributes
58 // ------------------------------------------------------------------------
59
60 // The currently selected experiment
61 protected static TmfExperiment<?> fCurrentExperiment = null;
62
63 // The set of traces that constitute the experiment
64 protected ITmfTrace<T>[] fTraces;
65
66 // The total number of events
67 protected long fNbEvents;
68
69 // The experiment time range
70 protected TmfTimeRange fTimeRange;
71
72 // The experiment reference timestamp (default: Zero)
73 protected ITmfTimestamp fEpoch;
74
75 // The experiment index
76 protected Vector<TmfCheckpoint> fCheckpoints = new Vector<TmfCheckpoint>();
77
78 // The current experiment context
79 protected TmfExperimentContext fExperimentContext;
80
81 // Flag to initialize only once
82 private boolean fInitialized = false;
83
84 // The experiment bookmarks file
85 private IFile fBookmarksFile;
86
87 // The properties resource
88 private IResource fResource;
89
90 // ------------------------------------------------------------------------
91 // Constructors
92 // ------------------------------------------------------------------------
93
94 @Override
95 public boolean validate(IProject project, String path) {
96 return true;
97 }
98
99 @Override
100 public void initTrace(String name, String path, Class<T> eventType) {
101 }
102
103 @Override
104 public void initTrace(String name, String path, Class<T> eventType, boolean indexTrace) {
105 if (indexTrace) {
106 initializeStreamingMonitor();
107 }
108 }
109
110 @Override
111 public void initTrace(String name, String path, Class<T> eventType, int cacheSize) {
112 }
113
114 @Override
115 public void initTrace(String name, String path, Class<T> eventType, int cacheSize, boolean indexTrace) {
116 if (indexTrace) {
117 initializeStreamingMonitor();
118 }
119 }
120
121 /**
122 * @param type
123 * @param id
124 * @param traces
125 * @param epoch
126 * @param indexPageSize
127 */
128 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, ITmfTimestamp epoch, int indexPageSize) {
129 this(type, id, traces, TmfTimestamp.ZERO, indexPageSize, false);
130 }
131
132 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, ITmfTimestamp epoch, int indexPageSize, boolean preIndexExperiment) {
133 super(id, type);
134
135 fTraces = traces;
136 fEpoch = epoch;
137 fIndexPageSize = indexPageSize;
138 fTimeRange = TmfTimeRange.NULL_RANGE;
139
140 if (preIndexExperiment) {
141 indexExperiment(true);
142 updateTimeRange();
143 }
144
145 }
146
147 protected TmfExperiment(String id, Class<T> type) {
148 super(id, type);
149 }
150
151 /**
152 * @param type
153 * @param id
154 * @param traces
155 */
156 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces) {
157 this(type, id, traces, TmfTimestamp.ZERO, DEFAULT_INDEX_PAGE_SIZE);
158 }
159
160 /**
161 * @param type
162 * @param id
163 * @param traces
164 * @param indexPageSize
165 */
166 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, int indexPageSize) {
167 this(type, id, traces, TmfTimestamp.ZERO, indexPageSize);
168 }
169
170 /**
171 * Copy constructor
172 *
173 * @param other
174 */
175 @SuppressWarnings("unchecked")
176 public TmfExperiment(TmfExperiment<T> other) {
177 super(other.getName() + "(clone)", other.fType); //$NON-NLS-1$
178
179 fEpoch = other.fEpoch;
180 fIndexPageSize = other.fIndexPageSize;
181
182 fTraces = new ITmfTrace[other.fTraces.length];
183 for (int trace = 0; trace < other.fTraces.length; trace++) {
184 fTraces[trace] = other.fTraces[trace].copy();
185 }
186
187 fNbEvents = other.fNbEvents;
188 fTimeRange = other.fTimeRange;
189 }
190
191 @Override
192 public TmfExperiment<T> copy() {
193 TmfExperiment<T> experiment = new TmfExperiment<T>(this);
194 TmfSignalManager.deregister(experiment);
195 return experiment;
196 }
197
198 /**
199 * Clears the experiment
200 */
201 @Override
202 @SuppressWarnings("rawtypes")
203 public synchronized void dispose() {
204
205 TmfExperimentDisposedSignal<T> signal = new TmfExperimentDisposedSignal<T>(this, this);
206 broadcast(signal);
207 if (fCurrentExperiment == this) {
208 fCurrentExperiment = null;
209 }
210
211 if (fTraces != null) {
212 for (ITmfTrace trace : fTraces) {
213 trace.dispose();
214 }
215 fTraces = null;
216 }
217 if (fCheckpoints != null) {
218 fCheckpoints.clear();
219 }
220 super.dispose();
221 }
222
223 // ------------------------------------------------------------------------
224 // ITmfTrace
225 // ------------------------------------------------------------------------
226
227 @Override
228 public long getNbEvents() {
229 return fNbEvents;
230 }
231
232 @Override
233 public int getCacheSize() {
234 return fIndexPageSize;
235 }
236
237 @Override
238 public TmfTimeRange getTimeRange() {
239 return fTimeRange;
240 }
241
242 @Override
243 public ITmfTimestamp getStartTime() {
244 return fTimeRange.getStartTime();
245 }
246
247 @Override
248 public ITmfTimestamp getEndTime() {
249 return fTimeRange.getEndTime();
250 }
251
252 public Vector<TmfCheckpoint> getCheckpoints() {
253 return fCheckpoints;
254 }
255
256 // ------------------------------------------------------------------------
257 // Accessors
258 // ------------------------------------------------------------------------
259
260 public static void setCurrentExperiment(TmfExperiment<?> experiment) {
261 if (fCurrentExperiment != null && fCurrentExperiment != experiment) {
262 fCurrentExperiment.dispose();
263 }
264 fCurrentExperiment = experiment;
265 }
266
267 public static TmfExperiment<?> getCurrentExperiment() {
268 return fCurrentExperiment;
269 }
270
271 public ITmfTimestamp getEpoch() {
272 return fEpoch;
273 }
274
275 public ITmfTrace<T>[] getTraces() {
276 return fTraces;
277 }
278
279 /**
280 * Returns the rank of the first event with the requested timestamp. If none, returns the index of the next event
281 * (if any).
282 *
283 * @param timestamp the event timestamp
284 * @return the corresponding event rank
285 */
286 @Override
287 public long getRank(ITmfTimestamp timestamp) {
288 TmfExperimentContext context = seekEvent(timestamp);
289 return context.getRank();
290 }
291
292 /**
293 * Returns the timestamp of the event at the requested index. If none, returns null.
294 *
295 * @param index the event index (rank)
296 * @return the corresponding event timestamp
297 */
298 public ITmfTimestamp getTimestamp(int index) {
299 TmfExperimentContext context = seekEvent(index);
300 ITmfEvent event = getNextEvent(context);
301 return (event != null) ? event.getTimestamp() : null;
302 }
303
304 // ------------------------------------------------------------------------
305 // Operators
306 // ------------------------------------------------------------------------
307
308 /**
309 * Update the global time range
310 */
311 protected void updateTimeRange() {
312 ITmfTimestamp startTime = fTimeRange != TmfTimeRange.NULL_RANGE ? fTimeRange.getStartTime() : TmfTimestamp.BIG_CRUNCH;
313 ITmfTimestamp endTime = fTimeRange != TmfTimeRange.NULL_RANGE ? fTimeRange.getEndTime() : TmfTimestamp.BIG_BANG;
314
315 for (ITmfTrace<T> trace : fTraces) {
316 ITmfTimestamp traceStartTime = trace.getStartTime();
317 if (traceStartTime.compareTo(startTime, true) < 0)
318 startTime = traceStartTime;
319 ITmfTimestamp traceEndTime = trace.getEndTime();
320 if (traceEndTime.compareTo(endTime, true) > 0)
321 endTime = traceEndTime;
322 }
323 fTimeRange = new TmfTimeRange(startTime, endTime);
324 }
325
326 // ------------------------------------------------------------------------
327 // TmfProvider
328 // ------------------------------------------------------------------------
329 @Override
330 public ITmfContext armRequest(ITmfDataRequest<T> request) {
331 // Tracer.trace("Ctx: Arming request - start");
332 ITmfTimestamp timestamp = (request instanceof ITmfEventRequest<?>) ? ((ITmfEventRequest<T>) request).getRange().getStartTime()
333 : null;
334
335 if (TmfTimestamp.BIG_BANG.equals(timestamp) || request.getIndex() > 0) {
336 timestamp = null; // use request index
337 }
338
339 TmfExperimentContext context = null;
340 if (timestamp != null) {
341 // seek by timestamp
342 context = seekEvent(timestamp);
343 ((ITmfEventRequest<T>) request).setStartIndex((int) context.getRank());
344 } else {
345 // Seek by rank
346 if ((fExperimentContext != null) && fExperimentContext.getRank() == request.getIndex()) {
347 // We are already at the right context -> no need to seek
348 context = fExperimentContext;
349 } else {
350 context = seekEvent(request.getIndex());
351 }
352 }
353 // Tracer.trace("Ctx: Arming request - done");
354 return context;
355 }
356
357 @SuppressWarnings("unchecked")
358 @Override
359 public T getNext(ITmfContext context) {
360 if (context instanceof TmfExperimentContext) {
361 return (T) getNextEvent((TmfExperimentContext) context);
362 }
363 return null;
364 }
365
366 // ------------------------------------------------------------------------
367 // ITmfTrace trace positioning
368 // ------------------------------------------------------------------------
369
370 // Returns a brand new context based on the location provided
371 // and initializes the event queues
372 @Override
373 public synchronized TmfExperimentContext seekLocation(ITmfLocation<?> location) {
374 // Validate the location
375 if (location != null && !(location instanceof TmfExperimentLocation)) {
376 return null; // Throw an exception?
377 }
378
379 if (fTraces == null) { // experiment has been disposed
380 return null;
381 }
382
383 // Instantiate the location
384 TmfExperimentLocation expLocation = (location == null) ? new TmfExperimentLocation(new TmfLocationArray(
385 new ITmfLocation<?>[fTraces.length]), new long[fTraces.length]) : (TmfExperimentLocation) location.clone();
386
387 // Create and populate the context's traces contexts
388 TmfExperimentContext context = new TmfExperimentContext(fTraces, new TmfContext[fTraces.length]);
389 // Tracer.trace("Ctx: SeekLocation - start");
390
391 long rank = 0;
392 for (int i = 0; i < fTraces.length; i++) {
393 // Get the relevant trace attributes
394 ITmfLocation<?> traceLocation = expLocation.getLocation().locations[i];
395 long traceRank = expLocation.getRanks()[i];
396
397 // Set the corresponding sub-context
398 context.getContexts()[i] = fTraces[i].seekLocation(traceLocation);
399 context.getContexts()[i].setRank(traceRank);
400 rank += traceRank;
401
402 // Set the trace location and read the corresponding event
403 /* The (TmfContext) cast should be safe since we created 'context'
404 * ourselves higher up. */
405 expLocation.getLocation().locations[i] = ((TmfContext) context.getContexts()[i]).getLocation().clone();
406 context.getEvents()[i] = fTraces[i].getNextEvent(context.getContexts()[i]);
407 }
408
409 // Tracer.trace("Ctx: SeekLocation - done");
410
411 // Finalize context
412 context.setLocation(expLocation);
413 context.setLastTrace(TmfExperimentContext.NO_TRACE);
414 context.setRank(rank);
415
416 fExperimentContext = context;
417
418 return context;
419 }
420
421 /*
422 * (non-Javadoc)
423 *
424 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(org.eclipse.linuxtools .tmf.event.TmfTimestamp)
425 */
426 @Override
427 public synchronized TmfExperimentContext seekEvent(ITmfTimestamp timestamp) {
428
429 // Tracer.trace("Ctx: seekEvent(TS) - start");
430
431 if (timestamp == null) {
432 timestamp = TmfTimestamp.BIG_BANG;
433 }
434
435 // First, find the right checkpoint
436 int index = Collections.binarySearch(fCheckpoints, new TmfCheckpoint(timestamp, null));
437
438 // In the very likely case that the checkpoint was not found, bsearch
439 // returns its negated would-be location (not an offset...). From that
440 // index, we can then position the stream and get the event.
441 if (index < 0) {
442 index = Math.max(0, -(index + 2));
443 }
444
445 // Position the experiment at the checkpoint
446 ITmfLocation<?> location;
447 synchronized (fCheckpoints) {
448 if (fCheckpoints.size() > 0) {
449 if (index >= fCheckpoints.size()) {
450 index = fCheckpoints.size() - 1;
451 }
452 location = fCheckpoints.elementAt(index).getLocation();
453 } else {
454 location = null;
455 }
456 }
457
458 TmfExperimentContext context = seekLocation(location);
459 context.setRank((long) index * fIndexPageSize);
460
461 // And locate the event
462 ITmfEvent event = parseEvent(context);
463 while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) {
464 getNextEvent(context);
465 event = parseEvent(context);
466 }
467
468 if (event == null) {
469 context.setLocation(null);
470 context.setRank(ITmfContext.UNKNOWN_RANK);
471 }
472
473 return context;
474 }
475
476 /*
477 * (non-Javadoc)
478 *
479 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(long)
480 */
481 @Override
482 public synchronized TmfExperimentContext seekEvent(long rank) {
483
484 // Tracer.trace("Ctx: seekEvent(rank) - start");
485
486 // Position the stream at the previous checkpoint
487 int index = (int) rank / fIndexPageSize;
488 ITmfLocation<?> location;
489 synchronized (fCheckpoints) {
490 if (fCheckpoints.size() == 0) {
491 location = null;
492 } else {
493 if (index >= fCheckpoints.size()) {
494 index = fCheckpoints.size() - 1;
495 }
496 location = fCheckpoints.elementAt(index).getLocation();
497 }
498 }
499
500 TmfExperimentContext context = seekLocation(location);
501 context.setRank((long) index * fIndexPageSize);
502
503 // And locate the event
504 ITmfEvent event = parseEvent(context);
505 long pos = context.getRank();
506 while (event != null && pos++ < rank) {
507 getNextEvent(context);
508 event = parseEvent(context);
509 }
510
511 if (event == null) {
512 context.setLocation(null);
513 context.setRank(ITmfContext.UNKNOWN_RANK);
514 }
515
516 return context;
517 }
518
519 @Override
520 public TmfContext seekLocation(double ratio) {
521 TmfContext context = seekEvent((long) (ratio * getNbEvents()));
522 return context;
523 }
524
525 @Override
526 public double getLocationRatio(ITmfLocation<?> location) {
527 if (location instanceof TmfExperimentLocation) {
528 return (double) seekLocation(location).getRank() / getNbEvents();
529 }
530 return 0;
531 }
532
533 @Override
534 public ITmfLocation<?> getCurrentLocation() {
535 if (fExperimentContext != null) {
536 return fExperimentContext.getLocation();
537 }
538 return null;
539 }
540
541 // private void dumpContext(TmfExperimentContext context, boolean isBefore) {
542
543 // TmfContext context0 = context.getContexts()[0];
544 // TmfEvent event0 = context.getEvents()[0];
545 // TmfExperimentLocation location0 = (TmfExperimentLocation) context.getLocation();
546 // long rank0 = context.getRank();
547 // int trace = context.getLastTrace();
548 //
549 // StringBuffer result = new StringBuffer("Ctx: " + (isBefore ? "B " : "A "));
550 //
551 // result.append("[Ctx: fLoc= " + context0.getLocation().toString() + ", fRnk= " + context0.getRank() + "] ");
552 // result.append("[Evt: " + event0.getTimestamp().toString() + "] ");
553 // result.append("[Loc: fLoc= " + location0.getLocation()[0].toString() + ", fRnk= " + location0.getRanks()[0] + "] ");
554 // result.append("[Rnk: " + rank0 + "], [Trc: " + trace + "]");
555 // Tracer.trace(result.toString());
556 // }
557
558 /**
559 * Scan the next events from all traces and return the next one in chronological order.
560 *
561 * @param context the trace context
562 * @return the next event
563 */
564 @SuppressWarnings("unchecked")
565 @Override
566 public synchronized ITmfEvent getNextEvent(ITmfContext context) {
567
568 // Validate the context
569 if (!(context instanceof TmfExperimentContext)) {
570 return null; // Throw an exception?
571 }
572
573 if (!context.equals(fExperimentContext)) {
574 // Tracer.trace("Ctx: Restoring context");
575 fExperimentContext = seekLocation(context.getLocation());
576 }
577
578 TmfExperimentContext expContext = (TmfExperimentContext) context;
579
580 // dumpContext(expContext, true);
581
582 // If an event was consumed previously, get the next one from that trace
583 int lastTrace = expContext.getLastTrace();
584 if (lastTrace != TmfExperimentContext.NO_TRACE) {
585 ITmfContext traceContext = expContext.getContexts()[lastTrace];
586 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
587 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
588 }
589
590 // Scan the candidate events and identify the "next" trace to read from
591 ITmfEvent eventArray[] = expContext.getEvents();
592 if (eventArray == null) {
593 return null;
594 }
595 int trace = TmfExperimentContext.NO_TRACE;
596 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
597 if (eventArray.length == 1) {
598 if (eventArray[0] != null) {
599 timestamp = eventArray[0].getTimestamp();
600 trace = 0;
601 }
602 } else {
603 for (int i = 0; i < eventArray.length; i++) {
604 ITmfEvent event = eventArray[i];
605 if (event != null && event.getTimestamp() != null) {
606 ITmfTimestamp otherTS = event.getTimestamp();
607 if (otherTS.compareTo(timestamp, true) < 0) {
608 trace = i;
609 timestamp = otherTS;
610 }
611 }
612 }
613 }
614 // Update the experiment context and set the "next" event
615 ITmfEvent event = null;
616 if (trace != TmfExperimentContext.NO_TRACE) {
617 updateIndex(expContext, timestamp);
618
619 ITmfContext traceContext = expContext.getContexts()[trace];
620 TmfExperimentLocation expLocation = (TmfExperimentLocation) expContext.getLocation();
621 // expLocation.getLocation()[trace] = traceContext.getLocation().clone();
622 expLocation.getLocation().locations[trace] = (ITmfLocation<? extends Comparable<?>>) traceContext.getLocation().clone();
623
624 // updateIndex(expContext, timestamp);
625
626 expLocation.getRanks()[trace] = traceContext.getRank();
627 expContext.setLastTrace(trace);
628 expContext.updateRank(1);
629 event = expContext.getEvents()[trace];
630 fExperimentContext = expContext;
631 }
632
633 // if (event != null) {
634 // Tracer.trace("Exp: " + (expContext.getRank() - 1) + ": " + event.getTimestamp().toString());
635 // dumpContext(expContext, false);
636 // Tracer.trace("Ctx: Event returned= " + event.getTimestamp().toString());
637 // }
638
639 return event;
640 }
641
642 public synchronized void updateIndex(ITmfContext context, ITmfTimestamp timestamp) {
643 // Build the index as we go along
644 long rank = context.getRank();
645 if (context.isValidRank() && (rank % fIndexPageSize) == 0) {
646 // Determine the table position
647 long position = rank / fIndexPageSize;
648 // Add new entry at proper location (if empty)
649 if (fCheckpoints.size() == position) {
650 ITmfLocation<?> location = context.getLocation().clone();
651 fCheckpoints.add(new TmfCheckpoint(timestamp.clone(), location));
652 // System.out.println(this + "[" + (fCheckpoints.size() - 1) + "] " + timestamp + ", "
653 // + location.toString());
654 }
655 }
656 }
657
658 /*
659 * (non-Javadoc)
660 *
661 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#parseEvent(org.eclipse.linuxtools .tmf.trace.TmfContext)
662 */
663 @Override
664 public ITmfEvent parseEvent(ITmfContext context) {
665
666 // Validate the context
667 if (!(context instanceof TmfExperimentContext)) {
668 return null; // Throw an exception?
669 }
670
671 if (!context.equals(fExperimentContext)) {
672 // Tracer.trace("Ctx: Restoring context");
673 seekLocation(context.getLocation());
674 }
675
676 TmfExperimentContext expContext = (TmfExperimentContext) context;
677
678 // If an event was consumed previously, get the next one from that trace
679 int lastTrace = expContext.getLastTrace();
680 if (lastTrace != TmfExperimentContext.NO_TRACE) {
681 ITmfContext traceContext = expContext.getContexts()[lastTrace];
682 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
683 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
684 fExperimentContext = (TmfExperimentContext) context;
685 }
686
687 // Scan the candidate events and identify the "next" trace to read from
688 int trace = TmfExperimentContext.NO_TRACE;
689 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
690 for (int i = 0; i < expContext.getTraces().length; i++) {
691 ITmfEvent event = expContext.getEvents()[i];
692 if (event != null && event.getTimestamp() != null) {
693 ITmfTimestamp otherTS = event.getTimestamp();
694 if (otherTS.compareTo(timestamp, true) < 0) {
695 trace = i;
696 timestamp = otherTS;
697 }
698 }
699 }
700
701 ITmfEvent event = null;
702 if (trace != TmfExperimentContext.NO_TRACE) {
703 event = expContext.getEvents()[trace];
704 }
705
706 return event;
707 }
708
709 /*
710 * (non-Javadoc)
711 *
712 * @see java.lang.Object#toString()
713 */
714 @Override
715 @SuppressWarnings("nls")
716 public String toString() {
717 return "[TmfExperiment (" + getName() + ")]";
718 }
719
720 // ------------------------------------------------------------------------
721 // Indexing
722 // ------------------------------------------------------------------------
723
724 private synchronized void initializeStreamingMonitor() {
725 if (fInitialized) {
726 return;
727 }
728 fInitialized = true;
729
730 if (getStreamingInterval() == 0) {
731 TmfContext context = seekLocation(null);
732 ITmfEvent event = getNext(context);
733 if (event == null) {
734 return;
735 }
736 TmfTimeRange timeRange = new TmfTimeRange(event.getTimestamp().clone(), TmfTimestamp.BIG_CRUNCH);
737 final TmfExperimentRangeUpdatedSignal signal = new TmfExperimentRangeUpdatedSignal(this, this, timeRange);
738
739 // Broadcast in separate thread to prevent deadlock
740 new Thread() {
741 @Override
742 public void run() {
743 broadcast(signal);
744 }
745 }.start();
746 return;
747 }
748
749 final Thread thread = new Thread("Streaming Monitor for experiment " + getName()) { //$NON-NLS-1$
750 ITmfTimestamp safeTimestamp = null;
751 TmfTimeRange timeRange = null;
752
753 @Override
754 public void run() {
755 while (!fExecutor.isShutdown()) {
756 if (!isIndexingBusy()) {
757 ITmfTimestamp startTimestamp = TmfTimestamp.BIG_CRUNCH;
758 ITmfTimestamp endTimestamp = TmfTimestamp.BIG_BANG;
759 for (ITmfTrace<T> trace : fTraces) {
760 if (trace.getStartTime().compareTo(startTimestamp) < 0) {
761 startTimestamp = trace.getStartTime();
762 }
763 if (trace.getStreamingInterval() != 0 && trace.getEndTime().compareTo(endTimestamp) > 0) {
764 endTimestamp = trace.getEndTime();
765 }
766 }
767 if (safeTimestamp != null && safeTimestamp.compareTo(getTimeRange().getEndTime(), false) > 0) {
768 timeRange = new TmfTimeRange(startTimestamp, safeTimestamp);
769 } else {
770 timeRange = null;
771 }
772 safeTimestamp = endTimestamp;
773 if (timeRange != null) {
774 TmfExperimentRangeUpdatedSignal signal =
775 new TmfExperimentRangeUpdatedSignal(TmfExperiment.this, TmfExperiment.this, timeRange);
776 broadcast(signal);
777 }
778 }
779 try {
780 Thread.sleep(getStreamingInterval());
781 } catch (InterruptedException e) {
782 e.printStackTrace();
783 }
784 }
785 }
786 };
787 thread.start();
788 }
789
790 /* (non-Javadoc)
791 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getStreamingInterval()
792 */
793 @Override
794 public long getStreamingInterval() {
795 long interval = 0;
796 for (ITmfTrace<T> trace : fTraces) {
797 interval = Math.max(interval, trace.getStreamingInterval());
798 }
799 return interval;
800 }
801
802 /*
803 * The experiment holds the globally ordered events of its set of traces. It is expected to provide access to each
804 * individual event by index i.e. it must be possible to request the Nth event of the experiment.
805 *
806 * The purpose of the index is to keep the information needed to rapidly restore the traces contexts at regular
807 * intervals (every INDEX_PAGE_SIZE event).
808 */
809
810 // The index page size
811 private static final int DEFAULT_INDEX_PAGE_SIZE = 5000;
812 protected int fIndexPageSize;
813 protected boolean fIndexing = false;
814 protected TmfTimeRange fIndexingPendingRange = TmfTimeRange.NULL_RANGE;
815
816 private Integer fEndSynchReference;
817
818 // private static BufferedWriter fEventLog = null;
819 // private static BufferedWriter openLogFile(String filename) {
820 // BufferedWriter outfile = null;
821 // try {
822 // outfile = new BufferedWriter(new FileWriter(filename));
823 // } catch (IOException e) {
824 // e.printStackTrace();
825 // }
826 // return outfile;
827 // }
828
829 protected boolean isIndexingBusy() {
830 synchronized (fCheckpoints) {
831 return fIndexing;
832 }
833 }
834
835 protected void indexExperiment(boolean waitForCompletion) {
836 indexExperiment(waitForCompletion, 0, TmfTimeRange.ETERNITY);
837 }
838
839 @SuppressWarnings("unchecked")
840 protected void indexExperiment(boolean waitForCompletion, final int index, final TmfTimeRange timeRange) {
841
842 synchronized (fCheckpoints) {
843 if (fIndexing) {
844 return;
845 }
846 fIndexing = true;
847 }
848
849 final Job job = new Job("Indexing " + getName() + "...") { //$NON-NLS-1$ //$NON-NLS-2$
850 @Override
851 protected IStatus run(IProgressMonitor monitor) {
852 while (!monitor.isCanceled()) {
853 try {
854 Thread.sleep(100);
855 } catch (InterruptedException e) {
856 return Status.OK_STATUS;
857 }
858 }
859 monitor.done();
860 return Status.OK_STATUS;
861 }
862 };
863 job.schedule();
864
865 // fEventLog = openLogFile("TraceEvent.log");
866 // System.out.println(System.currentTimeMillis() + ": Experiment indexing started");
867
868 ITmfEventRequest<ITmfEvent> request = new TmfEventRequest<ITmfEvent>(ITmfEvent.class, timeRange, index, TmfDataRequest.ALL_DATA,
869 fIndexPageSize, ITmfDataRequest.ExecutionType.BACKGROUND) { // PATA FOREGROUND
870
871 // long indexingStart = System.nanoTime();
872
873 ITmfTimestamp startTime = (fTimeRange == TmfTimeRange.NULL_RANGE) ? null : fTimeRange.getStartTime();
874 ITmfTimestamp lastTime = (fTimeRange == TmfTimeRange.NULL_RANGE) ? null : fTimeRange.getEndTime();
875 long initialNbEvents = fNbEvents;
876
877 @Override
878 public void handleStarted() {
879 super.handleStarted();
880 }
881
882 @Override
883 public void handleData(ITmfEvent event) {
884 super.handleData(event);
885 if (event != null) {
886 ITmfTimestamp ts = event.getTimestamp();
887 if (startTime == null)
888 startTime = ts.clone();
889 lastTime = ts.clone();
890 if ((getNbRead() % fIndexPageSize) == 1 && getNbRead() != 1) {
891 updateExperiment();
892 }
893 }
894 }
895
896 @Override
897 public void handleSuccess() {
898 // long indexingEnd = System.nanoTime();
899
900 // if the end time is a real value then it is the streaming safe time stamp
901 // set the last time to the safe time stamp to prevent unnecessary indexing requests
902 if (getRange().getEndTime() != TmfTimestamp.BIG_CRUNCH) {
903 lastTime = getRange().getEndTime();
904 }
905 updateExperiment();
906 // System.out.println(System.currentTimeMillis() + ": Experiment indexing completed");
907
908 // long average = (indexingEnd - indexingStart) / fNbEvents;
909 // System.out.println(getName() + ": start=" + startTime + ", end=" + lastTime + ", elapsed="
910 // + (indexingEnd * 1.0 - indexingStart) / 1000000000);
911 // System.out.println(getName() + ": nbEvents=" + fNbEvents + " (" + (average / 1000) + "."
912 // + (average % 1000) + " us/evt)");
913 super.handleSuccess();
914 }
915
916 @Override
917 public void handleCompleted() {
918 job.cancel();
919 super.handleCompleted();
920 synchronized (fCheckpoints) {
921 fIndexing = false;
922 if (fIndexingPendingRange != TmfTimeRange.NULL_RANGE) {
923 indexExperiment(false, (int) fNbEvents, fIndexingPendingRange);
924 fIndexingPendingRange = TmfTimeRange.NULL_RANGE;
925 }
926 }
927 }
928
929 private void updateExperiment() {
930 int nbRead = getNbRead();
931 if (startTime != null) {
932 fTimeRange = new TmfTimeRange(startTime, lastTime.clone());
933 }
934 if (nbRead != 0) {
935 // updateTimeRange();
936 // updateNbEvents();
937 fNbEvents = initialNbEvents + nbRead;
938 notifyListeners();
939 }
940 }
941 };
942
943 sendRequest((ITmfDataRequest<T>) request);
944 if (waitForCompletion)
945 try {
946 request.waitForCompletion();
947 } catch (InterruptedException e) {
948 e.printStackTrace();
949 }
950 }
951
952 protected void notifyListeners() {
953 broadcast(new TmfExperimentUpdatedSignal(this, this)); // , null));
954 //broadcast(new TmfExperimentRangeUpdatedSignal(this, this, fTimeRange)); // , null));
955 }
956
957 // ------------------------------------------------------------------------
958 // Signal handlers
959 // ------------------------------------------------------------------------
960
961 @TmfSignalHandler
962 public void experimentSelected(TmfExperimentSelectedSignal<T> signal) {
963 TmfExperiment<?> experiment = signal.getExperiment();
964 if (experiment == this) {
965 setCurrentExperiment(experiment);
966 fEndSynchReference = Integer.valueOf(signal.getReference());
967 }
968 }
969
970 @TmfSignalHandler
971 public void endSync(TmfEndSynchSignal signal) {
972 if (fEndSynchReference != null && fEndSynchReference.intValue() == signal.getReference()) {
973 fEndSynchReference = null;
974 initializeStreamingMonitor();
975 }
976
977 }
978
979 @TmfSignalHandler
980 public void experimentUpdated(TmfExperimentUpdatedSignal signal) {
981 }
982
983 @TmfSignalHandler
984 public void experimentRangeUpdated(TmfExperimentRangeUpdatedSignal signal) {
985 if (signal.getExperiment() == this) {
986 indexExperiment(false, (int) fNbEvents, signal.getRange());
987 }
988 }
989
990 @TmfSignalHandler
991 public void traceUpdated(TmfTraceUpdatedSignal signal) {
992 for (ITmfTrace<T> trace : fTraces) {
993 if (trace == signal.getTrace()) {
994 synchronized (fCheckpoints) {
995 if (fIndexing) {
996 if (fIndexingPendingRange == TmfTimeRange.NULL_RANGE) {
997 fIndexingPendingRange = signal.getRange();
998 } else {
999 ITmfTimestamp startTime = fIndexingPendingRange.getStartTime();
1000 ITmfTimestamp endTime = fIndexingPendingRange.getEndTime();
1001 if (signal.getRange().getStartTime().compareTo(startTime) < 0) {
1002 startTime = signal.getRange().getStartTime();
1003 }
1004 if (signal.getRange().getEndTime().compareTo(endTime) > 0) {
1005 endTime = signal.getRange().getEndTime();
1006 }
1007 fIndexingPendingRange = new TmfTimeRange(startTime, endTime);
1008 }
1009 return;
1010 }
1011 }
1012 indexExperiment(false, (int) fNbEvents, signal.getRange());
1013 return;
1014 }
1015 }
1016 }
1017
1018 @Override
1019 public String getPath() {
1020 // TODO Auto-generated method stub
1021 return null;
1022 }
1023
1024 /**
1025 * Set the file to be used for bookmarks on this experiment
1026 * @param file the bookmarks file
1027 */
1028 public void setBookmarksFile(IFile file) {
1029 fBookmarksFile = file;
1030 }
1031
1032 /**
1033 * Get the file used for bookmarks on this experiment
1034 * @return the bookmarks file or null if none is set
1035 */
1036 public IFile getBookmarksFile() {
1037 return fBookmarksFile;
1038 }
1039
1040 /* (non-Javadoc)
1041 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#setResource(org.eclipse.core.resources.IResource)
1042 */
1043 @Override
1044 public void setResource(IResource resource) {
1045 fResource = resource;
1046 }
1047
1048 /* (non-Javadoc)
1049 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#getResource()
1050 */
1051 @Override
1052 public IResource getResource() {
1053 return fResource;
1054 }
1055 }
This page took 0.055939 seconds and 6 git commands to generate.