[Bug292967] Second part of request coalescing + unit tests + minor fixes.
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / trace / TmfTrace.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.trace;
14
15 import java.io.File;
16 import java.io.FileNotFoundException;
17 import java.util.Collections;
18 import java.util.Vector;
19
20 import org.eclipse.core.runtime.IProgressMonitor;
21 import org.eclipse.core.runtime.IStatus;
22 import org.eclipse.core.runtime.Status;
23 import org.eclipse.core.runtime.jobs.Job;
24 import org.eclipse.linuxtools.tmf.component.TmfEventProvider;
25 import org.eclipse.linuxtools.tmf.event.TmfEvent;
26 import org.eclipse.linuxtools.tmf.event.TmfTimeRange;
27 import org.eclipse.linuxtools.tmf.event.TmfTimestamp;
28 import org.eclipse.linuxtools.tmf.request.TmfCoalescedEventRequest;
29 import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
30 import org.eclipse.linuxtools.tmf.request.TmfEventRequest;
31
32 /**
33 * <b><u>TmfTrace</u></b>
34 * <p>
35 * Abstract implementation of ITmfTrace. It should be sufficient to extend this
36 * class and provide implementation for <code>getCurrentLocation()</code> and
37 * <code>seekLocation()</code>, as well as a proper parser, to have a working
38 * concrete implementation.
39 *
40 * TODO: Add support for live streaming (notifications, incremental indexing, ...)
41 */
42 public abstract class TmfTrace<T extends TmfEvent> extends TmfEventProvider<T> implements ITmfTrace {
43
44 // ------------------------------------------------------------------------
45 // Constants
46 // ------------------------------------------------------------------------
47
48 // The default number of events to cache
49 // TODO: Make the DEFAULT_CACHE_SIZE a preference
50 public static final int DEFAULT_CACHE_SIZE = 1000;
51
52 // ------------------------------------------------------------------------
53 // Attributes
54 // ------------------------------------------------------------------------
55
56 // The trace path
57 private final String fPath;
58
59 // The trace name
60 private final String fName;
61
62 // The cache page size AND checkpoints interval
63 protected int fIndexPageSize;
64
65 // The set of event stream checkpoints (for random access)
66 protected Vector<TmfCheckpoint> fCheckpoints = new Vector<TmfCheckpoint>();
67
68 // The number of events collected
69 protected long fNbEvents = 0;
70
71 // The time span of the event stream
72 private TmfTimeRange fTimeRange = new TmfTimeRange(TmfTimestamp.BigBang, TmfTimestamp.BigBang);
73
74 // ------------------------------------------------------------------------
75 // Constructors
76 // ------------------------------------------------------------------------
77
78 /**
79 * @param path
80 * @param cacheSize
81 * @throws FileNotFoundException
82 */
83 protected TmfTrace(Class<T> type, String path, int cacheSize) throws FileNotFoundException {
84 super(type);
85 int sep = path.lastIndexOf(File.separator);
86 fName = (sep >= 0) ? path.substring(sep + 1) : path;
87 fPath = path;
88 fIndexPageSize = (cacheSize > 0) ? cacheSize : DEFAULT_CACHE_SIZE;
89 }
90
/**
 * Creates a trace for the given path, using {@link #DEFAULT_CACHE_SIZE} as
 * the cache page size.
 *
 * @param type the event class handed to the parent provider
 * @param path the trace path
 * @throws FileNotFoundException propagated from the three-argument constructor
 */
protected TmfTrace(Class<T> type, String path) throws FileNotFoundException {
    this(type, path, DEFAULT_CACHE_SIZE);
}
98
99 // ------------------------------------------------------------------------
100 // Accessors
101 // ------------------------------------------------------------------------
102
103 /**
104 * @return the trace path
105 */
106 public String getPath() {
107 return fPath;
108 }
109
110 /**
111 * @return the trace name
112 */
113 @Override
114 public String getName() {
115 return fName;
116 }
117
118 /* (non-Javadoc)
119 * @see org.eclipse.linuxtools.tmf.stream.ITmfEventStream#getNbEvents()
120 */
121 public long getNbEvents() {
122 return fNbEvents;
123 }
124
125 /**
126 * @return the size of the cache
127 */
128 public int getCacheSize() {
129 return fIndexPageSize;
130 }
131
132 /* (non-Javadoc)
133 * @see org.eclipse.linuxtools.tmf.stream.ITmfEventStream#getTimeRange()
134 */
135 public TmfTimeRange getTimeRange() {
136 return fTimeRange;
137 }
138
139 /* (non-Javadoc)
140 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getStartTime()
141 */
142 public TmfTimestamp getStartTime() {
143 return fTimeRange.getStartTime();
144 }
145
146 /* (non-Javadoc)
147 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getEndTime()
148 */
149 public TmfTimestamp getEndTime() {
150 return fTimeRange.getEndTime();
151 }
152
153 // ------------------------------------------------------------------------
154 // Operators
155 // ------------------------------------------------------------------------
156
157 protected void setTimeRange(TmfTimeRange range) {
158 fTimeRange = range;
159 }
160
161 protected void setStartTime(TmfTimestamp startTime) {
162 fTimeRange = new TmfTimeRange(startTime, fTimeRange.getEndTime());
163 }
164
165 protected void setEndTime(TmfTimestamp endTime) {
166 fTimeRange = new TmfTimeRange(fTimeRange.getStartTime(), endTime);
167 }
168
169 // ------------------------------------------------------------------------
170 // TmfProvider
171 // ------------------------------------------------------------------------
172
173 @Override
174 public ITmfContext armRequest(TmfDataRequest<T> request) {
175 if (request instanceof TmfEventRequest<?>) {
176 return seekEvent(((TmfEventRequest<T>) request).getRange().getStartTime());
177 }
178 if (request instanceof TmfCoalescedEventRequest<?>) {
179 return seekEvent(((TmfCoalescedEventRequest<T>) request).getRange().getStartTime());
180 }
181 return null;
182 }
183
184 /**
185 * Return the next piece of data based on the context supplied. The context
186 * would typically be updated for the subsequent read.
187 *
188 * @param context
189 * @return
190 */
191 @SuppressWarnings("unchecked")
192 @Override
193 public T getNext(ITmfContext context) {
194 if (context instanceof TmfContext) {
195 return (T) getNextEvent((TmfContext) context);
196 }
197 return null;
198 }
199
200 // @Override
201 // public boolean isCompleted(TmfDataRequest<T> request, T data) {
202 // if (request instanceof TmfEventRequest<?> && data != null) {
203 // return data.getTimestamp().compareTo(((TmfEventRequest<T>) request).getRange().getEndTime(), false) > 0;
204 // }
205 // return true;
206 // }
207
208
209 // ------------------------------------------------------------------------
210 // ITmfTrace
211 // ------------------------------------------------------------------------
212
213 /* (non-Javadoc)
214 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(org.eclipse.linuxtools.tmf.event.TmfTimestamp)
215 */
216 public TmfContext seekEvent(TmfTimestamp timestamp) {
217
218 if (timestamp == null) {
219 timestamp = TmfTimestamp.BigBang;
220 }
221
222 // First, find the right checkpoint
223 int index = Collections.binarySearch(fCheckpoints, new TmfCheckpoint(timestamp, null));
224
225 // In the very likely case that the checkpoint was not found, bsearch
226 // returns its negated would-be location (not an offset...). From that
227 // index, we can then position the stream and get the event.
228 if (index < 0) {
229 index = Math.max(0, -(index + 2));
230 }
231
232 // Position the stream at the checkpoint
233 ITmfLocation<?> location;
234 synchronized (fCheckpoints) {
235 if (fCheckpoints.size() > 0) {
236 if (index >= fCheckpoints.size()) {
237 index = fCheckpoints.size() - 1;
238 }
239 location = fCheckpoints.elementAt(index).getLocation();
240 }
241 else {
242 location = null;
243 }
244 }
245 TmfContext nextEventContext = seekLocation(location);
246 nextEventContext.setRank(index * fIndexPageSize);
247 TmfContext currentEventContext = new TmfContext(nextEventContext);
248
249 // And get the event
250 TmfEvent event = getNextEvent(nextEventContext);
251 while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) {
252 currentEventContext.setLocation(nextEventContext.getLocation());
253 currentEventContext.updateRank(1);
254 event = getNextEvent(nextEventContext);
255 }
256
257 return currentEventContext;
258 }
259
260 /* (non-Javadoc)
261 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(int)
262 */
263 public TmfContext seekEvent(long rank) {
264
265 // Position the stream at the previous checkpoint
266 int index = (int) rank / fIndexPageSize;
267 ITmfLocation<?> location;
268 synchronized (fCheckpoints) {
269 if (fCheckpoints.size() > 0) {
270 if (index >= fCheckpoints.size()) {
271 index = fCheckpoints.size() - 1;
272 }
273 location = fCheckpoints.elementAt(index).getLocation();
274 }
275 else {
276 location = null;
277 }
278 }
279 TmfContext context = seekLocation(location);
280 long pos = index * fIndexPageSize;
281 context.setRank(pos);
282
283 if (pos < rank) {
284 TmfEvent event = getNextEvent(context);
285 while (event != null && ++pos < rank) {
286 event = getNextEvent(context);
287 }
288 }
289
290 return new TmfContext(context.getLocation(), context.getRank());
291 }
292
293 /* (non-Javadoc)
294 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getNextEvent(org.eclipse.linuxtools.tmf.trace.ITmfTrace.TraceContext)
295 */
296 public synchronized TmfEvent getNextEvent(TmfContext context) {
297 // parseEvent() does not update the context
298 TmfEvent event = parseEvent(context);
299 context.setLocation(getCurrentLocation());
300 context.updateRank(1);
301 if (event != null) {
302 processEvent(event);
303 }
304 return event;
305 }
306
307 /**
308 * Hook for "special" processing by the concrete class
309 * (called by getNextEvent())
310 *
311 * @param event
312 */
313 public void processEvent(TmfEvent event) {
314 // Do nothing by default
315 }
316
317 /**
318 * To be implemented by the concrete class
319 */
320 public abstract TmfContext seekLocation(ITmfLocation<?> location);
321 public abstract ITmfLocation<?> getCurrentLocation();
322 public abstract TmfEvent parseEvent(TmfContext context);
323
324 // ------------------------------------------------------------------------
325 // toString
326 // ------------------------------------------------------------------------
327
328 /* (non-Javadoc)
329 * @see java.lang.Object#toString()
330 */
331 @Override
332 public String toString() {
333 return "[TmfTrace (" + fName + "]";
334 }
335
336 // ------------------------------------------------------------------------
337 // Indexing
338 // ------------------------------------------------------------------------
339
340 /*
341 * The purpose of the index is to keep the information needed to rapidly
342 * access a trace event based on its timestamp or rank.
343 *
344 * NOTE: As it is, doesn't work for streaming traces.
345 */
346
347 private IndexingJob job;
348
349 // Indicates that an indexing job is already running
350 private Boolean fIndexing = false;
351 private Boolean fIndexed = false;
352
353 public void indexTrace(boolean waitForCompletion) {
354 synchronized (fIndexing) {
355 if (fIndexed || fIndexing) {
356 return;
357 }
358 fIndexing = true;
359 }
360
361 job = new IndexingJob("Indexing " + fName);
362 job.schedule();
363
364 if (waitForCompletion) {
365 try {
366 job.join();
367 } catch (InterruptedException e) {
368 e.printStackTrace();
369 }
370 }
371 }
372
373 private class IndexingJob extends Job {
374
375 public IndexingJob(String name) {
376 super(name);
377 }
378
379 /* (non-Javadoc)
380 * @see org.eclipse.core.runtime.jobs.Job#run(org.eclipse.core.runtime.IProgressMonitor)
381 */
382 @Override
383 protected IStatus run(IProgressMonitor monitor) {
384
385 monitor.beginTask("Indexing " + fName, IProgressMonitor.UNKNOWN);
386
387 int nbEvents = 0;
388 TmfTimestamp startTime = null;
389 TmfTimestamp lastTime = null;
390
391 // Reset the index
392 fCheckpoints = new Vector<TmfCheckpoint>();
393
394 try {
395 // Position the trace at the beginning
396 TmfContext context = seekLocation(null);
397 ITmfLocation<?> location = context.getLocation();
398
399 // Get the first event
400 TmfEvent event = getNextEvent(context);
401 if (event != null) {
402 startTime = new TmfTimestamp(event.getTimestamp());
403 }
404
405 // Index the trace
406 while (event != null) {
407 lastTime = event.getTimestamp();
408 if ((nbEvents++ % fIndexPageSize) == 0) {
409 lastTime = new TmfTimestamp(event.getTimestamp());
410 fCheckpoints.add(new TmfCheckpoint(lastTime, location.clone()));
411 fNbEvents = nbEvents;
412 fTimeRange = new TmfTimeRange(startTime, lastTime);
413 notifyListeners(new TmfTimeRange(startTime, lastTime));
414
415 monitor.worked(1);
416
417 // Check monitor *after* fCheckpoints has been updated
418 if (monitor.isCanceled()) {
419 monitor.done();
420 return Status.CANCEL_STATUS;
421 }
422 }
423
424 // We will need this location at the next iteration
425 if ((nbEvents % fIndexPageSize) == 0) {
426 location = context.getLocation();
427 }
428
429 event = getNextEvent(context);
430 }
431 }
432 finally {
433 synchronized(this) {
434 fNbEvents = nbEvents;
435 fTimeRange = new TmfTimeRange(startTime, lastTime);
436 fIndexing = false;
437 fIndexed = true;
438 }
439 notifyListeners(new TmfTimeRange(startTime, lastTime));
440 monitor.done();
441 }
442
443 // createOffsetsFile();
444 // dumpCheckpoints();
445
446 return Status.OK_STATUS;
447 }
448 }
449
450 protected void notifyListeners(TmfTimeRange range) {
451 broadcast(new TmfTraceUpdatedSignal(this, this, range));
452 }
453
454 // ========================================================================
455 // Troubleshooting code
456 // ========================================================================
457
458 // private void dumpCheckpoints() {
459 // System.out.println("-----");
460 // System.out.println("Checkpoints of " + fName);
461 // for (int i = 0; i < fCheckpoints.size(); i++) {
462 // TmfCheckpoint checkpoint = fCheckpoints.get(i);
463 // TmfContext context = new TmfContext(checkpoint.getLocation(), i * fIndexPageSize);
464 // TmfEvent event = getNext(context);
465 // System.out.println(" Entry: " + i + " rank: " + (context.getRank() - 1) + " timestamp: " + checkpoint.getTimestamp() + ", event: " + event.getTimestamp());
466 // assert((checkpoint.getTimestamp().compareTo(event.getTimestamp(), false) == 0));
467 // }
468 // System.out.println();
469 // }
470
471 // private void createOffsetsFile() {
472 //
473 // try {
474 // // The trace context validation file is read by TmfTraceContext
475 // ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream("TmfTraceContext.dat")));
476 //
477 // TmfTraceContext context = null;
478 // context = seekLocation(null);
479 // out.writeObject(context.getLocation());
480 //
481 // int nbEvents = 0;
482 // while (getNextEvent(context) != null) {
483 // out.writeObject(context.getLocation());
484 // nbEvents++;
485 // }
486 // out.close();
487 // System.out.println("TmfTrace wrote " + nbEvents + " events");
488 // } catch (FileNotFoundException e) {
489 // // TODO Auto-generated catch block
490 // e.printStackTrace();
491 // } catch (IOException e) {
492 // // TODO Auto-generated catch block
493 // e.printStackTrace();
494 // }
495 // }
496 //
497 // private void createOffsetsFile() {
498 //
499 // try {
500 // DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream("LTTngOffsets.dat")));
501 //
502 // TmfTraceContext context = null;
503 // context = seekLocation(null);
504 //
505 // TmfEvent event;
506 // int nbEvents = 0;
507 // while ((event = getNextEvent(context)) != null) {
508 // out.writeUTF(event.getTimestamp().toString());
509 // nbEvents++;
510 // }
511 // out.close();
512 // System.out.println("TmfTrace wrote " + nbEvents + " events");
513 // } catch (FileNotFoundException e) {
514 // // TODO Auto-generated catch block
515 // e.printStackTrace();
516 // } catch (IOException e) {
517 // // TODO Auto-generated catch block
518 // e.printStackTrace();
519 // }
520 // }
521
522 }
This page took 0.042355 seconds and 5 git commands to generate.