Fix for bug 385419: Streaming issues with legacy LTTng traces.
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / trace / TmfExperiment.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010, 2012 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 * Francois Chouinard - Updated as per TMF Trace Model 1.0
12 *******************************************************************************/
13
14 package org.eclipse.linuxtools.tmf.core.trace;
15
16 import org.eclipse.core.resources.IFile;
17 import org.eclipse.core.resources.IProject;
18 import org.eclipse.core.resources.IResource;
19 import org.eclipse.linuxtools.internal.tmf.core.trace.TmfExperimentContext;
20 import org.eclipse.linuxtools.internal.tmf.core.trace.TmfExperimentLocation;
21 import org.eclipse.linuxtools.internal.tmf.core.trace.TmfLocationArray;
22 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
23 import org.eclipse.linuxtools.tmf.core.event.ITmfTimestamp;
24 import org.eclipse.linuxtools.tmf.core.event.TmfTimeRange;
25 import org.eclipse.linuxtools.tmf.core.event.TmfTimestamp;
26 import org.eclipse.linuxtools.tmf.core.exceptions.TmfTraceException;
27 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
28 import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
29 import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
30 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentDisposedSignal;
31 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentRangeUpdatedSignal;
32 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentSelectedSignal;
33 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentUpdatedSignal;
34 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
35 import org.eclipse.linuxtools.tmf.core.signal.TmfTraceUpdatedSignal;
36
37 /**
38 * TmfExperiment presents a time-ordered, unified view of a set of ITmfTrace:s
39 * that are part of a tracing experiment.
40 *
41 * @version 1.0
42 * @author Francois Chouinard
43 */
44 public class TmfExperiment<T extends ITmfEvent> extends TmfTrace<T> implements ITmfEventParser<T> {
45
46 // ------------------------------------------------------------------------
47 // Constants
48 // ------------------------------------------------------------------------
49
50 /**
51 * The default index page size
52 */
53 public static final int DEFAULT_INDEX_PAGE_SIZE = 5000;
54
55 // ------------------------------------------------------------------------
56 // Attributes
57 // ------------------------------------------------------------------------
58
59 /**
60 * The currently selected experiment (null if none)
61 */
62 protected static TmfExperiment<?> fCurrentExperiment = null;
63
64 /**
65 * The set of traces that constitute the experiment
66 */
67 protected ITmfTrace<T>[] fTraces;
68
69 /**
70 * The set of traces that constitute the experiment
71 */
72 private boolean fInitialized = false;
73
74 /**
75 * The experiment bookmarks file
76 */
77 private IFile fBookmarksFile;
78
79
80 // Saved experiment context (optimization)
81 private TmfExperimentContext fExperimentContext;
82
83 // ------------------------------------------------------------------------
84 // Construction
85 // ------------------------------------------------------------------------
86
87 /**
88 * @param type
89 * @param id
90 * @param traces
91 * @throws TmfTraceException
92 */
93 public TmfExperiment(final Class<T> type, final String id, final ITmfTrace<T>[] traces) {
94 this(type, id, traces, DEFAULT_INDEX_PAGE_SIZE);
95 }
96
97 /**
98 * @param type
99 * @param id
100 * @param traces
101 * @param indexPageSize
102 * @throws TmfTraceException
103 */
104 @SuppressWarnings({ "unchecked", "rawtypes" })
105 public TmfExperiment(final Class<T> type, final String path, final ITmfTrace<T>[] traces, final int indexPageSize) {
106 setCacheSize(indexPageSize);
107 setStreamingInterval(0);
108 setIndexer(new TmfCheckpointIndexer(this, indexPageSize));
109 setParser(this);
110 try {
111 super.initialize(null, path, type);
112 } catch (TmfTraceException e) {
113 e.printStackTrace();
114 }
115
116 fTraces = traces;
117 setTimeRange(TmfTimeRange.NULL_RANGE);
118 }
119
120 /**
121 * Clears the experiment
122 */
123 @Override
124 @SuppressWarnings("rawtypes")
125 public synchronized void dispose() {
126
127 final TmfExperimentDisposedSignal<T> signal = new TmfExperimentDisposedSignal<T>(this, this);
128 broadcast(signal);
129
130 if (fCurrentExperiment == this) {
131 fCurrentExperiment = null;
132 }
133
134 // Clean up the index if applicable
135 if (getIndexer() != null) {
136 getIndexer().dispose();
137 }
138
139 if (fTraces != null) {
140 for (final ITmfTrace trace : fTraces) {
141 trace.dispose();
142 }
143 fTraces = null;
144 }
145 super.dispose();
146 }
147
148 // ------------------------------------------------------------------------
149 // ITmfTrace - Initializers
150 // ------------------------------------------------------------------------
151
152 /* (non-Javadoc)
153 * @see org.eclipse.linuxtools.tmf.core.trace.TmfTrace#initTrace(org.eclipse.core.resources.IResource, java.lang.String, java.lang.Class)
154 */
155 @Override
156 public void initTrace(final IResource resource, final String path, final Class<T> type) {
157 }
158
159 /* (non-Javadoc)
160 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#validate(org.eclipse.core.resources.IProject, java.lang.String)
161 */
162 @Override
163 public boolean validate(final IProject project, final String path) {
164 return true;
165 }
166
167 // ------------------------------------------------------------------------
168 // Accessors
169 // ------------------------------------------------------------------------
170
171 /**
172 * Selects the current, framework-wide, experiment
173 *
174 * @param experiment das experiment
175 */
176 public static void setCurrentExperiment(final TmfExperiment<?> experiment) {
177 if (fCurrentExperiment != null && fCurrentExperiment != experiment) {
178 fCurrentExperiment.dispose();
179 }
180 fCurrentExperiment = experiment;
181 }
182
183 /**
184 * @return das experiment
185 */
186 public static TmfExperiment<?> getCurrentExperiment() {
187 return fCurrentExperiment;
188 }
189
190 /**
191 * Get the list of traces. Handle with care...
192 *
193 * @return the experiment traces
194 */
195 public ITmfTrace<T>[] getTraces() {
196 return fTraces;
197 }
198
199 /**
200 * Returns the timestamp of the event at the requested index. If none,
201 * returns null.
202 *
203 * @param index the event index (rank)
204 * @return the corresponding event timestamp
205 */
206 public ITmfTimestamp getTimestamp(final int index) {
207 final ITmfContext context = seekEvent(index);
208 final ITmfEvent event = getNext(context);
209 return (event != null) ? event.getTimestamp() : null;
210 }
211
212 /**
213 * Set the file to be used for bookmarks on this experiment
214 *
215 * @param file the bookmarks file
216 */
217 public void setBookmarksFile(final IFile file) {
218 fBookmarksFile = file;
219 }
220
221 /**
222 * Get the file used for bookmarks on this experiment
223 *
224 * @return the bookmarks file or null if none is set
225 */
226 public IFile getBookmarksFile() {
227 return fBookmarksFile;
228 }
229
230 // ------------------------------------------------------------------------
231 // Request management
232 // ------------------------------------------------------------------------
233
234 /* (non-Javadoc)
235 * @see org.eclipse.linuxtools.tmf.core.trace.TmfTrace#armRequest(org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest)
236 */
237 @Override
238 protected synchronized ITmfContext armRequest(final ITmfDataRequest<T> request) {
239
240 // Make sure we have something to read from
241 if (fTraces == null) {
242 return null;
243 }
244
245 if (request instanceof ITmfEventRequest<?>
246 && !TmfTimestamp.BIG_BANG.equals(((ITmfEventRequest<T>) request).getRange().getStartTime())
247 && request.getIndex() == 0)
248 {
249 final ITmfContext context = seekEvent(((ITmfEventRequest<T>) request).getRange().getStartTime());
250 ((ITmfEventRequest<T>) request).setStartIndex((int) context.getRank());
251 return context;
252
253 }
254
255 // Check if we are already at the right index
256 if ((fExperimentContext != null) && fExperimentContext.getRank() == request.getIndex()) {
257 return fExperimentContext;
258 }
259
260 return seekEvent(request.getIndex());
261 }
262
263 // ------------------------------------------------------------------------
264 // ITmfTrace trace positioning
265 // ------------------------------------------------------------------------
266
267 /* (non-Javadoc)
268 *
269 * Returns a brand new context based on the location provided and
270 * initializes the event queues
271 *
272 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#seekEvent(org.eclipse.linuxtools.tmf.core.trace.ITmfLocation)
273 */
274 @Override
275 public synchronized ITmfContext seekEvent(final ITmfLocation<?> location) {
276 // Validate the location
277 if (location != null && !(location instanceof TmfExperimentLocation)) {
278 return null; // Throw an exception?
279 }
280 // Make sure we have something to read from
281 if (fTraces == null) {
282 return null;
283 }
284
285 // Instantiate the location
286 final TmfExperimentLocation expLocation = (location == null)
287 ? new TmfExperimentLocation(new TmfLocationArray(new ITmfLocation<?>[fTraces.length]))
288 : (TmfExperimentLocation) location.clone();
289
290 // Create and populate the context's traces contexts
291 final TmfExperimentContext context = new TmfExperimentContext(new ITmfContext[fTraces.length]);
292
293 for (int i = 0; i < fTraces.length; i++) {
294 // Get the relevant trace attributes
295 final ITmfLocation<?> trcLocation = expLocation.getLocation().getLocations()[i];
296 context.getContexts()[i] = fTraces[i].seekEvent(trcLocation);
297 expLocation.getLocation().getLocations()[i] = context.getContexts()[i].getLocation().clone();
298 context.getEvents()[i] = fTraces[i].getNext(context.getContexts()[i]);
299 }
300
301 // Finalize context
302 context.setLocation(expLocation);
303 context.setLastTrace(TmfExperimentContext.NO_TRACE);
304 context.setRank((location == null) ? 0 : ITmfContext.UNKNOWN_RANK);
305
306 fExperimentContext = context;
307 return context;
308 }
309
310 // ------------------------------------------------------------------------
311 // ITmfTrace - SeekEvent operations (returning a trace context)
312 // ------------------------------------------------------------------------
313
314 /* (non-Javadoc)
315 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#seekEvent(double)
316 */
317 @Override
318 public ITmfContext seekEvent(final double ratio) {
319 final ITmfContext context = seekEvent((long) (ratio * getNbEvents()));
320 return context;
321 }
322
323 /* (non-Javadoc)
324 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#getLocationRatio(org.eclipse.linuxtools.tmf.core.trace.ITmfLocation)
325 */
326 @Override
327 public double getLocationRatio(final ITmfLocation<?> location) {
328 if (location instanceof TmfExperimentLocation) {
329 return (double) seekEvent(location).getRank() / getNbEvents();
330 }
331 return 0.0;
332 }
333
334 /* (non-Javadoc)
335 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#getCurrentLocation()
336 */
337 @Override
338 public ITmfLocation<?> getCurrentLocation() {
339 ITmfLocation<?>[] locations = new ITmfLocation<?>[fTraces.length];
340 for (int i = 0; i < fTraces.length; i++) {
341 locations[i] = fTraces[i].getCurrentLocation();
342 }
343 return new TmfExperimentLocation(new TmfLocationArray(locations));
344 }
345
346 // ------------------------------------------------------------------------
347 // ITmfTrace trace positioning
348 // ------------------------------------------------------------------------
349
350 /* (non-Javadoc)
351 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfEventParser#parseEvent(org.eclipse.linuxtools.tmf.core.trace.ITmfContext)
352 */
353 @Override
354 public synchronized T parseEvent(final ITmfContext context) {
355 final ITmfContext savedContext = context.clone();
356 final T event = getNext(savedContext);
357 return event;
358 }
359
360 /* (non-Javadoc)
361 * @see org.eclipse.linuxtools.tmf.core.trace.TmfTrace#getNext(org.eclipse.linuxtools.tmf.core.trace.ITmfContext)
362 */
363 @Override
364 @SuppressWarnings("unchecked")
365 public synchronized T getNext(ITmfContext context) {
366
367 // Validate the context
368 if (!(context instanceof TmfExperimentContext)) {
369 return null; // Throw an exception?
370 }
371
372 // Make sure that we have something to read from
373 if (fTraces == null) {
374 return null;
375 }
376
377 TmfExperimentContext expContext = (TmfExperimentContext) context;
378
379 // If an event was consumed previously, first get the next one from that trace
380 final int lastTrace = expContext.getLastTrace();
381 if (lastTrace != TmfExperimentContext.NO_TRACE) {
382 final ITmfContext traceContext = expContext.getContexts()[lastTrace];
383 expContext.getEvents()[lastTrace] = fTraces[lastTrace].getNext(traceContext);
384 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
385 }
386
387 // Scan the candidate events and identify the "next" trace to read from
388 int trace = TmfExperimentContext.NO_TRACE;
389 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
390 for (int i = 0; i < fTraces.length; i++) {
391 final ITmfEvent event = expContext.getEvents()[i];
392 if (event != null && event.getTimestamp() != null) {
393 final ITmfTimestamp otherTS = event.getTimestamp();
394 if (otherTS.compareTo(timestamp, true) < 0) {
395 trace = i;
396 timestamp = otherTS;
397 }
398 }
399 }
400
401 T event = null;
402 if (trace != TmfExperimentContext.NO_TRACE) {
403 event = (T) expContext.getEvents()[trace];
404 if (event != null) {
405 updateAttributes(expContext, event.getTimestamp());
406 expContext.increaseRank();
407 expContext.setLastTrace(trace);
408 final ITmfContext traceContext = expContext.getContexts()[trace];
409
410 TmfExperimentLocation location = (TmfExperimentLocation) expContext.getLocation();
411 if (location != null) {
412 location.getLocation().getLocations()[trace] = traceContext.getLocation().clone();
413 }
414
415 fExperimentContext = expContext.clone();
416 processEvent(event);
417 }
418 }
419
420 return event;
421 }
422
423 /* (non-Javadoc)
424 * @see java.lang.Object#toString()
425 */
426 @Override
427 @SuppressWarnings("nls")
428 public synchronized String toString() {
429 return "[TmfExperiment (" + getName() + ")]";
430 }
431
432 // ------------------------------------------------------------------------
433 // Streaming support
434 // ------------------------------------------------------------------------
435
436 private synchronized void initializeStreamingMonitor() {
437
438 if (fInitialized) {
439 return;
440 }
441 fInitialized = true;
442
443 if (getStreamingInterval() == 0) {
444 final ITmfContext context = seekEvent(0);
445 final ITmfEvent event = getNext(context);
446 if (event == null) {
447 return;
448 }
449 final TmfTimeRange timeRange = new TmfTimeRange(event.getTimestamp().clone(), TmfTimestamp.BIG_CRUNCH);
450 final TmfExperimentRangeUpdatedSignal signal = new TmfExperimentRangeUpdatedSignal(this, this, timeRange);
451
452 // Broadcast in separate thread to prevent deadlock
453 new Thread() {
454 @Override
455 public void run() {
456 broadcast(signal);
457 }
458 }.start();
459 return;
460 }
461
462 final Thread thread = new Thread("Streaming Monitor for experiment " + getName()) { //$NON-NLS-1$
463 private ITmfTimestamp safeTimestamp = null;
464 private ITmfTimestamp lastSafeTimestamp = null;
465 private TmfTimeRange timeRange = null;
466
467 @Override
468 public void run() {
469 while (!fExecutor.isShutdown()) {
470 if (!getIndexer().isIndexing()) {
471 ITmfTimestamp startTimestamp = TmfTimestamp.BIG_CRUNCH;
472 ITmfTimestamp endTimestamp = TmfTimestamp.BIG_BANG;
473 for (final ITmfTrace<T> trace : fTraces) {
474 if (trace.getStartTime().compareTo(startTimestamp) < 0) {
475 startTimestamp = trace.getStartTime();
476 }
477 if (trace.getStreamingInterval() != 0 && trace.getEndTime().compareTo(endTimestamp) > 0) {
478 endTimestamp = trace.getEndTime();
479 }
480 }
481 if (safeTimestamp != null && (lastSafeTimestamp == null || safeTimestamp.compareTo(lastSafeTimestamp, false) > 0)) {
482 timeRange = new TmfTimeRange(startTimestamp, safeTimestamp);
483 lastSafeTimestamp = safeTimestamp;
484 } else {
485 timeRange = null;
486 }
487 safeTimestamp = endTimestamp;
488 if (timeRange != null) {
489 final TmfExperimentRangeUpdatedSignal signal =
490 new TmfExperimentRangeUpdatedSignal(TmfExperiment.this, TmfExperiment.this, timeRange);
491 broadcast(signal);
492 }
493 }
494 try {
495 Thread.sleep(getStreamingInterval());
496 } catch (final InterruptedException e) {
497 e.printStackTrace();
498 }
499 }
500 }
501 };
502 thread.start();
503 }
504
505 /* (non-Javadoc)
506 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getStreamingInterval()
507 */
508 @Override
509 public long getStreamingInterval() {
510 long interval = 0;
511 for (final ITmfTrace<T> trace : fTraces) {
512 interval = Math.max(interval, trace.getStreamingInterval());
513 }
514 return interval;
515 }
516
517 // ------------------------------------------------------------------------
518 // Signal handlers
519 // ------------------------------------------------------------------------
520
521 private Integer fEndSynchReference;
522
523 /**
524 * Signal handler for the TmfExperimentSelectedSignal signal
525 *
526 * @param signal
527 */
528 @TmfSignalHandler
529 public void experimentSelected(final TmfExperimentSelectedSignal<T> signal) {
530 final TmfExperiment<?> experiment = signal.getExperiment();
531 if (experiment == this) {
532 setCurrentExperiment(experiment);
533 fEndSynchReference = Integer.valueOf(signal.getReference());
534 }
535 }
536
537 /**
538 * Signal handler for the TmfEndSynchSignal signal
539 *
540 * @param signal
541 */
542 @TmfSignalHandler
543 public void endSync(final TmfEndSynchSignal signal) {
544 if (fEndSynchReference != null && fEndSynchReference.intValue() == signal.getReference()) {
545 fEndSynchReference = null;
546 initializeStreamingMonitor();
547 }
548 }
549
550 /**
551 * Signal handler for the TmfTraceUpdatedSignal signal
552 *
553 * @param signal
554 */
555 @TmfSignalHandler
556 public void traceUpdated(final TmfTraceUpdatedSignal signal) {
557 if (signal.getTrace() == this) {
558 broadcast(new TmfExperimentUpdatedSignal(this, this));
559 }
560 }
561
562 /**
563 * Signal handler for the TmfExperimentRangeUpdatedSignal signal
564 *
565 * @param signal
566 */
567 @TmfSignalHandler
568 public void experimentRangeUpdated(final TmfExperimentRangeUpdatedSignal signal) {
569 if (signal.getExperiment() == this) {
570 getIndexer().buildIndex(getNbEvents(), signal.getRange(), false);
571 }
572 }
573
574 }
This page took 0.045803 seconds and 5 git commands to generate.