1 /*******************************************************************************
2 * Copyright (c) 2009 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.linuxtools
.tmf
.stream
;
15 import java
.io
.FileNotFoundException
;
16 import java
.io
.IOException
;
17 import java
.util
.Collections
;
18 import java
.util
.HashSet
;
20 import java
.util
.Vector
;
22 import org
.eclipse
.core
.runtime
.IProgressMonitor
;
23 import org
.eclipse
.core
.runtime
.IStatus
;
24 import org
.eclipse
.core
.runtime
.Status
;
25 import org
.eclipse
.core
.runtime
.jobs
.Job
;
26 import org
.eclipse
.linuxtools
.tmf
.event
.TmfEvent
;
27 import org
.eclipse
.linuxtools
.tmf
.event
.TmfTimeRange
;
28 import org
.eclipse
.linuxtools
.tmf
.event
.TmfTimestamp
;
29 import org
.eclipse
.linuxtools
.tmf
.trace
.TmfTrace
;
32 * <b><u>AbstractTmfEventStream</u></b>
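34 * Base class for event streams: an indexing job records a checkpoint every fCacheSize events, and seekEvent() uses those checkpoints to position the stream by timestamp or by event position.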
36 public abstract class AbstractTmfEventStream
implements ITmfEventStream
{
38 // ========================================================================
40 // ========================================================================
42 // The default number of events to cache
// (forwarded as cacheSize by the two-argument constructor)
43 public static final int DEFAULT_CACHE_SIZE
= 1000;
45 // ========================================================================
47 // ========================================================================
// Label passed to the IndexingJob constructor and shown in its
// "Indexing ..." progress task.
// NOTE(review): presumably the stream's file name; the assignment is not
// visible in this view - confirm in the constructor.
50 private final String fName
;
// Parser asked for the next event in getNextEvent() via
// fParser.getNextEvent(this).
53 private final ITmfEventParser fParser
;
// Number of events between two consecutive checkpoints: the indexing job
// adds a checkpoint whenever (nbEvents % fCacheSize) == 0, and
// seekEvent(int) maps a position to checkpoint (position / fCacheSize).
56 private final int fCacheSize
;
58 // The set of event stream checkpoints (for random access)
// Appended to by the indexing job and searched with
// Collections.binarySearch() in seekEvent(TmfTimestamp).
59 private Vector
<TmfStreamCheckpoint
> fCheckpoints
= new Vector
<TmfStreamCheckpoint
>();
61 // The number of events collected
// (written by the indexing job while it runs and once more when it ends)
62 private int fNbEvents
= 0;
64 // The time span of the event stream
// Starts as [BigBang, BigBang]; replaced by the indexing job with
// [first event timestamp, last event timestamp seen so far].
65 private TmfTimeRange fTimeRange
= new TmfTimeRange(TmfTimestamp
.BigBang
, TmfTimestamp
.BigBang
);
// Traces that receive a TmfStreamUpdateEvent from notifyListeners().
68 private Set
<TmfTrace
> fListeners
= new HashSet
<TmfTrace
>();
70 // ========================================================================
72 // ========================================================================
78 * @throws FileNotFoundException
80 protected AbstractTmfEventStream(String filename
, ITmfEventParser parser
, int cacheSize
) throws FileNotFoundException
{
// Stores the checkpoint interval (number of events per checkpoint).
// NOTE(review): fName and fParser are final but their assignments are not
// visible in this view; presumably set from filename and parser - confirm.
// NOTE(review): cacheSize is used as a divisor and modulus elsewhere in the
// class; no validation against 0 is visible here - confirm.
83 fCacheSize
= cacheSize
;
89 * @throws FileNotFoundException
91 protected AbstractTmfEventStream(String filename
, ITmfEventParser parser
) throws FileNotFoundException
{
// Convenience constructor: delegates to the three-argument constructor
// with DEFAULT_CACHE_SIZE (1000) as the cache size.
92 this(filename
, parser
, DEFAULT_CACHE_SIZE
);
95 // ========================================================================
97 // ========================================================================
// NOTE(review): body not visible in this view; presumably returns
// fCacheSize - confirm.
102 public int getCacheSize() {
// NOTE(review): body not visible in this view; presumably returns
// fName - confirm.
109 public String
getName() {
// Declared synchronized, so it runs while holding this instance's monitor.
// NOTE(review): body not visible in this view; presumably returns
// fNbEvents, which the indexing job updates - confirm.
116 public synchronized int getNbEvents() {
// Declared synchronized, so it runs while holding this instance's monitor.
// NOTE(review): body not visible in this view; presumably returns
// fTimeRange, which the indexing job replaces as it progresses - confirm.
123 public synchronized TmfTimeRange
getTimeRange() {
127 // ========================================================================
129 // ========================================================================
/**
 * Builds a context located at the first event whose timestamp is not
 * before the requested one.
 *
 * The checkpoint list is binary-searched to pick a starting location, then
 * events are read forward from there until one compares greater than or
 * equal to the requested timestamp (or no event is left).
 *
 * @param timestamp the timestamp to look for
 * @return a context whose location is the one the matching event was read
 *         from (or the last location reached if the events ran out)
 */
131 public StreamContext
seekEvent(TmfTimestamp timestamp
) {
133 // First, find the right checkpoint
134 int index
= Collections
.binarySearch(fCheckpoints
, new TmfStreamCheckpoint(timestamp
, 0));
136 // In the very likely event that the checkpoint was not found, bsearch
137 // returns its negated would-be location (not an offset...). From that
138 // index, we can then position the stream and get the event.
// binarySearch() returns -(insertionPoint) - 1 on a miss, so
// -(index + 2) == insertionPoint - 1, i.e. the checkpoint just before the
// would-be location; Math.max() clamps that to the first checkpoint.
// NOTE(review): as visible here the adjustment is unconditional, which
// would map an exact match (index >= 0) to checkpoint 0; presumably an
// "if (index < 0)" guard surrounds it in the full file - confirm.
140 index
= Math
.max(0, -(index
+ 2));
143 // Position the stream at the checkpoint
// A null location is used when there is no such checkpoint (e.g. the
// checkpoint list is still empty).
144 Object location
= (index
< fCheckpoints
.size()) ? fCheckpoints
.elementAt(index
).getLocation() : null;
145 StreamContext nextEventContext
;
147 nextEventContext
= seekLocation(location
);
// currentEventContext trails nextEventContext by one event: it keeps the
// location from which the most recently read event was obtained.
149 StreamContext currentEventContext
= new StreamContext(nextEventContext
.location
);
// Read forward while the event is strictly before the target timestamp.
// NOTE(review): the meaning of compareTo()'s boolean argument is not
// visible in this file - confirm.
152 TmfEvent event
= getNextEvent(nextEventContext
);
153 while (event
!= null && event
.getTimestamp().compareTo(timestamp
, false) < 0) {
154 currentEventContext
.location
= nextEventContext
.location
;
155 event
= getNextEvent(nextEventContext
);
158 return currentEventContext
;
/**
 * Builds a context for the event at the given position, starting from the
 * checkpoint that covers it and reading forward from there.
 *
 * @param position the event position to seek to
 * @return a context holding the location reached by the forward scan
 */
161 public StreamContext
seekEvent(int position
) {
163 // Position the stream at the previous checkpoint
// One checkpoint is recorded every fCacheSize events, so integer division
// gives the checkpoint covering the position; a null location is used
// when that checkpoint does not exist.
164 int index
= position
/ fCacheSize
;
165 Object location
= (index
< fCheckpoints
.size()) ? fCheckpoints
.elementAt(index
).getLocation() : null;
166 StreamContext nextEventContext
;
168 nextEventContext
= seekLocation(location
);
// currentEventContext trails nextEventContext by one event.
170 StreamContext currentEventContext
= new StreamContext(nextEventContext
.location
);
172 // And locate the event (if it exists)
// 'current' is the position of the first event at the chosen checkpoint.
// NOTE(review): no increment of 'current' is visible inside the loop in
// this view; without one the loop would only stop when the events run
// out - confirm that "current++" exists in the full file.
173 int current
= index
* fCacheSize
;
174 TmfEvent event
= getNextEvent(nextEventContext
);
175 while (event
!= null && current
< position
) {
176 currentEventContext
.location
= nextEventContext
.location
;
177 event
= getNextEvent(nextEventContext
);
181 return currentEventContext
;
/**
 * Seeks to the given timestamp and reads the event found there.
 *
 * @param context   caller's context; its location is overwritten with the
 *                  one found by seekEvent(timestamp) and then advanced by
 *                  getNextEvent()
 * @param timestamp the timestamp to look for
 * @return the result of getNextEvent(context) from the sought location
 */
184 public TmfEvent
getEvent(StreamContext context
, TmfTimestamp timestamp
) {
186 // Position the stream and update the context object
187 StreamContext ctx
= seekEvent(timestamp
);
188 context
.location
= ctx
.location
;
190 return getNextEvent(context
);
/**
 * Seeks to the given event position and reads the event found there.
 *
 * @param context  caller's context; its location is overwritten with the
 *                 one found by seekEvent(position) and then advanced by
 *                 getNextEvent()
 * @param position the event position to look for
 * @return the result of getNextEvent(context) from the sought location
 */
193 public TmfEvent
getEvent(StreamContext context
, int position
) {
195 // Position the stream and update the context object
196 StreamContext ctx
= seekEvent(position
);
197 context
.location
= ctx
.location
;
199 return getNextEvent(context
);
/**
 * Reads one event starting at the context's location and moves the
 * context past it. Declared synchronized, so the seek/parse/update
 * sequence runs while holding this instance's monitor.
 *
 * @param context the read position; its location is replaced with
 *                getCurrentLocation() after the parser has run
 */
202 public synchronized TmfEvent
getNextEvent(StreamContext context
) {
// Reposition the stream, let the parser read from it, then record where
// the stream ended up so the next call continues from there.
204 seekLocation(context
.location
);
205 TmfEvent event
= fParser
.getNextEvent(this);
206 context
.location
= getCurrentLocation();
// NOTE(review): the enclosing try, the handling of the IOException and the
// return statement are not visible in this view - confirm what is returned
// when an IOException occurs.
208 } catch (IOException e
) {
/**
 * Registers a trace to be called back by notifyListeners().
 * fListeners is a Set, so a listener that is already present is not
 * added a second time.
 *
 * @param listener the trace to register
 */
214 public synchronized void addListener(TmfTrace listener
) {
215 fListeners
.add(listener
);
/**
 * Unregisters a trace so notifyListeners() no longer calls it.
 * Removing a listener that is not registered leaves the set unchanged.
 *
 * @param listener the trace to unregister
 */
218 public synchronized void removeListener(TmfTrace listener
) {
219 fListeners
.remove(listener
);
/**
 * Calls handleEvent() on every registered trace, passing each one a new
 * TmfStreamUpdateEvent built from this stream. Runs while holding this
 * instance's monitor, as do addListener() and removeListener().
 */
222 private synchronized void notifyListeners() {
223 for (TmfTrace listener
: fListeners
) {
224 listener
.handleEvent(new TmfStreamUpdateEvent(this));
229 * @see org.eclipse.linuxtools.tmf.eventlog.ITmfEventStream#indexStream()
231 public void indexStream(boolean waitForCompletion
) {
// Create the background indexing job, labelled with the stream name.
// NOTE(review): the call that schedules the job is not visible in this
// view - confirm.
232 IndexingJob job
= new IndexingJob(fName
);
// When requested, block the caller until indexing is done.
// NOTE(review): the waiting call and its try block are not visible here;
// presumably job.join(), which can throw InterruptedException - confirm.
234 if (waitForCompletion
) {
237 } catch (InterruptedException e
) {
238 // TODO Auto-generated catch block
// Job that reads the stream from its start and builds the
// checkpoint index: one checkpoint every fCacheSize events, while keeping
// fNbEvents and fTimeRange of the enclosing stream up to date.
244 private class IndexingJob
extends Job
{
// NOTE(review): constructor body not visible in this view; presumably
// forwards the name to Job's constructor - confirm.
246 public IndexingJob(String name
) {
251 * @see org.eclipse.core.runtime.jobs.Job#run(org.eclipse.core.runtime.IProgressMonitor)
254 protected IStatus
run(IProgressMonitor monitor
) {
// Returns Status.CANCEL_STATUS if the monitor is cancelled at a
// checkpoint boundary, Status.OK_STATUS otherwise.
// NOTE(review): the declaration of the local counter 'nbEvents' is not
// visible in this view - confirm it starts at 0.
257 TmfTimestamp startTime
= new TmfTimestamp();
258 TmfTimestamp lastTime
= new TmfTimestamp();
// Total amount of work is not known in advance.
260 monitor
.beginTask("Indexing " + fName
, IProgressMonitor
.UNKNOWN
);
263 StreamContext nextEventContext
;
// Start reading from the null location.
// NOTE(review): presumably null means "start of stream" - confirm in
// seekLocation().
265 nextEventContext
= seekLocation(null);
// currentEventContext trails nextEventContext by one event, so it holds
// the location from which 'event' was read.
267 StreamContext currentEventContext
= new StreamContext(nextEventContext
.location
);
268 TmfEvent event
= getNextEvent(nextEventContext
);
// NOTE(review): event is dereferenced here but tested against null by the
// loop below; presumably an "if (event != null)" guard that is not
// visible in this view surrounds these lines - confirm.
270 startTime
= event
.getTimestamp();
271 lastTime
= event
.getTimestamp();
274 while (event
!= null) {
275 lastTime
= event
.getTimestamp();
// Every fCacheSize-th event (starting with the first) becomes a
// checkpoint: its timestamp plus the location it was read from.
276 if ((nbEvents
++ % fCacheSize
) == 0) {
277 TmfStreamCheckpoint bookmark
= new TmfStreamCheckpoint(lastTime
, currentEventContext
.location
);
// Publish progress so far to the enclosing stream.
279 fCheckpoints
.add(bookmark
);
280 fNbEvents
= nbEvents
;
281 fTimeRange
= new TmfTimeRange(startTime
, lastTime
);
285 // Check monitor *after* fCheckpoints has been updated
286 if (monitor
.isCanceled()) {
287 return Status
.CANCEL_STATUS
;
// Advance: remember where the next event starts, then read it.
291 currentEventContext
.location
= nextEventContext
.location
;
292 event
= getNextEvent(nextEventContext
);
// Final update once the events have run out, covering the events read
// since the last checkpoint.
297 fNbEvents
= nbEvents
;
298 fTimeRange
= new TmfTimeRange(startTime
, lastTime
);
304 return Status
.OK_STATUS
;
308 public void indexStream(final String filename
) {
310 // ProgressMonitorDialog dialog = new ProgressMonitorDialog(null);
312 // dialog.run(true, true, new IRunnableWithProgress() {
314 // public void run(IProgressMonitor monitor)
315 // throws InvocationTargetException, InterruptedException {
316 // monitor.beginTask("Indexing " + filename,
317 // IProgressMonitor.UNKNOWN);
320 // seekLocation(null);
321 // TmfTimestamp startTime = new TmfTimestamp();
322 // TmfTimestamp lastTime = new TmfTimestamp();
323 // Object location = getCurrentLocation();
325 // TmfEvent event = getNextEvent();
326 // if (event != null) {
327 // startTime = event.getTimestamp();
328 // while (event != null) {
329 // lastTime = event.getTimestamp();
330 // if ((fNbEvents++ % fCacheSize) == 0) {
331 // if (monitor.isCanceled()) {
332 // throw new CancellationException();
334 // TmfStreamCheckpoint bookmark = new TmfStreamCheckpoint(
335 // lastTime, location);
336 // fCheckpoints.add(bookmark);
337 // monitor.worked(1);
339 // location = getCurrentLocation();
340 // event = getNextEvent();
342 // fTimeRange = new TmfTimeRange(startTime, lastTime);
344 // seekLocation(null);
345 // } catch (IOException e) {
352 // } catch (InvocationTargetException e1) {
353 // // TODO Auto-generated catch block
354 // e1.printStackTrace();
355 // } catch (InterruptedException e1) {
356 // // TODO Auto-generated catch block
357 // e1.printStackTrace();