1 /*******************************************************************************
2 * Copyright (c) 2009 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.linuxtools
.tmf
.stream
;
15 import java
.io
.FileNotFoundException
;
16 import java
.io
.IOException
;
17 import java
.util
.Collections
;
18 import java
.util
.Vector
;
20 import org
.eclipse
.core
.runtime
.IProgressMonitor
;
21 import org
.eclipse
.core
.runtime
.IStatus
;
22 import org
.eclipse
.core
.runtime
.Status
;
23 import org
.eclipse
.core
.runtime
.jobs
.Job
;
24 import org
.eclipse
.linuxtools
.tmf
.event
.TmfEvent
;
25 import org
.eclipse
.linuxtools
.tmf
.event
.TmfTimeRange
;
26 import org
.eclipse
.linuxtools
.tmf
.event
.TmfTimestamp
;
27 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfSignalManager
;
30 * <b><u>TmfEventStream</u></b>
32 * TODO: Implement me. Please.
34 public abstract class TmfEventStream
implements ITmfEventStream
{
36 // ========================================================================
38 // ========================================================================
40 // The default number of events to cache
41 public static final int DEFAULT_CACHE_SIZE
= 1000;
43 // ========================================================================
45 // ========================================================================
48 private final String fName
;
51 private final ITmfEventParser fParser
;
54 private final int fCacheSize
;
56 // The set of event stream checkpoints (for random access)
57 private Vector
<TmfStreamCheckpoint
> fCheckpoints
= new Vector
<TmfStreamCheckpoint
>();
59 // The number of events collected
60 private int fNbEvents
= 0;
62 // The time span of the event stream
63 private TmfTimeRange fTimeRange
= new TmfTimeRange(TmfTimestamp
.BigBang
, TmfTimestamp
.BigBang
);
65 // ========================================================================
67 // ========================================================================
73 * @throws FileNotFoundException
75 protected TmfEventStream(String filename
, ITmfEventParser parser
, int cacheSize
) throws FileNotFoundException
{
78 fCacheSize
= cacheSize
;
84 * @throws FileNotFoundException
86 protected TmfEventStream(String filename
, ITmfEventParser parser
) throws FileNotFoundException
{
87 this(filename
, parser
, DEFAULT_CACHE_SIZE
);
90 // ========================================================================
92 // ========================================================================
97 public int getCacheSize() {
104 public String
getName() {
109 * @see org.eclipse.linuxtools.tmf.stream.ITmfEventStream#getNbEvents()
111 public int getNbEvents() {
116 * @see org.eclipse.linuxtools.tmf.stream.ITmfEventStream#getTimeRange()
118 public TmfTimeRange
getTimeRange() {
123 * @see org.eclipse.linuxtools.tmf.stream.ITmfEventStream#getIndex(org.eclipse.linuxtools.tmf.event.TmfTimestamp)
125 public int getIndex(TmfTimestamp timestamp
) {
126 StreamContext context
= seekEvent(timestamp
);
127 return context
.index
;
130 // ========================================================================
132 // ========================================================================
134 public StreamContext
seekEvent(TmfTimestamp timestamp
) {
136 // First, find the right checkpoint
137 int index
= Collections
.binarySearch(fCheckpoints
, new TmfStreamCheckpoint(timestamp
, 0));
139 // In the very likely event that the checkpoint was not found, bsearch
140 // returns its negated would-be location (not an offset...). From that
141 // index, we can then position the stream and get the event.
143 index
= Math
.max(0, -(index
+ 2));
146 // Position the stream at the checkpoint
147 Object location
= (index
< fCheckpoints
.size()) ? fCheckpoints
.elementAt(index
).getLocation() : null;
148 StreamContext nextEventContext
;
150 nextEventContext
= seekLocation(location
);
152 StreamContext currentEventContext
= new StreamContext(nextEventContext
.location
, index
* fCacheSize
);
155 TmfEvent event
= getNextEvent(nextEventContext
);
156 while (event
!= null && event
.getTimestamp().compareTo(timestamp
, false) < 0) {
157 currentEventContext
.location
= nextEventContext
.location
;
158 currentEventContext
.index
++;
159 event
= getNextEvent(nextEventContext
);
162 return currentEventContext
;
165 public StreamContext
seekEvent(int position
) {
167 // Position the stream at the previous checkpoint
168 int index
= position
/ fCacheSize
;
169 Object location
= (index
< fCheckpoints
.size()) ? fCheckpoints
.elementAt(index
).getLocation() : null;
170 StreamContext nextEventContext
;
172 nextEventContext
= seekLocation(location
);
174 StreamContext currentEventContext
= new StreamContext(nextEventContext
);
176 // And locate the event (if it exists)
177 int current
= index
* fCacheSize
;
178 TmfEvent event
= getNextEvent(nextEventContext
);
179 while (event
!= null && current
< position
) {
180 currentEventContext
.location
= nextEventContext
.location
;
181 event
= getNextEvent(nextEventContext
);
185 return currentEventContext
;
188 public TmfEvent
getEvent(StreamContext context
, TmfTimestamp timestamp
) {
190 // Position the stream and update the context object
191 StreamContext ctx
= seekEvent(timestamp
);
192 context
.location
= ctx
.location
;
194 return getNextEvent(context
);
197 public TmfEvent
getEvent(StreamContext context
, int position
) {
199 // Position the stream and update the context object
200 StreamContext ctx
= seekEvent(position
);
201 context
.location
= ctx
.location
;
203 return getNextEvent(context
);
206 public synchronized TmfEvent
getNextEvent(StreamContext context
) {
208 seekLocation(context
.location
);
209 TmfEvent event
= fParser
.getNextEvent(this);
210 context
.location
= getCurrentLocation();
212 } catch (IOException e
) {
218 private void notifyListeners() {
219 TmfSignalManager
.dispatchSignal(new TmfStreamUpdatedSignal(this, this));
223 * @see org.eclipse.linuxtools.tmf.eventlog.ITmfEventStream#indexStream()
225 public void indexStream(boolean waitForCompletion
) {
226 IndexingJob job
= new IndexingJob(fName
);
228 if (waitForCompletion
) {
231 } catch (InterruptedException e
) {
232 // TODO Auto-generated catch block
238 private class IndexingJob
extends Job
{
240 public IndexingJob(String name
) {
245 * @see org.eclipse.core.runtime.jobs.Job#run(org.eclipse.core.runtime.IProgressMonitor)
248 protected IStatus
run(IProgressMonitor monitor
) {
251 TmfTimestamp startTime
= new TmfTimestamp();
252 TmfTimestamp lastTime
= new TmfTimestamp();
254 monitor
.beginTask("Indexing " + fName
, IProgressMonitor
.UNKNOWN
);
257 StreamContext nextEventContext
;
259 nextEventContext
= seekLocation(null);
261 StreamContext currentEventContext
= new StreamContext(nextEventContext
);
262 TmfEvent event
= getNextEvent(nextEventContext
);
264 startTime
= event
.getTimestamp();
265 lastTime
= event
.getTimestamp();
268 while (event
!= null) {
269 lastTime
= event
.getTimestamp();
270 if ((nbEvents
++ % fCacheSize
) == 0) {
271 TmfStreamCheckpoint bookmark
= new TmfStreamCheckpoint(lastTime
, currentEventContext
.location
);
273 fCheckpoints
.add(bookmark
);
274 fNbEvents
= nbEvents
;
275 fTimeRange
= new TmfTimeRange(startTime
, lastTime
);
279 // Check monitor *after* fCheckpoints has been updated
280 if (monitor
.isCanceled()) {
281 return Status
.CANCEL_STATUS
;
285 currentEventContext
.location
= nextEventContext
.location
;
286 event
= getNextEvent(nextEventContext
);
291 fNbEvents
= nbEvents
;
292 fTimeRange
= new TmfTimeRange(startTime
, lastTime
);
298 return Status
.OK_STATUS
;