[Bug 303523] LTTng/TMF udpates:
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / experiment / TmfExperiment.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.experiment;
14
15 import java.util.Vector;
16
17 import org.eclipse.core.runtime.IProgressMonitor;
18 import org.eclipse.core.runtime.IStatus;
19 import org.eclipse.core.runtime.Status;
20 import org.eclipse.core.runtime.jobs.Job;
21 import org.eclipse.linuxtools.tmf.component.ITmfContext;
22 import org.eclipse.linuxtools.tmf.component.TmfProvider;
23 import org.eclipse.linuxtools.tmf.event.TmfEvent;
24 import org.eclipse.linuxtools.tmf.event.TmfTimeRange;
25 import org.eclipse.linuxtools.tmf.event.TmfTimestamp;
26 import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
27 import org.eclipse.linuxtools.tmf.request.TmfEventRequest;
28 import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
29 import org.eclipse.linuxtools.tmf.trace.ITmfTrace;
30 import org.eclipse.linuxtools.tmf.trace.TmfTraceContext;
31 import org.eclipse.linuxtools.tmf.trace.TmfTraceUpdatedSignal;
32
33 /**
34 * <b><u>TmfExperiment</u></b>
35 * <p>
36 * TmfExperiment presents a time-ordered, unified view of a set of TmfTraces
37 * that are part of a tracing experiment.
38 * <p>
39 */
40 public class TmfExperiment<T extends TmfEvent> extends TmfProvider<T> {
41
42 // TODO: Complete multi-trace experiment
43 // TODO: Add support for dynamic addition/removal of traces
44 // TODO: Add support for live streaming (notifications, incremental indexing, ...)
45 // TODO: Implement indexing-on-demand
46
47 // ------------------------------------------------------------------------
48 // Attributes
49 // ------------------------------------------------------------------------
50
51 // The currently selected experiment
52 private static TmfExperiment<?> fCurrentExperiment;
53
54 // The experiment ID
55 private String fExperimentId;
56
57 // The set of trace sthat constitute the experiment
58 private Vector<ITmfTrace> fTraces;
59
60 // The total number of events
61 private int fNbEvents;
62
63 // The experiment time range
64 private TmfTimeRange fTimeRange;
65
66 // The experiment reference timestamp (default: BigBang)
67 private TmfTimestamp fEpoch;
68
69 // ------------------------------------------------------------------------
70 // Constructors
71 // ------------------------------------------------------------------------
72
73 /**
74 * @param type
75 * @param id
76 * @param traces
77 * @param epoch
78 * @param indexPageSize
79 */
80 public TmfExperiment(Class<T> type, String id, ITmfTrace[] traces, TmfTimestamp epoch, int indexPageSize) {
81 super(type);
82
83 fExperimentId = id;
84 fTraces = new Vector<ITmfTrace>();
85 for (ITmfTrace trace : traces) {
86 fTraces.add(trace);
87 }
88 fEpoch = epoch;
89 fIndexPageSize = indexPageSize;
90
91 updateNbEvents();
92 updateTimeRange();
93 }
94
95 /**
96 * @param type
97 * @param id
98 * @param traces
99 */
100 public TmfExperiment(Class<T> type, String id, ITmfTrace[] traces) {
101 this(type, id, traces, TmfTimestamp.Zero, DEFAULT_INDEX_PAGE_SIZE);
102 }
103
104 /**
105 * @param type
106 * @param id
107 * @param traces
108 * @param indexPageSize
109 */
110 public TmfExperiment(Class<T> type, String id, ITmfTrace[] traces, int indexPageSize) {
111 this(type, id, traces, TmfTimestamp.Zero, indexPageSize);
112 }
113
114 /**
115 *
116 */
117 @Override
118 public void deregister() {
119 fTraces.clear();
120 fCurrentExperiment= null;
121 super.deregister();
122 }
123
124 // ------------------------------------------------------------------------
125 // Accessors
126 // ------------------------------------------------------------------------
127
128 public static TmfExperiment<?> getCurrentExperiment() {
129 return fCurrentExperiment;
130 }
131
132 public String getExperimentId() {
133 return fExperimentId;
134 }
135
136 public ITmfTrace[] getTraces() {
137 ITmfTrace[] result = new ITmfTrace[fTraces.size()];
138 return fTraces.toArray(result);
139 }
140
141 public TmfTimestamp getEpoch() {
142 return fEpoch;
143 }
144
145 public TmfTimeRange getTimeRange() {
146 return fTimeRange;
147 }
148
149 public int getNbEvents() {
150 return fNbEvents;
151 }
152
153 /**
154 * Returns the rank of the first event with the requested timestamp.
155 * If none, returns the index of the next event (if any).
156 *
157 * @param ts
158 * @return
159 */
160 public long getRank(TmfTimestamp ts) {
161 // FIXME: Go over all the traces
162 ITmfTrace trace = fTraces.firstElement();
163 TmfTraceContext context = trace.seekEvent(ts);
164 return context.getRank();
165 }
166
167 /**
168 * Returns the timestamp of the event at the requested index.
169 * If none, returns null.
170 *
171 * @param index
172 * @return
173 */
174 public TmfTimestamp getTimestamp(int index) {
175 // FIXME: Go over all the traces
176 ITmfTrace trace = fTraces.firstElement();
177 TmfTraceContext context = trace.seekEvent(index);
178 TmfEvent event = trace.getNextEvent(context);
179 TmfTimestamp ts = (event != null) ? event.getTimestamp() : null;
180 return ts;
181 }
182
183 // ------------------------------------------------------------------------
184 // Operators
185 // ------------------------------------------------------------------------
186
187 /**
188 * Add a trace to the experiment trace set
189 *
190 * @param trace
191 */
192 public void addTrace(ITmfTrace trace) {
193 fTraces.add(trace);
194 synchronized(this) {
195 updateNbEvents();
196 updateTimeRange();
197 }
198 }
199
200 /**
201 * Update the total number of events
202 */
203 private void updateNbEvents() {
204 int nbEvents = 0;
205 for (ITmfTrace trace : fTraces) {
206 nbEvents += trace.getNbEvents();
207 }
208 fNbEvents = nbEvents;
209 }
210
211 /**
212 * Update the global time range
213 */
214 private void updateTimeRange() {
215 TmfTimestamp startTime = fTimeRange != null ? fTimeRange.getStartTime() : TmfTimestamp.BigCrunch;
216 TmfTimestamp endTime = fTimeRange != null ? fTimeRange.getEndTime() : TmfTimestamp.BigBang;
217
218 for (ITmfTrace trace : fTraces) {
219 TmfTimestamp traceStartTime = trace.getStartTime();
220 if (traceStartTime.compareTo(startTime, true) < 0)
221 startTime = traceStartTime;
222
223 TmfTimestamp traceEndTime = trace.getEndTime();
224 if (traceEndTime.compareTo(endTime, true) > 0)
225 endTime = traceEndTime;
226 }
227 fTimeRange = new TmfTimeRange(startTime, endTime);
228 }
229
230 // ------------------------------------------------------------------------
231 // TmfProvider
232 // ------------------------------------------------------------------------
233
234 @Override
235 public ITmfContext setContext(TmfDataRequest<T> request) {
236 TmfExperimentContext context = new TmfExperimentContext(fTraces);
237 positionTraces(request.getIndex(), context);
238 return context;
239 }
240
241 @SuppressWarnings("unchecked")
242 @Override
243 public T getNext(ITmfContext context) {
244 if (context instanceof TmfExperimentContext) {
245 return (T) getNextEvent((TmfExperimentContext) context);
246 }
247 return null;
248 }
249
250 @Override
251 public boolean isCompleted(TmfDataRequest<T> request, T data) {
252 if (request instanceof TmfEventRequest<?> && data != null) {
253 return data.getTimestamp().compareTo(((TmfEventRequest<T>) request).getRange().getEndTime(), false) > 0;
254 }
255 return false;
256 }
257
258 /**
259 * Given an experiment event index, position the set of traces so a call
260 * to getNextEvent() will retrieve the corresponding event.
261 *
262 * @param index
263 * @param context
264 */
265 private synchronized void positionTraces(long index, TmfExperimentContext context) {
266
267 // Extract the relevant information
268 ITmfTrace[] traces = context.getTraces();
269 TmfEvent[] events = context.getEvents();
270 TmfTraceContext[] contexts = context.getContexts();
271
272 int page = 0; // The checkpoint page
273 int current = 0; // The current event index (rank)
274
275 // If there is no checkpoint created yet, start from the beginning
276 if (fExperimentIndex.size() == 0) {
277 for (int i = 0; i < contexts.length; i++) {
278 contexts[i] = traces[i].seekLocation(null).clone();
279 events[i] = traces[i].parseEvent(contexts[i]);
280 }
281 }
282 else {
283 page = (int) index / fIndexPageSize;
284 if (page >= fExperimentIndex.size()) {
285 page = fExperimentIndex.size() - 1;
286 }
287
288 TmfTraceContext[] checkpoint = fExperimentIndex.elementAt(page).getContexts();
289 for (int i = 0; i < contexts.length; i++) {
290 contexts[i] = checkpoint[i].clone();
291 events[i] = traces[i].parseEvent(contexts[i]);
292 }
293 current = page * fIndexPageSize;
294 }
295
296 // Position the traces at the requested index
297 while (current++ < index) {
298 getNextEvent(context);
299 }
300 }
301
302 /**
303 * Scan the next events from all traces and return the next one
304 * in chronological order.
305 *
306 * @param context
307 * @return
308 */
309 private TmfEvent getNextEvent(TmfExperimentContext context) {
310 // TODO: Consider the time adjustment
311 int trace = 0;
312 TmfTimestamp timestamp = TmfTimestamp.BigCrunch;
313 if (context.getEvents()[trace] != null) {
314 timestamp = context.getEvents()[trace].getTimestamp();
315 }
316 for (int i = 1; i < context.getTraces().length; i++) {
317 if (context.getEvents()[i].getTimestamp() != null) {
318 TmfTimestamp otherTS = context.getEvents()[i].getTimestamp();
319 if (otherTS.compareTo(timestamp, true) < 0) {
320 trace = i;
321 timestamp = otherTS;
322 }
323 }
324 }
325 TmfEvent event = context.getTraces()[trace].getNextEvent(context.getContexts()[trace]);
326 context.getEvents()[trace] = context.getTraces()[trace].parseEvent(context.getContexts()[trace]);
327 return event;
328 }
329
330 /**
331 * Scan the next events from all traces and return the next one
332 * in chronological order.
333 *
334 * @param context
335 * @return
336 */
337 private TmfTimestamp getNextEventTimestamp(TmfExperimentContext context) {
338 // TODO: Consider the time adjustment
339 int trace = 0;
340 TmfTimestamp timestamp = TmfTimestamp.BigCrunch;
341 if (context.getEvents()[trace] != null) {
342 timestamp = context.getEvents()[trace].getTimestamp();
343 }
344 for (int i = 1; i < context.getTraces().length; i++) {
345 if (context.getEvents()[i].getTimestamp() != null) {
346 TmfTimestamp otherTS = context.getEvents()[i].getTimestamp();
347 if (otherTS.compareTo(timestamp, true) < 0) {
348 trace = i;
349 timestamp = otherTS;
350 }
351 }
352 }
353 return timestamp;
354 }
355
356 /* (non-Javadoc)
357 * @see java.lang.Object#toString()
358 */
359 @Override
360 public String toString() {
361 return "[TmfExperiment (" + fExperimentId + ")]";
362 }
363
364 // ------------------------------------------------------------------------
365 // Indexing
366 // ------------------------------------------------------------------------
367
368 /*
369 * The experiment holds the globally ordered events of its set of traces.
370 * It is expected to provide access to each individual event by index i.e.
371 * it must be possible to request the nth event of the experiment.
372 *
373 * The purpose of the index is to keep the information needed to rapidly
374 * restore the traces contexts at regular intervals (every INDEX_PAGE_SIZE
375 * event).
376 */
377
378 // The index page size
379 private static final int DEFAULT_INDEX_PAGE_SIZE = 1000;
380 private final int fIndexPageSize;
381
382 // The experiment index
383 private Vector<TmfExperimentCheckpoint> fExperimentIndex = new Vector<TmfExperimentCheckpoint>();
384
385 // Indicates that an indexing job is already running
386 private Boolean fIndexing = false;
387 private Boolean fIndexed = false;
388
389 // The indexing job
390 private IndexingJob job;
391
392 /**
393 * indexExperiment
394 *
395 * Creates the experiment index.
396 */
397 public void indexExperiment(boolean waitForCompletion) {
398
399 synchronized(fIndexing) {
400 if (fIndexed || fIndexing) {
401 // An indexing job is already running but a new request came
402 // in (probably due to a change in the trace set). The index
403 // being currently built is therefore already invalid.
404 // TODO: Cancel and restart the job
405 // TODO: Add support for dynamically adding/removing traces
406 return;
407 }
408 fIndexing = true;
409 }
410
411 job = new IndexingJob(fExperimentId);
412 job.schedule();
413
414 if (waitForCompletion) {
415 try {
416 job.join();
417 } catch (InterruptedException e) {
418 e.printStackTrace();
419 }
420 }
421 }
422
423 private class IndexingJob extends Job {
424
425 public IndexingJob(String name) {
426 super(name);
427 }
428
429 /* (non-Javadoc)
430 * @see org.eclipse.core.runtime.jobs.Job#run(org.eclipse.core.runtime.IProgressMonitor)
431 */
432 @Override
433 protected IStatus run(IProgressMonitor monitor) {
434
435 // Minimal check
436 if (fTraces.size() == 0) {
437 fIndexing = false;
438 return Status.OK_STATUS;
439 }
440
441 monitor.beginTask("Indexing " + fExperimentId, IProgressMonitor.UNKNOWN);
442
443 int nbEvents = 0;
444 TmfTimestamp startTime = null;
445 TmfTimestamp lastTime = null;
446
447 fExperimentIndex = new Vector<TmfExperimentCheckpoint>();
448
449 try {
450 // Reset the traces
451 TmfExperimentContext context = new TmfExperimentContext(fTraces);
452 positionTraces(0, context);
453 TmfTraceContext[] traces = context.cloneContexts();
454
455 TmfTimestamp timestamp = getNextEventTimestamp(context);
456 startTime = new TmfTimestamp(timestamp);
457 lastTime = new TmfTimestamp(timestamp);
458 TmfEvent event = getNextEvent(context);
459 while (event != null) {
460 if ((nbEvents++ % fIndexPageSize) == 0) {
461 fExperimentIndex.add(new TmfExperimentCheckpoint(lastTime, traces));
462 fNbEvents = nbEvents;
463 fTimeRange = new TmfTimeRange(startTime, lastTime);
464
465 monitor.worked(1);
466
467 // Check monitor *after* fCheckpoints has been updated
468 if (monitor.isCanceled()) {
469 monitor.done();
470 return Status.CANCEL_STATUS;
471 }
472 }
473
474 // We will need the contexts at the next iteration
475 if ((nbEvents % fIndexPageSize) == 0) {
476 traces = context.cloneContexts();
477 lastTime = new TmfTimestamp(event.getTimestamp());
478 }
479
480 event = getNextEvent(context);
481 }
482
483 }
484 finally {
485 synchronized(this) {
486 fNbEvents = nbEvents;
487 fTimeRange = new TmfTimeRange(startTime, lastTime);
488 fIndexing = false;
489 fIndexed = true;
490 }
491 monitor.done();
492 }
493
494 // dumpExperimentCheckpoints();
495
496 return Status.OK_STATUS;
497 }
498 }
499
500 // // ------------------------------------------------------------------------
501 // // Toubleshooting code
502 // // ------------------------------------------------------------------------
503 //
504 // private void dumpExperimentCheckpoints() {
505 // System.out.println("-----");
506 // System.out.println("Checkpoints of " + fExperimentId);
507 // for (int i = 0; i < fExperimentIndex.size(); i++) {
508 // System.out.println("Entry:" + i);
509 // TmfExperimentCheckpoint checkpoint = fExperimentIndex.get(i);
510 // TmfTraceContext[] contexts = checkpoint.getContexts();
511 // for (int j = 0; j < contexts.length; j++) {
512 // ITmfTrace trace = fTraces.get(j);
513 // TmfTraceContext context = trace.seekLocation(contexts[j].getLocation());
514 // TmfEvent event = fTraces.get(j).getNextEvent(new TmfTraceContext(context));
515 // System.out.println(" [" + trace.getName() + "] rank: " + context.getRank() + ", timestamp: " + event.getTimestamp());
516 // assert (checkpoint.getTimestamp().compareTo(event.getTimestamp(), false) == 0);
517 // }
518 // }
519 // }
520
521 // ------------------------------------------------------------------------
522 // Signal handlers
523 // ------------------------------------------------------------------------
524
525 @TmfSignalHandler
526 public void experimentSelected(TmfExperimentSelectedSignal signal) {
527 fCurrentExperiment = signal.getExperiment();
528 // if (signal.getExperiment() == this) {
529 // indexExperiment(true);
530 // }
531 }
532
533 @TmfSignalHandler
534 public void experimentUpdated(TmfExperimentUpdatedSignal signal) {
535 // indexExperiment(true);
536 }
537
538 @TmfSignalHandler
539 public void traceUpdated(TmfTraceUpdatedSignal signal) {
540 // TODO: Incremental index update
541 synchronized(this) {
542 updateNbEvents();
543 updateTimeRange();
544 }
545 broadcast(new TmfExperimentUpdatedSignal(this, this, signal.getTrace()));
546 }
547
548 }
This page took 0.044926 seconds and 5 git commands to generate.