1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.linuxtools
.tmf
.trace
;
16 import java
.io
.FileNotFoundException
;
17 import java
.util
.Collections
;
18 import java
.util
.Vector
;
20 import org
.eclipse
.linuxtools
.tmf
.component
.TmfEventProvider
;
21 import org
.eclipse
.linuxtools
.tmf
.event
.TmfEvent
;
22 import org
.eclipse
.linuxtools
.tmf
.event
.TmfTimeRange
;
23 import org
.eclipse
.linuxtools
.tmf
.event
.TmfTimestamp
;
24 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
;
25 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
.ExecutionType
;
26 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfEventRequest
;
27 import org
.eclipse
.linuxtools
.tmf
.request
.TmfDataRequest
;
28 import org
.eclipse
.linuxtools
.tmf
.request
.TmfEventRequest
;
29 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfSignalHandler
;
30 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfTraceOpenedSignal
;
31 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfTraceUpdatedSignal
;
34 * <b><u>TmfTrace</u></b>
36 * Abstract implementation of ITmfTrace. It should be sufficient to extend this
37 * class and provide implementation for <code>getCurrentLocation()</code> and
38 * <code>seekLocation()</code>, as well as a proper parser, to have a working
39 * concrete implementation.
41 * Note: The notion of event rank is still under heavy discussion. Although
42 * used by the Events View and probably useful in the general case, there
43 * is no easy way to implement it for LTTng (actually a strong case is being
44 * made that this is useless).
46 * That it is not supported by LTTng does by no mean indicate that it is not
47 * useful for (just about) every other tracing tool. Therefore, this class
48 * provides a minimal (and partial) implementation of rank. However, the current
49 * implementation should not be relied on in the general case.
51 * TODO: Add support for live streaming (notifications, incremental indexing, ...)
53 public abstract class TmfTrace
<T
extends TmfEvent
> extends TmfEventProvider
<T
> implements ITmfTrace
, Cloneable
{
55 // ------------------------------------------------------------------------
57 // ------------------------------------------------------------------------
59 // The default number of events to cache
60 // TODO: Make the DEFAULT_CACHE_SIZE a preference
61 public static final int DEFAULT_INDEX_PAGE_SIZE
= 1000;
63 // ------------------------------------------------------------------------
65 // ------------------------------------------------------------------------
68 private final String fPath
;
70 // The cache page size AND checkpoints interval
71 protected int fIndexPageSize
;
73 // The set of event stream checkpoints (for random access)
74 protected Vector
<TmfCheckpoint
> fCheckpoints
= new Vector
<TmfCheckpoint
>();
76 // The number of events collected
77 protected long fNbEvents
= 0;
79 // The time span of the event stream
80 private TmfTimestamp fStartTime
= TmfTimestamp
.BigCrunch
;
81 private TmfTimestamp fEndTime
= TmfTimestamp
.BigBang
;
83 // ------------------------------------------------------------------------
85 // ------------------------------------------------------------------------
89 * @throws FileNotFoundException
91 protected TmfTrace(String name
, Class
<T
> type
, String path
) throws FileNotFoundException
{
92 this(name
, type
, path
, DEFAULT_INDEX_PAGE_SIZE
);
98 * @throws FileNotFoundException
100 protected TmfTrace(String name
, Class
<T
> type
, String path
, int cacheSize
) throws FileNotFoundException
{
102 int sep
= path
.lastIndexOf(File
.separator
);
103 String simpleName
= (sep
>= 0) ? path
.substring(sep
+ 1) : path
;
106 fIndexPageSize
= (cacheSize
> 0) ? cacheSize
: DEFAULT_INDEX_PAGE_SIZE
;
110 * @see java.lang.Object#clone()
112 @SuppressWarnings("unchecked")
114 public TmfTrace
<T
> clone() throws CloneNotSupportedException
{
115 TmfTrace
<T
> clone
= (TmfTrace
<T
>) super.clone();
116 clone
.fCheckpoints
= (Vector
<TmfCheckpoint
>) fCheckpoints
;
117 clone
.fStartTime
= new TmfTimestamp(fStartTime
);
118 clone
.fEndTime
= new TmfTimestamp(fEndTime
);
122 // ------------------------------------------------------------------------
124 // ------------------------------------------------------------------------
127 * @return the trace path
129 public String
getPath() {
134 * @see org.eclipse.linuxtools.tmf.stream.ITmfEventStream#getNbEvents()
136 public long getNbEvents() {
141 * @return the size of the cache
143 public int getCacheSize() {
144 return fIndexPageSize
;
148 * @see org.eclipse.linuxtools.tmf.stream.ITmfEventStream#getTimeRange()
150 public TmfTimeRange
getTimeRange() {
151 return new TmfTimeRange(fStartTime
, fEndTime
);
155 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getStartTime()
157 public TmfTimestamp
getStartTime() {
162 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getEndTime()
164 public TmfTimestamp
getEndTime() {
168 @SuppressWarnings("unchecked")
169 public Vector
<TmfCheckpoint
> getCheckpoints() {
170 return (Vector
<TmfCheckpoint
>) fCheckpoints
.clone();
174 * Returns the rank of the first event with the requested timestamp.
175 * If none, returns the index of the next event (if any).
180 public long getRank(TmfTimestamp timestamp
) {
181 TmfContext context
= seekEvent(timestamp
);
182 return context
.getRank();
185 // ------------------------------------------------------------------------
187 // ------------------------------------------------------------------------
189 protected void setTimeRange(TmfTimeRange range
) {
190 fStartTime
= range
.getStartTime();
191 fEndTime
= range
.getEndTime();
194 protected void setStartTime(TmfTimestamp startTime
) {
195 fStartTime
= startTime
;
198 protected void setEndTime(TmfTimestamp endTime
) {
202 // ------------------------------------------------------------------------
204 // ------------------------------------------------------------------------
207 public ITmfContext
armRequest(ITmfDataRequest
<T
> request
) {
208 if (request
instanceof ITmfEventRequest
<?
>) {
209 return seekEvent(((ITmfEventRequest
<T
>) request
).getRange().getStartTime());
211 return seekEvent(request
.getIndex());
215 * Return the next piece of data based on the context supplied. The context
216 * would typically be updated for the subsequent read.
221 @SuppressWarnings("unchecked")
223 public T
getNext(ITmfContext context
) {
224 if (context
instanceof TmfContext
) {
225 return (T
) getNextEvent((TmfContext
) context
);
230 // ------------------------------------------------------------------------
232 // ------------------------------------------------------------------------
235 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(org.eclipse.linuxtools.tmf.event.TmfTimestamp)
237 public TmfContext
seekEvent(TmfTimestamp timestamp
) {
239 if (timestamp
== null) {
240 timestamp
= TmfTimestamp
.BigBang
;
243 // First, find the right checkpoint
244 int index
= Collections
.binarySearch(fCheckpoints
, new TmfCheckpoint(timestamp
, null));
246 // In the very likely case that the checkpoint was not found, bsearch
247 // returns its negated would-be location (not an offset...). From that
248 // index, we can then position the stream and get the event.
250 index
= Math
.max(0, -(index
+ 2));
253 // Position the stream at the checkpoint
254 ITmfLocation
<?
> location
;
255 synchronized (fCheckpoints
) {
256 if (fCheckpoints
.size() > 0) {
257 if (index
>= fCheckpoints
.size()) {
258 index
= fCheckpoints
.size() - 1;
260 location
= fCheckpoints
.elementAt(index
).getLocation();
266 TmfContext context
= seekLocation(location
);
267 context
.setRank(index
* fIndexPageSize
);
269 // And locate the event
270 TmfContext nextEventContext
= context
.clone(); // Must use clone() to get the right subtype...
271 TmfEvent event
= getNextEvent(nextEventContext
);
272 while (event
!= null && event
.getTimestamp().compareTo(timestamp
, false) < 0) {
273 context
.setLocation(nextEventContext
.getLocation().clone());
274 context
.updateRank(1);
275 event
= getNextEvent(nextEventContext
);
282 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(int)
284 public TmfContext
seekEvent(long rank
) {
286 // Position the stream at the previous checkpoint
287 int index
= (int) rank
/ fIndexPageSize
;
288 ITmfLocation
<?
> location
;
289 synchronized (fCheckpoints
) {
290 if (fCheckpoints
.size() == 0) {
294 if (index
>= fCheckpoints
.size()) {
295 index
= fCheckpoints
.size() - 1;
297 location
= fCheckpoints
.elementAt(index
).getLocation();
301 TmfContext context
= seekLocation(location
);
302 long pos
= index
* fIndexPageSize
;
303 context
.setRank(pos
);
306 TmfEvent event
= getNextEvent(context
);
307 while (event
!= null && ++pos
< rank
) {
308 event
= getNextEvent(context
);
316 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getNextEvent(org.eclipse.linuxtools.tmf.trace.ITmfTrace.TraceContext)
318 public synchronized TmfEvent
getNextEvent(TmfContext context
) {
319 // parseEvent() does not update the context
320 TmfEvent event
= parseEvent(context
);
322 updateIndex(context
, context
.getRank(), event
.getTimestamp());
323 context
.setLocation(getCurrentLocation());
324 context
.updateRank(1);
330 protected synchronized void updateIndex(ITmfContext context
, long rank
, TmfTimestamp timestamp
) {
331 if (fStartTime
.compareTo(timestamp
, false) > 0) fStartTime
= timestamp
;
332 if (fEndTime
.compareTo(timestamp
, false) < 0) fEndTime
= timestamp
;
333 if (context
.isValidRank()) {
334 if (fNbEvents
<= rank
)
335 fNbEvents
= rank
+ 1;
336 // Build the index as we go along
337 if ((rank
% fIndexPageSize
) == 0) {
338 // Determine the table position
339 long position
= rank
/ fIndexPageSize
;
340 // Add new entry at proper location (if empty)
341 if (fCheckpoints
.size() == position
) {
342 ITmfLocation
<?
> location
= context
.getLocation().clone();
343 fCheckpoints
.add(new TmfCheckpoint(timestamp
, location
));
344 // System.out.println(getName() + "[" + (fCheckpoints.size() - 1) + "] " + timestamp + ", " + location.toString());
351 * Hook for "special" processing by the concrete class
352 * (called by getNextEvent())
356 protected void processEvent(TmfEvent event
) {
357 // Do nothing by default
361 * To be implemented by the concrete class
363 public abstract TmfContext
seekLocation(ITmfLocation
<?
> location
);
364 public abstract ITmfLocation
<?
> getCurrentLocation();
365 public abstract TmfEvent
parseEvent(TmfContext context
);
367 // ------------------------------------------------------------------------
369 // ------------------------------------------------------------------------
372 * @see java.lang.Object#toString()
375 public String
toString() {
376 return "[TmfTrace (" + getName() + ")]";
379 // ------------------------------------------------------------------------
381 // ------------------------------------------------------------------------
384 * The purpose of the index is to keep the information needed to rapidly
385 * restore the traces contexts at regular intervals (every INDEX_PAGE_SIZE
389 @SuppressWarnings("unchecked")
390 private void indexTrace(boolean waitForCompletion
) {
392 fCheckpoints
.clear();
394 ITmfEventRequest
<TmfEvent
> request
= new TmfEventRequest
<TmfEvent
>(TmfEvent
.class, TmfTimeRange
.Eternity
, TmfDataRequest
.ALL_DATA
, 1, ITmfDataRequest
.ExecutionType
.LONG
) {
396 TmfTimestamp startTime
= null;
397 TmfTimestamp lastTime
= null;
400 public void handleData() {
401 TmfEvent
[] events
= getData();
402 if (events
.length
> 0) {
403 TmfTimestamp ts
= events
[0].getTimestamp();
404 if (startTime
== null) {
405 startTime
= new TmfTimestamp(ts
);
406 fStartTime
= startTime
;
408 lastTime
= new TmfTimestamp(ts
);
410 if ((fNbRead
% DEFAULT_INDEX_PAGE_SIZE
) == 0) {
417 public void handleSuccess() {
421 private void updateTraceData() {
422 fEndTime
= new TmfTimestamp(lastTime
);
428 sendRequest((ITmfDataRequest
<T
>) request
);
429 if (waitForCompletion
)
431 request
.waitForCompletion();
432 } catch (InterruptedException e
) {
437 protected void notifyListeners() {
438 broadcast(new TmfTraceUpdatedSignal(this, this, new TmfTimeRange(fStartTime
, fEndTime
)));
441 // ------------------------------------------------------------------------
443 // ------------------------------------------------------------------------
446 protected void queueLongRequest(final ITmfDataRequest
<T
> request
) {
448 // TODO: Handle the data requests also...
449 if (!(request
instanceof ITmfEventRequest
<?
>)) {
450 super.queueRequest(request
);
454 final TmfTrace
<T
> trace
= this;
456 Thread thread
= new Thread() {
460 // final long requestStart = System.nanoTime();
462 final Integer
[] CHUNK_SIZE
= new Integer
[1];
463 CHUNK_SIZE
[0] = fIndexPageSize
+ 1;
465 final ITmfEventRequest
<T
> req
= (ITmfEventRequest
<T
>) request
;
467 final Integer
[] nbRead
= new Integer
[1];
470 // final TmfTimestamp[] timestamp = new TmfTimestamp[1];
471 // timestamp[0] = new TmfTimestamp(req.getRange().getStartTime());
472 // final TmfTimestamp endTS = req.getRange().getEndTime();
474 final Boolean
[] isFinished
= new Boolean
[1];
475 isFinished
[0] = Boolean
.FALSE
;
477 while (!isFinished
[0]) {
479 // TmfEventRequest<T> subRequest = new TmfEventRequest<T>(req.getDataType(), new TmfTimeRange(timestamp[0], endTS),
480 // requestedSize, req.getBlockize(), ExecutionType.LONG)
481 TmfDataRequest
<T
> subRequest
= new TmfDataRequest
<T
>(req
.getDataType(), nbRead
[0], CHUNK_SIZE
[0],
482 req
.getBlockize(), ExecutionType
.LONG
)
486 public void handleData() {
487 T
[] data
= getData();
489 // System.out.println("First event of the block: " + data[0].getTimestamp());
492 // Tracer.trace("Ndx: " + ((TmfEvent) data[0]).getTimestamp());
495 if (fNbRead
== CHUNK_SIZE
[0]) {
496 nbRead
[0] += fNbRead
;
497 // System.out.println("fNbRead=" + fNbRead + ", count=" + count +", total=" + nbRead[0] + ", TS=" + data[0].getTimestamp());
499 if (fNbRead
> CHUNK_SIZE
[0]) {
500 System
.out
.println("ERROR - Read too many events");
504 public void handleCompleted() {
505 // System.out.println("Request completed at: " + timestamp[0]);
506 if (fNbRead
< CHUNK_SIZE
[0]) {
508 isFinished
[0] = Boolean
.TRUE
;
509 nbRead
[0] += fNbRead
;
510 // System.out.println("fNbRead=" + fNbRead + ", count=" + count +", total=" + nbRead[0]);
512 super.handleCompleted();
516 if (!isFinished
[0]) {
517 trace
.queueRequest(subRequest
);
520 subRequest
.waitForCompletion();
521 // System.out.println("Finished at " + timestamp[0]);
522 } catch (InterruptedException e
) {
526 // TmfTimestamp newTS = new TmfTimestamp(timestamp[0].getValue() + 1, timestamp[0].getScale(), timestamp[0].getPrecision());
527 // timestamp[0] = newTS;
528 CHUNK_SIZE
[0] = fIndexPageSize
;
529 // System.out.println("New timestamp: " + timestamp[0]);
532 // final long requestEnded = System.nanoTime();
533 // System.out.println("Background request completed. Elapsed= " + (requestEnded * 1.0 - requestStart) / 1000000000);
539 // ------------------------------------------------------------------------
541 // ------------------------------------------------------------------------
544 public void traceOpened(TmfTraceOpenedSignal signal
) {
545 ITmfTrace trace
= signal
.getTrace();