Patch for Bug287563
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / stream / AbstractTmfEventStream.java
1 /*******************************************************************************
2 * Copyright (c) 2009 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.stream;
14
15 import java.io.FileNotFoundException;
16 import java.io.IOException;
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.Set;
20 import java.util.Vector;
21
22 import org.eclipse.core.runtime.IProgressMonitor;
23 import org.eclipse.core.runtime.IStatus;
24 import org.eclipse.core.runtime.Status;
25 import org.eclipse.core.runtime.jobs.Job;
26 import org.eclipse.linuxtools.tmf.event.TmfEvent;
27 import org.eclipse.linuxtools.tmf.event.TmfTimeRange;
28 import org.eclipse.linuxtools.tmf.event.TmfTimestamp;
29 import org.eclipse.linuxtools.tmf.trace.TmfTrace;
30
31 /**
32 * <b><u>AbstractTmfEventStream</u></b>
33 * <p>
34 * TODO: Implement me. Please.
35 */
36 public abstract class AbstractTmfEventStream implements ITmfEventStream {
37
38 // ========================================================================
39 // Constants
40 // ========================================================================
41
42 // The default number of events to cache
43 public static final int DEFAULT_CACHE_SIZE = 1000;
44
45 // ========================================================================
46 // Attributes
47 // ========================================================================
48
49 // The stream name
50 private final String fName;
51
52 // The stream parser
53 private final ITmfEventParser fParser;
54
55 // The cache size
56 private final int fCacheSize;
57
58 // The set of event stream checkpoints (for random access)
59 private Vector<TmfStreamCheckpoint> fCheckpoints = new Vector<TmfStreamCheckpoint>();
60
61 // The number of events collected
62 private int fNbEvents = 0;
63
64 // The time span of the event stream
65 private TmfTimeRange fTimeRange = new TmfTimeRange(TmfTimestamp.BigBang, TmfTimestamp.BigBang);
66
67 // The listeners
68 private Set<TmfTrace> fListeners = new HashSet<TmfTrace>();
69
70 // ========================================================================
71 // Constructors
72 // ========================================================================
73
74 /**
75 * @param filename
76 * @param parser
77 * @param cacheSize
78 * @throws FileNotFoundException
79 */
80 protected AbstractTmfEventStream(String filename, ITmfEventParser parser, int cacheSize) throws FileNotFoundException {
81 fName = filename;
82 fParser = parser;
83 fCacheSize = cacheSize;
84 }
85
86 /**
87 * @param filename
88 * @param parser
89 * @throws FileNotFoundException
90 */
91 protected AbstractTmfEventStream(String filename, ITmfEventParser parser) throws FileNotFoundException {
92 this(filename, parser, DEFAULT_CACHE_SIZE);
93 }
94
95 // ========================================================================
96 // Accessors
97 // ========================================================================
98
99 /**
100 * @return
101 */
102 public int getCacheSize() {
103 return fCacheSize;
104 }
105
106 /**
107 * @return
108 */
109 public String getName() {
110 return fName;
111 }
112
113 /**
114 * @return
115 */
116 public synchronized int getNbEvents() {
117 return fNbEvents;
118 }
119
120 /**
121 * @return
122 */
123 public synchronized TmfTimeRange getTimeRange() {
124 return fTimeRange;
125 }
126
127 // ========================================================================
128 // Operators
129 // ========================================================================
130
131 public StreamContext seekEvent(TmfTimestamp timestamp) {
132
133 // First, find the right checkpoint
134 int index = Collections.binarySearch(fCheckpoints, new TmfStreamCheckpoint(timestamp, 0));
135
136 // In the very likely event that the checkpoint was not found, bsearch
137 // returns its negated would-be location (not an offset...). From that
138 // index, we can then position the stream and get the event.
139 if (index < 0) {
140 index = Math.max(0, -(index + 2));
141 }
142
143 // Position the stream at the checkpoint
144 Object location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null;
145 StreamContext nextEventContext;
146 synchronized(this) {
147 nextEventContext = seekLocation(location);
148 }
149 StreamContext currentEventContext = new StreamContext(nextEventContext.location);
150
151 // And get the event
152 TmfEvent event = getNextEvent(nextEventContext);
153 while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) {
154 currentEventContext.location = nextEventContext.location;
155 event = getNextEvent(nextEventContext);
156 }
157
158 return currentEventContext;
159 }
160
161 public StreamContext seekEvent(int position) {
162
163 // Position the stream at the previous checkpoint
164 int index = position / fCacheSize;
165 Object location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null;
166 StreamContext nextEventContext;
167 synchronized(this) {
168 nextEventContext = seekLocation(location);
169 }
170 StreamContext currentEventContext = new StreamContext(nextEventContext.location);
171
172 // And locate the event (if it exists)
173 int current = index * fCacheSize;
174 TmfEvent event = getNextEvent(nextEventContext);
175 while (event != null && current < position) {
176 currentEventContext.location = nextEventContext.location;
177 event = getNextEvent(nextEventContext);
178 current++;
179 }
180
181 return currentEventContext;
182 }
183
184 public TmfEvent getEvent(StreamContext context, TmfTimestamp timestamp) {
185
186 // Position the stream and update the context object
187 StreamContext ctx = seekEvent(timestamp);
188 context.location = ctx.location;
189
190 return getNextEvent(context);
191 }
192
193 public TmfEvent getEvent(StreamContext context, int position) {
194
195 // Position the stream and update the context object
196 StreamContext ctx = seekEvent(position);
197 context.location = ctx.location;
198
199 return getNextEvent(context);
200 }
201
202 public synchronized TmfEvent getNextEvent(StreamContext context) {
203 try {
204 seekLocation(context.location);
205 TmfEvent event = fParser.getNextEvent(this);
206 context.location = getCurrentLocation();
207 return event;
208 } catch (IOException e) {
209 e.printStackTrace();
210 }
211 return null;
212 }
213
214 public synchronized void addListener(TmfTrace listener) {
215 fListeners.add(listener);
216 }
217
218 public synchronized void removeListener(TmfTrace listener) {
219 fListeners.remove(listener);
220 }
221
222 private synchronized void notifyListeners() {
223 for (TmfTrace listener : fListeners) {
224 listener.handleEvent(new TmfStreamUpdateEvent(this));
225 }
226 }
227
228 /* (non-Javadoc)
229 * @see org.eclipse.linuxtools.tmf.eventlog.ITmfEventStream#indexStream()
230 */
231 public void indexStream(boolean waitForCompletion) {
232 IndexingJob job = new IndexingJob(fName);
233 job.schedule();
234 if (waitForCompletion) {
235 try {
236 job.join();
237 } catch (InterruptedException e) {
238 // TODO Auto-generated catch block
239 e.printStackTrace();
240 }
241 }
242 }
243
244 private class IndexingJob extends Job {
245
246 public IndexingJob(String name) {
247 super(name);
248 }
249
250 /* (non-Javadoc)
251 * @see org.eclipse.core.runtime.jobs.Job#run(org.eclipse.core.runtime.IProgressMonitor)
252 */
253 @Override
254 protected IStatus run(IProgressMonitor monitor) {
255
256 int nbEvents = 0;
257 TmfTimestamp startTime = new TmfTimestamp();
258 TmfTimestamp lastTime = new TmfTimestamp();
259
260 monitor.beginTask("Indexing " + fName, IProgressMonitor.UNKNOWN);
261
262 try {
263 StreamContext nextEventContext;
264 synchronized(this) {
265 nextEventContext = seekLocation(null);
266 }
267 StreamContext currentEventContext = new StreamContext(nextEventContext.location);
268 TmfEvent event = getNextEvent(nextEventContext);
269 if (event != null) {
270 startTime = event.getTimestamp();
271 lastTime = event.getTimestamp();
272 }
273
274 while (event != null) {
275 lastTime = event.getTimestamp();
276 if ((nbEvents++ % fCacheSize) == 0) {
277 TmfStreamCheckpoint bookmark = new TmfStreamCheckpoint(lastTime, currentEventContext.location);
278 synchronized(this) {
279 fCheckpoints.add(bookmark);
280 fNbEvents = nbEvents;
281 fTimeRange = new TmfTimeRange(startTime, lastTime);
282 }
283 notifyListeners();
284 monitor.worked(1);
285 // Check monitor *after* fCheckpoints has been updated
286 if (monitor.isCanceled()) {
287 return Status.CANCEL_STATUS;
288 }
289 }
290
291 currentEventContext.location = nextEventContext.location;
292 event = getNextEvent(nextEventContext);
293 }
294 }
295 finally {
296 synchronized(this) {
297 fNbEvents = nbEvents;
298 fTimeRange = new TmfTimeRange(startTime, lastTime);
299 }
300 notifyListeners();
301 monitor.done();
302 }
303
304 return Status.OK_STATUS;
305 }
306 }
307
308 public void indexStream(final String filename) {
309
310 // ProgressMonitorDialog dialog = new ProgressMonitorDialog(null);
311 // try {
312 // dialog.run(true, true, new IRunnableWithProgress() {
313 // @Override
314 // public void run(IProgressMonitor monitor)
315 // throws InvocationTargetException, InterruptedException {
316 // monitor.beginTask("Indexing " + filename,
317 // IProgressMonitor.UNKNOWN);
318 //
319 // try {
320 // seekLocation(null);
321 // TmfTimestamp startTime = new TmfTimestamp();
322 // TmfTimestamp lastTime = new TmfTimestamp();
323 // Object location = getCurrentLocation();
324 //
325 // TmfEvent event = getNextEvent();
326 // if (event != null) {
327 // startTime = event.getTimestamp();
328 // while (event != null) {
329 // lastTime = event.getTimestamp();
330 // if ((fNbEvents++ % fCacheSize) == 0) {
331 // if (monitor.isCanceled()) {
332 // throw new CancellationException();
333 // }
334 // TmfStreamCheckpoint bookmark = new TmfStreamCheckpoint(
335 // lastTime, location);
336 // fCheckpoints.add(bookmark);
337 // monitor.worked(1);
338 // }
339 // location = getCurrentLocation();
340 // event = getNextEvent();
341 // }
342 // fTimeRange = new TmfTimeRange(startTime, lastTime);
343 // }
344 // seekLocation(null);
345 // } catch (IOException e) {
346 // } finally {
347 // monitor.done();
348 // }
349 // }
350 //
351 // });
352 // } catch (InvocationTargetException e1) {
353 // // TODO Auto-generated catch block
354 // e1.printStackTrace();
355 // } catch (InterruptedException e1) {
356 // // TODO Auto-generated catch block
357 // e1.printStackTrace();
358 // }
359 }
360
361 }
This page took 0.040309 seconds and 5 git commands to generate.