1 /*******************************************************************************
2 * Copyright (c) 2009 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.linuxtools
.tmf
.stream
;
15 import java
.io
.FileNotFoundException
;
16 import java
.io
.IOException
;
17 import java
.util
.Collections
;
18 import java
.util
.Vector
;
20 import org
.eclipse
.core
.runtime
.IProgressMonitor
;
21 import org
.eclipse
.core
.runtime
.IStatus
;
22 import org
.eclipse
.core
.runtime
.Status
;
23 import org
.eclipse
.core
.runtime
.jobs
.Job
;
24 import org
.eclipse
.linuxtools
.tmf
.event
.TmfEvent
;
25 import org
.eclipse
.linuxtools
.tmf
.event
.TmfTimeRange
;
26 import org
.eclipse
.linuxtools
.tmf
.event
.TmfTimestamp
;
27 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfSignalManager
;
30 * <b><u>TmfEventStream</u></b>
32 * TODO: Implement me. Please.
34 public abstract class TmfEventStream
implements ITmfEventStream
{
36 // ========================================================================
38 // ========================================================================
40 // The default number of events to cache
// Passed as the cache size by the two-argument constructor when it delegates
// to the three-argument constructor.
41 public static final int DEFAULT_CACHE_SIZE
= 1000;
43 // ========================================================================
45 // ========================================================================
// Stream name; passed to the IndexingJob constructor and appended to the
// "Indexing " progress-task label.
48 private final String fName
;
// Parser that getNextEvent() asks for the next event of this stream.
51 private final ITmfEventParser fParser
;
// Checkpoint spacing, in events: seekEvent(int) divides the requested position
// by it, and the indexing job records a checkpoint every fCacheSize events.
54 private final int fCacheSize
;
56 // The set of event stream checkpoints (for random access)
// Searched with Collections.binarySearch in seekEvent(TmfTimestamp) and
// appended to by the indexing job.
57 private Vector
<TmfStreamCheckpoint
> fCheckpoints
= new Vector
<TmfStreamCheckpoint
>();
59 // The number of events collected
// Written by the indexing job from its running event counter.
60 private int fNbEvents
= 0;
62 // The time span of the event stream
// Starts as [BigBang, BigBang]; the indexing job replaces it with
// [first event timestamp, latest event timestamp].
63 private TmfTimeRange fTimeRange
= new TmfTimeRange(TmfTimestamp
.BigBang
, TmfTimestamp
.BigBang
);
65 // ========================================================================
67 // ========================================================================
73 * @throws FileNotFoundException
// Primary constructor. Only the cache-size assignment is visible in this view.
// NOTE(review): fName and fParser are final, so they are presumably assigned
// from 'filename' and 'parser' on lines not shown here — confirm.
// NOTE(review): declared to throw FileNotFoundException, but nothing visible
// here raises it; also no validation of cacheSize is visible (it is later used
// as a divisor and a modulus).
75 protected TmfEventStream(String filename
, ITmfEventParser parser
, int cacheSize
) throws FileNotFoundException
{
78 fCacheSize
= cacheSize
;
84 * @throws FileNotFoundException
// Convenience constructor: delegates to the three-argument constructor, using
// DEFAULT_CACHE_SIZE as the cache size.
86 protected TmfEventStream(String filename
, ITmfEventParser parser
) throws FileNotFoundException
{
87 this(filename
, parser
, DEFAULT_CACHE_SIZE
);
90 // ========================================================================
92 // ========================================================================
// Cache-size accessor. NOTE(review): the body is not visible in this view;
// presumably returns fCacheSize — confirm.
97 public int getCacheSize() {
// Name accessor. NOTE(review): the body is not visible in this view;
// presumably returns fName — confirm.
104 public String
getName() {
// Event-count accessor, synchronized on this stream. NOTE(review): the body is
// not visible in this view; presumably returns fNbEvents — confirm.
111 public synchronized int getNbEvents() {
// Time-range accessor, synchronized on this stream. NOTE(review): the body is
// not visible in this view; presumably returns fTimeRange — confirm.
118 public synchronized TmfTimeRange
getTimeRange() {
122 // ========================================================================
124 // ========================================================================
// Builds a context for the given timestamp: picks a starting checkpoint by
// binary search, then reads forward while events compare earlier than
// 'timestamp'. The returned context holds the location recorded just before
// the read that ended the scan (that read produced either an event that does
// not compare earlier than 'timestamp', or null).
126 public StreamContext
seekEvent(TmfTimestamp timestamp
) {
128 // First, find the right checkpoint
129 int index
= Collections
.binarySearch(fCheckpoints
, new TmfStreamCheckpoint(timestamp
, 0));
131 // In the very likely event that the checkpoint was not found, bsearch
132 // returns its negated would-be location (not an offset...). From that
133 // index, we can then position the stream and get the event.
// On a miss binarySearch returns -(insertionPoint) - 1, so -(index + 2) equals
// insertionPoint - 1, i.e. the entry just before the insertion point; the
// Math.max clamps that to 0 when the insertion point is the list head.
// NOTE(review): no guard around this assignment is visible in this view. If it
// also ran for an exact match (index >= 0), -(index + 2) would be negative and
// index would be reset to 0 — confirm a guard exists in the full file.
135 index
= Math
.max(0, -(index
+ 2));
138 // Position the stream at the checkpoint
// A null location is used when index is not a valid checkpoint slot
// (for example when no checkpoint has been recorded yet).
139 Object location
= (index
< fCheckpoints
.size()) ? fCheckpoints
.elementAt(index
).getLocation() : null;
140 StreamContext nextEventContext
;
142 nextEventContext
= seekLocation(location
);
// Separate context that trails the reads: it starts at the same location and
// is only updated inside the loop below.
144 StreamContext currentEventContext
= new StreamContext(nextEventContext
.location
);
147 TmfEvent event
= getNextEvent(nextEventContext
);
// NOTE(review): the meaning of compareTo's boolean argument is not visible in
// this view.
148 while (event
!= null && event
.getTimestamp().compareTo(timestamp
, false) < 0) {
// Record where the upcoming read starts before performing it.
149 currentEventContext
.location
= nextEventContext
.location
;
150 event
= getNextEvent(nextEventContext
);
153 return currentEventContext
;
// Builds a context for the given event position: starts at checkpoint number
// position / fCacheSize and reads forward from there. Returns the trailing
// context, whose location is the one recorded before the last read performed.
156 public StreamContext
seekEvent(int position
) {
158 // Position the stream at the previous checkpoint
// NOTE(review): an fCacheSize of 0 would make this integer division throw
// ArithmeticException; no validation of the cache size is visible in this view.
159 int index
= position
/ fCacheSize
;
// A null location is used when index is not a valid checkpoint slot.
160 Object location
= (index
< fCheckpoints
.size()) ? fCheckpoints
.elementAt(index
).getLocation() : null;
161 StreamContext nextEventContext
;
163 nextEventContext
= seekLocation(location
);
// Separate context that trails the reads; only updated inside the loop below.
165 StreamContext currentEventContext
= new StreamContext(nextEventContext
.location
);
167 // And locate the event (if it exists)
// 'current' is the event number that the chosen checkpoint corresponds to.
168 int current
= index
* fCacheSize
;
169 TmfEvent event
= getNextEvent(nextEventContext
);
// NOTE(review): no update of 'current' is visible in this view; without one
// the loop would only stop at end of stream — confirm against the full file.
170 while (event
!= null && current
< position
) {
// Record where the upcoming read starts before performing it.
171 currentEventContext
.location
= nextEventContext
.location
;
172 event
= getNextEvent(nextEventContext
);
176 return currentEventContext
;
// Seeks to 'timestamp', copies the resulting location into the caller's
// context, then returns whatever getNextEvent() reads from that location.
// The caller's context is modified by both the copy and the read.
179 public TmfEvent
getEvent(StreamContext context
, TmfTimestamp timestamp
) {
181 // Position the stream and update the context object
182 StreamContext ctx
= seekEvent(timestamp
);
183 context
.location
= ctx
.location
;
185 return getNextEvent(context
);
// Seeks to event number 'position', copies the resulting location into the
// caller's context, then returns whatever getNextEvent() reads from that
// location. The caller's context is modified by both the copy and the read.
188 public TmfEvent
getEvent(StreamContext context
, int position
) {
190 // Position the stream and update the context object
191 StreamContext ctx
= seekEvent(position
);
192 context
.location
= ctx
.location
;
194 return getNextEvent(context
);
// Reads one event, synchronized on this stream: repositions the stream at
// context.location, asks the parser for the next event, then stores the
// stream's current location back into the context.
// NOTE(review): the try opener, the body of the IOException handler and the
// return statement are not visible in this view.
197 public synchronized TmfEvent
getNextEvent(StreamContext context
) {
199 seekLocation(context
.location
);
200 TmfEvent event
= fParser
.getNextEvent(this);
// Reflect the post-read position in the caller's context.
201 context
.location
= getCurrentLocation();
203 } catch (IOException e
) {
// Dispatches a TmfStreamUpdateSignal through TmfSignalManager, passing this
// stream as both constructor arguments of the signal. Synchronized on this
// stream.
209 private synchronized void notifyListeners() {
210 TmfSignalManager
.dispatchSignal(new TmfStreamUpdateSignal(this, this));
214 * @see org.eclipse.linuxtools.tmf.eventlog.ITmfEventStream#indexStream()
// Creates an IndexingJob, passing the stream name to its constructor.
// NOTE(review): the calls that start the job and wait for it are not visible
// in this view; the InterruptedException handler below suggests a blocking
// wait when waitForCompletion is true — confirm against the full file.
216 public void indexStream(boolean waitForCompletion
) {
217 IndexingJob job
= new IndexingJob(fName
);
219 if (waitForCompletion
) {
222 } catch (InterruptedException e
) {
223 // TODO Auto-generated catch block
// Job that walks the stream from seekLocation(null), recording a checkpoint
// every fCacheSize events and publishing the event count and time range into
// the enclosing stream's fields.
229 private class IndexingJob
extends Job
{
// NOTE(review): the constructor body is not visible in this view.
231 public IndexingJob(String name
) {
236 * @see org.eclipse.core.runtime.jobs.Job#run(org.eclipse.core.runtime.IProgressMonitor)
// Returns Status.CANCEL_STATUS when the monitor reports cancellation inside
// the loop, otherwise Status.OK_STATUS after the last event.
// NOTE(review): the declaration of 'nbEvents', any try/finally, and any
// synchronization around the shared-field writes are not visible in this view.
239 protected IStatus
run(IProgressMonitor monitor
) {
242 TmfTimestamp startTime
= new TmfTimestamp();
243 TmfTimestamp lastTime
= new TmfTimestamp();
// Total amount of work is reported as unknown.
245 monitor
.beginTask("Indexing " + fName
, IProgressMonitor
.UNKNOWN
);
248 StreamContext nextEventContext
;
250 nextEventContext
= seekLocation(null);
// Trailing context: holds the location each event was read from.
252 StreamContext currentEventContext
= new StreamContext(nextEventContext
.location
);
253 TmfEvent event
= getNextEvent(nextEventContext
);
// NOTE(review): no null check on 'event' is visible before these
// dereferences; a stream with no events would throw here unless a guard exists
// on lines not shown — confirm.
255 startTime
= event
.getTimestamp();
256 lastTime
= event
.getTimestamp();
259 while (event
!= null) {
260 lastTime
= event
.getTimestamp();
// True for event numbers 0, fCacheSize, 2 * fCacheSize, ...; the counter is
// incremented as a side effect of the test.
261 if ((nbEvents
++ % fCacheSize
) == 0) {
// The checkpoint pairs the event's timestamp with the location it was read
// from.
262 TmfStreamCheckpoint bookmark
= new TmfStreamCheckpoint(lastTime
, currentEventContext
.location
);
264 fCheckpoints
.add(bookmark
);
265 fNbEvents
= nbEvents
;
266 fTimeRange
= new TmfTimeRange(startTime
, lastTime
);
270 // Check monitor *after* fCheckpoints has been updated
271 if (monitor
.isCanceled()) {
272 return Status
.CANCEL_STATUS
;
// Record where the upcoming read starts before performing it.
276 currentEventContext
.location
= nextEventContext
.location
;
277 event
= getNextEvent(nextEventContext
);
// Final publication of the totals once the stream is exhausted.
282 fNbEvents
= nbEvents
;
283 fTimeRange
= new TmfTimeRange(startTime
, lastTime
);
289 return Status
.OK_STATUS
;