Patch for Bug287563
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / stream / AbstractTmfEventStream.java
CommitLineData
165c977c
FC
1/*******************************************************************************
2 * Copyright (c) 2009 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.stream;
14
15import java.io.FileNotFoundException;
16import java.io.IOException;
17import java.util.Collections;
18import java.util.HashSet;
19import java.util.Set;
20import java.util.Vector;
21
22import org.eclipse.core.runtime.IProgressMonitor;
23import org.eclipse.core.runtime.IStatus;
24import org.eclipse.core.runtime.Status;
25import org.eclipse.core.runtime.jobs.Job;
26import org.eclipse.linuxtools.tmf.event.TmfEvent;
27import org.eclipse.linuxtools.tmf.event.TmfTimeRange;
28import org.eclipse.linuxtools.tmf.event.TmfTimestamp;
29import org.eclipse.linuxtools.tmf.trace.TmfTrace;
30
31/**
32 * <b><u>AbstractTmfEventStream</u></b>
33 * <p>
34 * TODO: Implement me. Please.
35 */
36public abstract class AbstractTmfEventStream implements ITmfEventStream {
37
38 // ========================================================================
39 // Constants
40 // ========================================================================
41
42 // The default number of events to cache
43 public static final int DEFAULT_CACHE_SIZE = 1000;
44
45 // ========================================================================
46 // Attributes
47 // ========================================================================
48
49 // The stream name
50 private final String fName;
51
52 // The stream parser
53 private final ITmfEventParser fParser;
54
55 // The cache size
56 private final int fCacheSize;
57
58 // The set of event stream checkpoints (for random access)
59 private Vector<TmfStreamCheckpoint> fCheckpoints = new Vector<TmfStreamCheckpoint>();
60
61 // The number of events collected
62 private int fNbEvents = 0;
63
64 // The time span of the event stream
65 private TmfTimeRange fTimeRange = new TmfTimeRange(TmfTimestamp.BigBang, TmfTimestamp.BigBang);
66
67 // The listeners
68 private Set<TmfTrace> fListeners = new HashSet<TmfTrace>();
69
70 // ========================================================================
71 // Constructors
72 // ========================================================================
73
74 /**
75 * @param filename
76 * @param parser
77 * @param cacheSize
78 * @throws FileNotFoundException
79 */
80 protected AbstractTmfEventStream(String filename, ITmfEventParser parser, int cacheSize) throws FileNotFoundException {
81 fName = filename;
82 fParser = parser;
83 fCacheSize = cacheSize;
84 }
85
86 /**
87 * @param filename
88 * @param parser
89 * @throws FileNotFoundException
90 */
91 protected AbstractTmfEventStream(String filename, ITmfEventParser parser) throws FileNotFoundException {
92 this(filename, parser, DEFAULT_CACHE_SIZE);
93 }
94
95 // ========================================================================
96 // Accessors
97 // ========================================================================
98
99 /**
100 * @return
101 */
102 public int getCacheSize() {
103 return fCacheSize;
104 }
105
106 /**
107 * @return
108 */
109 public String getName() {
110 return fName;
111 }
112
113 /**
114 * @return
115 */
116 public synchronized int getNbEvents() {
117 return fNbEvents;
118 }
119
120 /**
121 * @return
122 */
123 public synchronized TmfTimeRange getTimeRange() {
124 return fTimeRange;
125 }
126
127 // ========================================================================
128 // Operators
129 // ========================================================================
130
131 public StreamContext seekEvent(TmfTimestamp timestamp) {
132
133 // First, find the right checkpoint
134 int index = Collections.binarySearch(fCheckpoints, new TmfStreamCheckpoint(timestamp, 0));
135
136 // In the very likely event that the checkpoint was not found, bsearch
137 // returns its negated would-be location (not an offset...). From that
138 // index, we can then position the stream and get the event.
139 if (index < 0) {
140 index = Math.max(0, -(index + 2));
141 }
142
143 // Position the stream at the checkpoint
144 Object location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null;
145 StreamContext nextEventContext;
146 synchronized(this) {
147 nextEventContext = seekLocation(location);
148 }
149 StreamContext currentEventContext = new StreamContext(nextEventContext.location);
150
151 // And get the event
152 TmfEvent event = getNextEvent(nextEventContext);
153 while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) {
154 currentEventContext.location = nextEventContext.location;
155 event = getNextEvent(nextEventContext);
156 }
157
158 return currentEventContext;
159 }
160
161 public StreamContext seekEvent(int position) {
162
163 // Position the stream at the previous checkpoint
164 int index = position / fCacheSize;
165 Object location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null;
166 StreamContext nextEventContext;
167 synchronized(this) {
168 nextEventContext = seekLocation(location);
169 }
170 StreamContext currentEventContext = new StreamContext(nextEventContext.location);
171
172 // And locate the event (if it exists)
173 int current = index * fCacheSize;
174 TmfEvent event = getNextEvent(nextEventContext);
175 while (event != null && current < position) {
176 currentEventContext.location = nextEventContext.location;
177 event = getNextEvent(nextEventContext);
178 current++;
179 }
180
181 return currentEventContext;
182 }
183
184 public TmfEvent getEvent(StreamContext context, TmfTimestamp timestamp) {
185
186 // Position the stream and update the context object
187 StreamContext ctx = seekEvent(timestamp);
188 context.location = ctx.location;
189
190 return getNextEvent(context);
191 }
192
193 public TmfEvent getEvent(StreamContext context, int position) {
194
195 // Position the stream and update the context object
196 StreamContext ctx = seekEvent(position);
197 context.location = ctx.location;
198
199 return getNextEvent(context);
200 }
201
202 public synchronized TmfEvent getNextEvent(StreamContext context) {
203 try {
204 seekLocation(context.location);
205 TmfEvent event = fParser.getNextEvent(this);
206 context.location = getCurrentLocation();
207 return event;
208 } catch (IOException e) {
209 e.printStackTrace();
210 }
211 return null;
212 }
213
214 public synchronized void addListener(TmfTrace listener) {
215 fListeners.add(listener);
216 }
217
218 public synchronized void removeListener(TmfTrace listener) {
219 fListeners.remove(listener);
220 }
221
222 private synchronized void notifyListeners() {
223 for (TmfTrace listener : fListeners) {
224 listener.handleEvent(new TmfStreamUpdateEvent(this));
225 }
226 }
227
228 /* (non-Javadoc)
229 * @see org.eclipse.linuxtools.tmf.eventlog.ITmfEventStream#indexStream()
230 */
231 public void indexStream(boolean waitForCompletion) {
232 IndexingJob job = new IndexingJob(fName);
233 job.schedule();
234 if (waitForCompletion) {
235 try {
236 job.join();
237 } catch (InterruptedException e) {
238 // TODO Auto-generated catch block
239 e.printStackTrace();
240 }
241 }
242 }
243
244 private class IndexingJob extends Job {
245
246 public IndexingJob(String name) {
247 super(name);
248 }
249
250 /* (non-Javadoc)
251 * @see org.eclipse.core.runtime.jobs.Job#run(org.eclipse.core.runtime.IProgressMonitor)
252 */
253 @Override
254 protected IStatus run(IProgressMonitor monitor) {
255
256 int nbEvents = 0;
257 TmfTimestamp startTime = new TmfTimestamp();
258 TmfTimestamp lastTime = new TmfTimestamp();
259
260 monitor.beginTask("Indexing " + fName, IProgressMonitor.UNKNOWN);
261
262 try {
263 StreamContext nextEventContext;
264 synchronized(this) {
265 nextEventContext = seekLocation(null);
266 }
267 StreamContext currentEventContext = new StreamContext(nextEventContext.location);
268 TmfEvent event = getNextEvent(nextEventContext);
269 if (event != null) {
270 startTime = event.getTimestamp();
271 lastTime = event.getTimestamp();
272 }
273
274 while (event != null) {
275 lastTime = event.getTimestamp();
276 if ((nbEvents++ % fCacheSize) == 0) {
277 TmfStreamCheckpoint bookmark = new TmfStreamCheckpoint(lastTime, currentEventContext.location);
278 synchronized(this) {
279 fCheckpoints.add(bookmark);
280 fNbEvents = nbEvents;
281 fTimeRange = new TmfTimeRange(startTime, lastTime);
282 }
283 notifyListeners();
284 monitor.worked(1);
285 // Check monitor *after* fCheckpoints has been updated
286 if (monitor.isCanceled()) {
287 return Status.CANCEL_STATUS;
288 }
289 }
290
291 currentEventContext.location = nextEventContext.location;
292 event = getNextEvent(nextEventContext);
293 }
294 }
295 finally {
296 synchronized(this) {
297 fNbEvents = nbEvents;
298 fTimeRange = new TmfTimeRange(startTime, lastTime);
299 }
300 notifyListeners();
301 monitor.done();
302 }
303
304 return Status.OK_STATUS;
305 }
306 }
307
308 public void indexStream(final String filename) {
309
310// ProgressMonitorDialog dialog = new ProgressMonitorDialog(null);
311// try {
312// dialog.run(true, true, new IRunnableWithProgress() {
313// @Override
314// public void run(IProgressMonitor monitor)
315// throws InvocationTargetException, InterruptedException {
316// monitor.beginTask("Indexing " + filename,
317// IProgressMonitor.UNKNOWN);
318//
319// try {
320// seekLocation(null);
321// TmfTimestamp startTime = new TmfTimestamp();
322// TmfTimestamp lastTime = new TmfTimestamp();
323// Object location = getCurrentLocation();
324//
325// TmfEvent event = getNextEvent();
326// if (event != null) {
327// startTime = event.getTimestamp();
328// while (event != null) {
329// lastTime = event.getTimestamp();
330// if ((fNbEvents++ % fCacheSize) == 0) {
331// if (monitor.isCanceled()) {
332// throw new CancellationException();
333// }
334// TmfStreamCheckpoint bookmark = new TmfStreamCheckpoint(
335// lastTime, location);
336// fCheckpoints.add(bookmark);
337// monitor.worked(1);
338// }
339// location = getCurrentLocation();
340// event = getNextEvent();
341// }
342// fTimeRange = new TmfTimeRange(startTime, lastTime);
343// }
344// seekLocation(null);
345// } catch (IOException e) {
346// } finally {
347// monitor.done();
348// }
349// }
350//
351// });
352// } catch (InvocationTargetException e1) {
353// // TODO Auto-generated catch block
354// e1.printStackTrace();
355// } catch (InterruptedException e1) {
356// // TODO Auto-generated catch block
357// e1.printStackTrace();
358// }
359 }
360
361}
This page took 0.038567 seconds and 5 git commands to generate.