import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.core.runtime.jobs.Job;
+import org.eclipse.linuxtools.tmf.component.TmfComponent;
import org.eclipse.linuxtools.tmf.event.TmfEvent;
import org.eclipse.linuxtools.tmf.event.TmfTimeRange;
import org.eclipse.linuxtools.tmf.event.TmfTimestamp;
import org.eclipse.linuxtools.tmf.request.ITmfRequestHandler;
import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
-import org.eclipse.linuxtools.tmf.signal.TmfSignalManager;
/**
* <b><u>TmfTrace</u></b>
*
* TODO: Add support for live streaming (notifications, incremental indexing, ...)
*/
-public abstract class TmfTrace implements ITmfTrace, ITmfRequestHandler<TmfEvent> {
+public abstract class TmfTrace extends TmfComponent implements ITmfTrace, ITmfRequestHandler<TmfEvent> {
// ========================================================================
// Constants
// ========================================================================
// The default number of events to cache
- public static final int DEFAULT_PAGE_SIZE = 1000;
+ public static final int DEFAULT_CACHE_SIZE = 1000;
// ========================================================================
// Attributes
// The trace name
private final String fName;
- // The checkpoints page size
- private final int fPageSize;
+ // The cache page size AND checkpoints interval
+ private final int fCacheSize;
// Indicate if the stream should be pre-indexed
private final boolean fWaitForIndexCompletion;
/**
* @param name
* @param pageSize
- * @param index
+ * @param data.index
* @throws FileNotFoundException
*/
protected TmfTrace(String path, int pageSize, boolean waitForIndexCompletion) throws FileNotFoundException {
+ super();
int sep = path.lastIndexOf(File.separator);
fName = (sep >= 0) ? path.substring(sep + 1) : path;
fPath = path;
- fPageSize = (pageSize > 0) ? pageSize : DEFAULT_PAGE_SIZE;
+ fCacheSize = (pageSize > 0) ? pageSize : DEFAULT_CACHE_SIZE;
fWaitForIndexCompletion = waitForIndexCompletion;
}
* @throws FileNotFoundException
*/
protected TmfTrace(String name, boolean waitForIndexCompletion) throws FileNotFoundException {
- this(name, DEFAULT_PAGE_SIZE, waitForIndexCompletion);
+ this(name, DEFAULT_CACHE_SIZE, waitForIndexCompletion);
}
/**
* @throws FileNotFoundException
*/
protected TmfTrace(String name) throws FileNotFoundException {
- this(name, DEFAULT_PAGE_SIZE, false);
+ this(name, DEFAULT_CACHE_SIZE, false);
}
// ========================================================================
/**
* @return the size of the cache
*/
- public int getPageSize() {
- return fPageSize;
+ public int getCacheSize() {
+ return fCacheSize;
}
/* (non-Javadoc)
protected long getIndex(TmfTimestamp timestamp) {
TmfTraceContext context = seekEvent(timestamp);
- return context.index;
+ return context.getIndex();
}
protected TmfTimestamp getTimestamp(int index) {
TmfTraceContext context = seekEvent(index);
- return context.timestamp;
+ return context.getTimestamp();
}
// ========================================================================
// First, find the right checkpoint
int index = Collections.binarySearch(fCheckpoints, new TmfTraceCheckpoint(timestamp, 0));
- // In the very likely event that the checkpoint was not found, bsearch
+ // In the very likely case that the checkpoint was not found, bsearch
// returns its negated would-be location (not an offset...). From that
// index, we can then position the stream and get the event.
if (index < 0) {
}
// Position the stream at the checkpoint
- Object location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null;
+ Object location;
+ synchronized (fCheckpoints) { //Just in case we are re-indexing
+ location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null;
+ }
TmfTraceContext nextEventContext = seekLocation(location);
- nextEventContext.index = index * fPageSize;
+ nextEventContext.setIndex(index * fCacheSize);
TmfTraceContext currentEventContext = new TmfTraceContext(nextEventContext);
// And get the event
TmfEvent event = getNextEvent(nextEventContext);
while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) {
- currentEventContext.location = nextEventContext.location;
- currentEventContext.index++;
+ currentEventContext.setLocation(nextEventContext.getLocation());
+ currentEventContext.incrIndex();
event = getNextEvent(nextEventContext);
}
- currentEventContext.timestamp = (event != null) ? event.getTimestamp() : null;
+ currentEventContext.setTimestamp((event != null) ? event.getTimestamp() : null);
return currentEventContext;
}
public TmfTraceContext seekEvent(long position) {
// Position the stream at the previous checkpoint
- int index = (int) position / fPageSize;
- Object location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null;
+ int index = (int) position / fCacheSize;
+ Object location;
+ synchronized (fCheckpoints) { //Just in case we are re-indexing
+ location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null;
+ }
TmfTraceContext nextEventContext = seekLocation(location);
- nextEventContext.index = index * fPageSize;
+ nextEventContext.setIndex(index * fCacheSize);
TmfTraceContext currentEventContext = new TmfTraceContext(nextEventContext);
// And locate the event (if it exists)
TmfEvent event = getNextEvent(nextEventContext);
- while (event != null && currentEventContext.index < position) {
- currentEventContext.location = nextEventContext.location;
- currentEventContext.timestamp = event.getTimestamp();
- currentEventContext.index++;
+ while (event != null && currentEventContext.getIndex() < position) {
+ currentEventContext.setLocation(nextEventContext.getLocation());
+ currentEventContext.setTimestamp(event.getTimestamp());
+ currentEventContext.incrIndex();
event = getNextEvent(nextEventContext);
}
/* (non-Javadoc)
* @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getNextEvent(org.eclipse.linuxtools.tmf.trace.ITmfTrace.TraceContext)
*/
- public synchronized TmfEvent getNextEvent(TmfTraceContext context) {
+ public TmfEvent getNextEvent(TmfTraceContext context) {
+ // parseEvent updates the context
TmfEvent event = parseEvent(context);
if (event != null) {
- context.location = getCurrentLocation();
- context.timestamp = event.getTimestamp();
- context.index++;
processEvent(event);
}
return event;
}
/**
- * TODO: Document me
- * @return
+ * To be implemented by the subclass.
*/
public abstract Object getCurrentLocation();
public abstract TmfEvent parseEvent(TmfTraceContext context);
public void processRequest(TmfDataRequest<TmfEvent> request, boolean waitForCompletion) {
// Process the request
- processEventRequest(request);
+ processDataRequest(request);
// Wait for completion if needed
if (waitForCompletion) {
*
* @param request
*/
- private void processEventRequest(final TmfDataRequest<TmfEvent> request) {
+ private void processDataRequest(final TmfDataRequest<TmfEvent> request) {
// Initialize the trace context
final TmfTraceContext context = (request.getRange() != null) ?
{
events.add(event);
if (++nbEvents % blockSize == 0) {
- TmfEvent[] result = new TmfEvent[events.size()];
- events.toArray(result);
- request.setData(result);
- request.handleData();
- events.removeAllElements();
+ pushData(request, events);
}
// To avoid an unnecessary read passed the last event requested
if (nbEvents < nbRequestedEvents)
event = getNextEvent(context);
}
- TmfEvent[] result = new TmfEvent[events.size()];
- events.toArray(result);
- request.setData(result);
-
- request.handleData();
+ pushData(request, events);
request.done();
}
};
thread.start();
}
+ /**
+ * Format the result data and notify the requester.
+ * Note: after handling, the data is *removed*.
+ *
+ * @param request
+ * @param events
+ */
+ private void pushData(TmfDataRequest<TmfEvent> request, Vector<TmfEvent> events) {
+ TmfEvent[] result = new TmfEvent[events.size()];
+ events.toArray(result);
+ request.setData(result);
+ request.handleData();
+ events.removeAllElements();
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return "[TmfTrace (" + fName + "]";
+ }
+
// ========================================================================
// Trace indexing. Essentially, parse the stream asynchronously and build
// the checkpoints index. This index is used to quickly find an event based
monitor.beginTask("Indexing " + fName, IProgressMonitor.UNKNOWN);
- try {
- TmfTraceContext nextEventContext;
- synchronized(this) {
- nextEventContext = seekLocation(null);
- }
+ try {
+ TmfTraceContext nextEventContext = null;
+ nextEventContext = seekLocation(null);
TmfTraceContext currentEventContext = new TmfTraceContext(nextEventContext);
+
TmfEvent event = getNextEvent(nextEventContext);
if (event == null) {
return Status.OK_STATUS;
startTime = event.getTimestamp();
lastTime = event.getTimestamp();
- fCheckpoints.add(new TmfTraceCheckpoint(lastTime, currentEventContext.location));
- currentEventContext.location = nextEventContext.location;
+ fCheckpoints.add(new TmfTraceCheckpoint(lastTime, currentEventContext.getLocation()));
+ currentEventContext.setLocation(nextEventContext.getLocation());
rangeStartTime = startTime;
while ((event = getNextEvent(nextEventContext)) != null) {
lastTime = event.getTimestamp();
- if ((++nbEvents % fPageSize) == 0) {
- synchronized(this) {
- fCheckpoints.add(new TmfTraceCheckpoint(lastTime, currentEventContext.location));
- fNbEvents = nbEvents;
- fTimeRange = new TmfTimeRange(startTime, lastTime);
- }
+ if ((++nbEvents % fCacheSize) == 0) {
+ fCheckpoints.add(new TmfTraceCheckpoint(lastTime, currentEventContext.getLocation()));
+ fNbEvents = nbEvents - 1;
+ fTimeRange = new TmfTimeRange(startTime, lastTime);
notifyListeners(new TmfTimeRange(rangeStartTime, lastTime));
monitor.worked(1);
// Do whatever
processEvent(event);
- currentEventContext.location = nextEventContext.location;
+ currentEventContext.setLocation(nextEventContext.getLocation());
}
}
finally {
}
private void notifyListeners(TmfTimeRange range) {
- TmfSignalManager.dispatchSignal(new TmfTraceUpdatedSignal(this, this, range));
+ broadcastSignal(new TmfTraceUpdatedSignal(this, this, range));
}
}