- Tweaked the FW a little to accommodate LTTng indexing
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / trace / TmfExperiment.java
1 /*******************************************************************************
2 * Copyright (c) 2009 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.trace;
14
15 import java.lang.reflect.InvocationTargetException;
16 import java.util.Vector;
17
18 import org.eclipse.core.runtime.IProgressMonitor;
19 import org.eclipse.core.runtime.IStatus;
20 import org.eclipse.core.runtime.Status;
21 import org.eclipse.core.runtime.jobs.Job;
22 import org.eclipse.jface.dialogs.ProgressMonitorDialog;
23 import org.eclipse.jface.operation.IRunnableWithProgress;
24 import org.eclipse.linuxtools.tmf.component.TmfComponent;
25 import org.eclipse.linuxtools.tmf.event.TmfEvent;
26 import org.eclipse.linuxtools.tmf.event.TmfTimeRange;
27 import org.eclipse.linuxtools.tmf.event.TmfTimestamp;
28 import org.eclipse.linuxtools.tmf.request.ITmfRequestHandler;
29 import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
30 import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
31
32 /**
33 * <b><u>TmfExperiment</u></b>
34 * <p>
35 * TmfExperiment presents a time-ordered, unified view of a set of TmfTraces
36 * that are part of a tracing experiment.
37 * <p>
38 */
39 public class TmfExperiment extends TmfComponent implements ITmfRequestHandler<TmfEvent> {
40
41 // TODO: Complete multi-trace experiment
42 // TODO: Add support for dynamic addition/removal of traces
43 // TODO: Add support for live streaming (notifications, incremental indexing, ...)
44
45 // ========================================================================
46 // Attributes
47 // ========================================================================
48
49 // The currently selected experiment
50 private static TmfExperiment fCurrentExperiment;
51
52 // The experiment ID
53 private String fExperimentId;
54
55 // The set of trace sthat constitute the experiment
56 private Vector<ITmfTrace> fTraces;
57
58 // The total number of events
59 private int fNbEvents;
60
61 // The experiment time range
62 private TmfTimeRange fTimeRange;
63
64 // The experiment reference timestamp (default: BigBang)
65 private TmfTimestamp fEpoch;
66
67 // Indicates if the stream should be indexed synchronously (default: false)
68 private final boolean fWaitForIndexCompletion;
69
70 // ========================================================================
71 // Constructors/Destructor
72 // ========================================================================
73
/**
 * Creates an experiment using the default epoch (TmfTimestamp.BigBang) and
 * the default index page size, without waiting for the indexing to complete.
 *
 * @param id the experiment ID
 * @param traces the traces that constitute the experiment
 */
public TmfExperiment(String id, ITmfTrace[] traces) {
    this(id, traces, TmfTimestamp.BigBang, DEFAULT_INDEX_PAGE_SIZE, false);
}
81
/**
 * Creates an experiment using the default epoch (TmfTimestamp.BigBang) and
 * the default index page size.
 *
 * @param id the experiment ID
 * @param traces the traces that constitute the experiment
 * @param waitForIndexCompletion true if the constructor should wait for the
 *        indexing job to complete before returning
 */
public TmfExperiment(String id, ITmfTrace[] traces, boolean waitForIndexCompletion) {
    this(id, traces, TmfTimestamp.BigBang, DEFAULT_INDEX_PAGE_SIZE, waitForIndexCompletion);
}
90
/**
 * Creates an experiment, computes its event count and time range from the
 * supplied traces and then launches the indexing.
 *
 * @param id the experiment ID
 * @param traces the traces that constitute the experiment
 * @param epoch the experiment reference timestamp
 * @param indexPageSize the number of events between two index entries
 * @param waitForIndexCompletion true if the constructor should wait for the
 *        indexing job to complete before returning
 */
public TmfExperiment(String id, ITmfTrace[] traces, TmfTimestamp epoch, int indexPageSize, boolean waitForIndexCompletion) {
    super();

    // Plain attributes
    fExperimentId = id;
    fEpoch = epoch;
    fIndexPageSize = indexPageSize;
    fWaitForIndexCompletion = waitForIndexCompletion;

    // Copy the trace set
    fTraces = new Vector<ITmfTrace>();
    for (int i = 0; i < traces.length; i++) {
        fTraces.add(traces[i]);
    }

    // Derived attributes, then the index
    updateNbEvents();
    updateTimeRange();
    indexExperiment();
}
113
114 /**
115 *
116 */
117 @Override
118 public void dispose() {
119 super.dispose();
120 fTraces.clear();
121 fCurrentExperiment= null;
122 }
123
124 // ========================================================================
125 // Accessors
126 // ========================================================================
127
128 public static TmfExperiment getCurrentExperiment() {
129 return fCurrentExperiment;
130 }
131
132 public String getExperimentId() {
133 return fExperimentId;
134 }
135
136 public ITmfTrace[] getTraces() {
137 ITmfTrace[] result = new ITmfTrace[fTraces.size()];
138 return fTraces.toArray(result);
139 }
140
141 public TmfTimestamp getEpoch() {
142 return fEpoch;
143 }
144
145 public TmfTimeRange getTimeRange() {
146 return fTimeRange;
147 }
148
149 public int getNbEvents() {
150 return fNbEvents;
151 }
152
153 /**
154 * Returns the index of the first event with the requested timestamp.
155 * If none, returns the index of the next event (if any).
156 *
157 * @param ts
158 * @return
159 */
160 public long getIndex(TmfTimestamp ts) {
161 // TODO: Go over all the traces
162 ITmfTrace trace = fTraces.firstElement();
163 TmfTraceContext context = trace.seekEvent(ts);
164 return context.getIndex();
165 }
166
167 /**
168 * Returns the timestamp of the event at the requested index.
169 * If none, returns null.
170 *
171 * @param index
172 * @return
173 */
174 public TmfTimestamp getTimestamp(int index) {
175 // TODO: Go over all the traces
176 ITmfTrace trace = fTraces.firstElement();
177 TmfTraceContext context = trace.seekEvent(index);
178 return context.getTimestamp();
179 }
180
181 // ========================================================================
182 // Operators
183 // ========================================================================
184
185 /**
186 * Add a trace to the experiment trace set
187 *
188 * @param trace
189 */
190 public void addTrace(ITmfTrace trace) {
191 fTraces.add(trace);
192 synchronized(this) {
193 updateNbEvents();
194 updateTimeRange();
195 }
196 }
197
198 /**
199 * Update the total number of events
200 */
201 private void updateNbEvents() {
202 int nbEvents = 0;
203 for (ITmfTrace trace : fTraces) {
204 nbEvents += trace.getNbEvents();
205 }
206 fNbEvents = nbEvents;
207 }
208
209 /**
210 * Update the global time range
211 */
212 private void updateTimeRange() {
213 TmfTimestamp startTime = fTimeRange != null ? fTimeRange.getStartTime() : TmfTimestamp.BigCrunch;
214 TmfTimestamp endTime = fTimeRange != null ? fTimeRange.getEndTime() : TmfTimestamp.BigBang;
215
216 for (ITmfTrace trace : fTraces) {
217 TmfTimestamp traceStartTime = trace.getTimeRange().getStartTime();
218 if (traceStartTime.compareTo(startTime, true) < 0)
219 startTime = traceStartTime;
220
221 TmfTimestamp traceEndTime = trace.getTimeRange().getEndTime();
222 if (traceEndTime.compareTo(endTime, true) > 0)
223 endTime = traceEndTime;
224 }
225 fTimeRange = new TmfTimeRange(startTime, endTime);
226 }
227
228 // ========================================================================
229 // ITmfRequestHandler
230 // ========================================================================
231
232 /* (non-Javadoc)
233 * @see org.eclipse.linuxtools.tmf.request.ITmfRequestHandler#processRequest(org.eclipse.linuxtools.tmf.request.TmfDataRequest, boolean)
234 */
235 public void processRequest(TmfDataRequest<TmfEvent> request, boolean waitForCompletion) {
236
237 // Process the request
238 processDataRequest(request);
239
240 // Wait for completion if needed
241 if (waitForCompletion) {
242 request.waitForCompletion();
243 }
244 }
245
246 /**
247 * Process a data request
248 *
249 * @param request
250 */
251 private void processDataRequest(final TmfDataRequest<TmfEvent> request) {
252
253 // General request parameters
254 final TmfTimestamp endTime;
255 final long index;
256
257 // Initialize request params depending on request type
258 if (request.getRange() != null) {
259 index = getIndex(request.getRange().getStartTime());
260 endTime = request.getRange().getEndTime();
261 } else {
262 index = request.getIndex();
263 endTime = TmfTimestamp.BigCrunch;
264 }
265
266 // Process the request
267 Thread thread = new Thread() {
268
269 @Override
270 public void run() {
271
272 // Key variables
273 ITmfTrace[] traces = new ITmfTrace[0]; // The set of traces
274 TmfTraceContext[] contexts; // The set of trace contexts
275
276 // Extract the general request information
277 int blockSize = request.getBlockize();
278 int nbRequestedEvents = request.getNbRequestedItems();
279 if (nbRequestedEvents == -1) {
280 nbRequestedEvents = Integer.MAX_VALUE;
281 }
282
283 // Create the result buffer
284 Vector<TmfEvent> events = new Vector<TmfEvent>();
285 int nbEvents = 0;
286
287 // Initialize the traces array and position the traces
288 // at the first requested event
289 traces = fTraces.toArray(traces);
290 contexts = new TmfTraceContext[traces.length];
291 positionTraces(index, traces, contexts);
292
293 // Get the ordered events
294 TmfEvent event = getNextEvent(traces, contexts);
295 while (!request.isCancelled() && nbEvents < nbRequestedEvents && event != null
296 && event.getTimestamp().compareTo(endTime, false) < 0)
297 {
298 events.add(event);
299 if (++nbEvents % blockSize == 0) {
300 pushData(request, events);
301 }
302 // Avoid an unnecessary read passed the last event requested
303 if (nbEvents < nbRequestedEvents)
304 event = getNextEvent(traces, contexts);
305 }
306 pushData(request, events);
307 request.done();
308 }
309 };
310 thread.start();
311 }
312
313 /**
314 * Given an experiment event index, position the set of traces so a call
315 * to getNextEvent() will retrieve the corresponding event.
316 *
317 * @param index
318 * @param traces
319 * @param contexts
320 * @param nextEvents
321 */
322 private synchronized void positionTraces(long index, ITmfTrace[] traces, TmfTraceContext[] contexts) {
323
324 // Compute the index page and corresponding index
325 int page = (int) index / fIndexPageSize;
326 int current = page * fIndexPageSize;
327
328 // Retrieve the checkpoint and set the contexts (make copies)
329 TmfTraceContext[] saveContexts = new TmfTraceContext[contexts.length];
330 if (page < fExperimentIndex.size()) {
331 saveContexts = fExperimentIndex.elementAt(page);
332 for (int i = 0; i < contexts.length; i++) {
333 contexts[i] = new TmfTraceContext(saveContexts[i]);
334 }
335 } else {
336 // If the page entry doesn't exist (e.g. indexing not completed),
337 // set contexts at the the last entry (if it exists)
338 page = fExperimentIndex.size() - 1;
339 if (page >= 0) {
340 saveContexts = fExperimentIndex.elementAt(page);
341 for (int i = 0; i < contexts.length; i++) {
342 contexts[i] = new TmfTraceContext(saveContexts[i]);
343 }
344 current = page * fIndexPageSize;
345 }
346 // Index is empty... position traces at their beginning
347 else {
348 for (int i = 0; i < contexts.length; i++) {
349 contexts[i] = new TmfTraceContext(traces[i].seekLocation(null));
350 }
351 current = 0;
352 }
353 }
354
355 // Position the traces at the requested index
356 while (current++ < index) {
357 getNextEvent(traces, contexts);
358 }
359 }
360
361 /**
362 * Scan the next events from all traces and return the next one
363 * in chronological order.
364 *
365 * @param traces
366 * @param contexts
367 * @param nextEvents
368 * @return
369 */
370 private TmfEvent getNextEvent(ITmfTrace[] traces, TmfTraceContext[] contexts) {
371 // TODO: Consider the time adjustment
372 int trace = 0;
373 TmfTimestamp timestamp = contexts[trace].getTimestamp();
374 if (timestamp == null) {
375 timestamp = TmfTimestamp.BigCrunch;
376 }
377 for (int i = 1; i < traces.length; i++) {
378 if (contexts[i].getTimestamp() != null) {
379 TmfTimestamp otherTS = contexts[i].getTimestamp();
380 if (otherTS.compareTo(timestamp, true) < 0) {
381 trace = i;
382 timestamp = otherTS;
383 }
384 }
385 }
386 TmfEvent event = traces[trace].getNextEvent(contexts[trace]);
387 return event;
388 }
389
390 /**
391 * Format the result data and notify the requester.
392 * Note: after handling, the data is *removed*.
393 *
394 * @param request
395 * @param events
396 */
397 private void pushData(TmfDataRequest<TmfEvent> request, Vector<TmfEvent> events) {
398 TmfEvent[] result = new TmfEvent[events.size()];
399 events.toArray(result);
400 request.setData(result);
401 request.handleData();
402 events.removeAllElements();
403 }
404
405 /* (non-Javadoc)
406 * @see java.lang.Object#toString()
407 */
408 @Override
409 public String toString() {
410 return "[TmfExperiment (" + fExperimentId + ")]";
411 }
412
413 // ========================================================================
414 // Indexing
415 // ========================================================================
416
417 /*
418 * The experiment holds the globally ordered events of its set of traces.
419 * It is expected to provide access to each individual event by index i.e.
420 * it must be possible to request the Nth event of the experiment.
421 *
422 * The purpose of the index is to keep the information needed to rapidly
423 * restore the trace contexts at regular intervals (every fIndexPageSize
424 * events).
425 */
426
427 // The index page size
428 private static final int DEFAULT_INDEX_PAGE_SIZE = 1000;
429 private final int fIndexPageSize;
430
431 // The experiment index
432 private Vector<TmfTraceContext[]> fExperimentIndex = new Vector<TmfTraceContext[]>();
433
434 // Indicates that an indexing job is already running
435 private Boolean fIndexing = false;
436
437 // The indexing job
438 private IndexingJob job;
439
440 /**
441 * indexExperiment
442 *
443 * Creates the experiment index.
444 */
445 private void indexExperiment() {
446
447 synchronized(fIndexing) {
448 if (fIndexing) {
449 // An indexing job is already running but a new request came
450 // in (probably due to a change in the trace set). The index
451 // being currently built is therefore already invalid.
452 // TODO: Cancel and restart the job
453 // TODO: Add support for dynamically adding/removing traces
454 return;
455 }
456 fIndexing = true;
457 }
458
459 job = new IndexingJob(fExperimentId);
460 job.schedule();
461 if (fWaitForIndexCompletion) {
462 ProgressMonitorDialog dialog = new ProgressMonitorDialog(null);
463 try {
464 // TODO: Handle cancel!
465 dialog.run(true, true, new IRunnableWithProgress() {
466 public void run(IProgressMonitor monitor) throws InvocationTargetException, InterruptedException {
467 monitor.beginTask("Indexing " + fExperimentId, IProgressMonitor.UNKNOWN);
468 job.join();
469 monitor.done();
470 }
471 });
472 } catch (InvocationTargetException e) {
473 e.printStackTrace();
474 } catch (InterruptedException e) {
475 e.printStackTrace();
476 }
477 }
478 }
479
480 private class IndexingJob extends Job {
481
482 public IndexingJob(String name) {
483 super(name);
484 }
485
486 /* (non-Javadoc)
487 * @see org.eclipse.core.runtime.jobs.Job#run(org.eclipse.core.runtime.IProgressMonitor)
488 */
489 @Override
490 protected IStatus run(IProgressMonitor monitor) {
491
492 Vector<TmfTraceContext[]> indices = new Vector<TmfTraceContext[]>();
493
494 // Minimal check
495 if (fTraces.size() == 0) {
496 fIndexing = false;
497 return Status.OK_STATUS;
498 }
499
500 monitor.beginTask("Indexing " + fExperimentId, IProgressMonitor.UNKNOWN);
501
502 ITmfTrace[] traces = new ITmfTrace[0];
503 TmfTraceContext[] contexts;
504 int nbTraces = fTraces.size();
505
506 // Initialize the traces and contexts arrays
507 traces = fTraces.toArray(traces);
508 contexts = new TmfTraceContext[nbTraces];
509 TmfTraceContext[] savedContexts = new TmfTraceContext[nbTraces];
510 int nullEvents = 0;
511 for (int i = 0; i < nbTraces; i++) {
512 // Context of the first event of the trace
513 contexts[i] = traces[i].seekLocation(null);
514 savedContexts[i] = new TmfTraceContext(contexts[i].getLocation(), contexts[i].getTimestamp(), 0);
515 if (contexts[i].getTimestamp() == null)
516 nullEvents++;
517 }
518 // Check if there is anything to index
519 if (nullEvents >= nbTraces) {
520 fIndexing = false;
521 return Status.OK_STATUS;
522 }
523 // FIXME: LTTng hack - start
524 // indices.add(savedContexts); // TMF
525 // FIXME: LTTng hack - end
526
527 // Get the ordered events and populate the indices
528 // FIXME: LTTng hack - start
529 // int nbEvents = 0; // TMF
530 int nbEvents = -1; // LTTng
531 // FIXME: LTTng hack - end
532 while ((getNextEvent(traces, contexts)) != null)
533 {
534 if (++nbEvents % fIndexPageSize == 0) {
535 // Special case: if the total number of events is a multiple of the
536 // DEFAULT_PAGE_SIZE then all the pending events are null. In that
537 // case, we don't store an additional entry in the index array.
538 nullEvents = 0;
539 savedContexts = new TmfTraceContext[nbTraces];
540 for (int i = 0; i < nbTraces; i++) {
541 savedContexts[i] = new TmfTraceContext(contexts[i]);
542 if (contexts[i].getTimestamp() == null)
543 nullEvents++;
544 }
545 if (nullEvents < nbTraces) {
546 indices.add(savedContexts);
547 }
548 }
549
550 monitor.worked(1);
551 if (monitor.isCanceled()) {
552 monitor.done();
553 return Status.CANCEL_STATUS;
554 }
555 }
556
557 monitor.done();
558 fExperimentIndex = indices;
559
560 // dumpIndex();
561
562 fIndexing = false;
563 return Status.OK_STATUS;
564 }
565 }
566
567 // /**
568 // * Dump the experiment index
569 // */
570 // private void dumpIndex() {
571 // System.out.println("-----");
572 // System.out.println("Index of " + fExperimentId);
573 // for (int i = 0; i < fExperimentIndex.size(); i++) {
574 // System.out.println("Entry:" + i);
575 // TmfTraceContext[] contexts = fExperimentIndex.get(i);
576 // int nbEvents = 0;
577 // for (int j = 0; j < contexts.length; j++) {
578 // ITmfTrace trace = fTraces.get(j);
579 // TmfTraceContext context = trace.seekLocation(contexts[j].getLocation());
580 // TmfEvent event = fTraces.get(j).getNextEvent(new TmfTraceContext(context));
581 // nbEvents += contexts[j].getIndex();
582 // System.out.println(" [" + trace.getName() + "]"
583 // + " index: " + contexts[j].getIndex()
584 // + ", timestamp: " + contexts[j].getTimestamp()
585 // + ", event: " + event.getTimestamp());
586 // assert (contexts[j].getTimestamp().compareTo(event.getTimestamp(), false) == 0);
587 // }
588 // assert ((i+1) * fIndexPageSize == nbEvents);
589 //
590 // }
591 // }
592
593 // ========================================================================
594 // Signal handlers
595 // ========================================================================
596
597 @TmfSignalHandler
598 public void experimentSelected(TmfExperimentSelectedSignal signal) {
599 fCurrentExperiment = this;
600 indexExperiment();
601 }
602
603 @TmfSignalHandler
604 public void experimentUpdated(TmfExperimentUpdatedSignal signal) {
605 // indexExperiment();
606 }
607
608 @TmfSignalHandler
609 public void traceUpdated(TmfTraceUpdatedSignal signal) {
610 // TODO: Incremental index update
611 synchronized(this) {
612 updateNbEvents();
613 updateTimeRange();
614 }
615 broadcastSignal(new TmfExperimentUpdatedSignal(this, this, signal.getTrace()));
616 }
617 }
This page took 0.044684 seconds and 5 git commands to generate.