Commit | Line | Data |
---|---|---|
165c977c FC |
1 | /******************************************************************************* |
2 | * Copyright (c) 2009 Ericsson | |
3 | * | |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Francois Chouinard - Initial API and implementation | |
11 | *******************************************************************************/ | |
12 | ||
13 | package org.eclipse.linuxtools.tmf.stream; | |
14 | ||
15 | import java.io.FileNotFoundException; | |
16 | import java.io.IOException; | |
17 | import java.util.Collections; | |
18 | import java.util.HashSet; | |
19 | import java.util.Set; | |
20 | import java.util.Vector; | |
21 | ||
22 | import org.eclipse.core.runtime.IProgressMonitor; | |
23 | import org.eclipse.core.runtime.IStatus; | |
24 | import org.eclipse.core.runtime.Status; | |
25 | import org.eclipse.core.runtime.jobs.Job; | |
26 | import org.eclipse.linuxtools.tmf.event.TmfEvent; | |
27 | import org.eclipse.linuxtools.tmf.event.TmfTimeRange; | |
28 | import org.eclipse.linuxtools.tmf.event.TmfTimestamp; | |
29 | import org.eclipse.linuxtools.tmf.trace.TmfTrace; | |
30 | ||
31 | /** | |
32 | * <b><u>AbstractTmfEventStream</u></b> | |
33 | * <p> | |
34 | * TODO: Implement me. Please. | |
35 | */ | |
36 | public abstract class AbstractTmfEventStream implements ITmfEventStream { | |
37 | ||
38 | // ======================================================================== | |
39 | // Constants | |
40 | // ======================================================================== | |
41 | ||
42 | // The default number of events to cache | |
43 | public static final int DEFAULT_CACHE_SIZE = 1000; | |
44 | ||
45 | // ======================================================================== | |
46 | // Attributes | |
47 | // ======================================================================== | |
48 | ||
49 | // The stream name | |
50 | private final String fName; | |
51 | ||
52 | // The stream parser | |
53 | private final ITmfEventParser fParser; | |
54 | ||
55 | // The cache size | |
56 | private final int fCacheSize; | |
57 | ||
58 | // The set of event stream checkpoints (for random access) | |
59 | private Vector<TmfStreamCheckpoint> fCheckpoints = new Vector<TmfStreamCheckpoint>(); | |
60 | ||
61 | // The number of events collected | |
62 | private int fNbEvents = 0; | |
63 | ||
64 | // The time span of the event stream | |
65 | private TmfTimeRange fTimeRange = new TmfTimeRange(TmfTimestamp.BigBang, TmfTimestamp.BigBang); | |
66 | ||
67 | // The listeners | |
68 | private Set<TmfTrace> fListeners = new HashSet<TmfTrace>(); | |
69 | ||
70 | // ======================================================================== | |
71 | // Constructors | |
72 | // ======================================================================== | |
73 | ||
74 | /** | |
75 | * @param filename | |
76 | * @param parser | |
77 | * @param cacheSize | |
78 | * @throws FileNotFoundException | |
79 | */ | |
80 | protected AbstractTmfEventStream(String filename, ITmfEventParser parser, int cacheSize) throws FileNotFoundException { | |
81 | fName = filename; | |
82 | fParser = parser; | |
83 | fCacheSize = cacheSize; | |
84 | } | |
85 | ||
86 | /** | |
87 | * @param filename | |
88 | * @param parser | |
89 | * @throws FileNotFoundException | |
90 | */ | |
91 | protected AbstractTmfEventStream(String filename, ITmfEventParser parser) throws FileNotFoundException { | |
92 | this(filename, parser, DEFAULT_CACHE_SIZE); | |
93 | } | |
94 | ||
95 | // ======================================================================== | |
96 | // Accessors | |
97 | // ======================================================================== | |
98 | ||
99 | /** | |
100 | * @return | |
101 | */ | |
102 | public int getCacheSize() { | |
103 | return fCacheSize; | |
104 | } | |
105 | ||
106 | /** | |
107 | * @return | |
108 | */ | |
109 | public String getName() { | |
110 | return fName; | |
111 | } | |
112 | ||
113 | /** | |
114 | * @return | |
115 | */ | |
116 | public synchronized int getNbEvents() { | |
117 | return fNbEvents; | |
118 | } | |
119 | ||
120 | /** | |
121 | * @return | |
122 | */ | |
123 | public synchronized TmfTimeRange getTimeRange() { | |
124 | return fTimeRange; | |
125 | } | |
126 | ||
127 | // ======================================================================== | |
128 | // Operators | |
129 | // ======================================================================== | |
130 | ||
131 | public StreamContext seekEvent(TmfTimestamp timestamp) { | |
132 | ||
133 | // First, find the right checkpoint | |
134 | int index = Collections.binarySearch(fCheckpoints, new TmfStreamCheckpoint(timestamp, 0)); | |
135 | ||
136 | // In the very likely event that the checkpoint was not found, bsearch | |
137 | // returns its negated would-be location (not an offset...). From that | |
138 | // index, we can then position the stream and get the event. | |
139 | if (index < 0) { | |
140 | index = Math.max(0, -(index + 2)); | |
141 | } | |
142 | ||
143 | // Position the stream at the checkpoint | |
144 | Object location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null; | |
145 | StreamContext nextEventContext; | |
146 | synchronized(this) { | |
147 | nextEventContext = seekLocation(location); | |
148 | } | |
149 | StreamContext currentEventContext = new StreamContext(nextEventContext.location); | |
150 | ||
151 | // And get the event | |
152 | TmfEvent event = getNextEvent(nextEventContext); | |
153 | while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) { | |
154 | currentEventContext.location = nextEventContext.location; | |
155 | event = getNextEvent(nextEventContext); | |
156 | } | |
157 | ||
158 | return currentEventContext; | |
159 | } | |
160 | ||
161 | public StreamContext seekEvent(int position) { | |
162 | ||
163 | // Position the stream at the previous checkpoint | |
164 | int index = position / fCacheSize; | |
165 | Object location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null; | |
166 | StreamContext nextEventContext; | |
167 | synchronized(this) { | |
168 | nextEventContext = seekLocation(location); | |
169 | } | |
170 | StreamContext currentEventContext = new StreamContext(nextEventContext.location); | |
171 | ||
172 | // And locate the event (if it exists) | |
173 | int current = index * fCacheSize; | |
174 | TmfEvent event = getNextEvent(nextEventContext); | |
175 | while (event != null && current < position) { | |
176 | currentEventContext.location = nextEventContext.location; | |
177 | event = getNextEvent(nextEventContext); | |
178 | current++; | |
179 | } | |
180 | ||
181 | return currentEventContext; | |
182 | } | |
183 | ||
184 | public TmfEvent getEvent(StreamContext context, TmfTimestamp timestamp) { | |
185 | ||
186 | // Position the stream and update the context object | |
187 | StreamContext ctx = seekEvent(timestamp); | |
188 | context.location = ctx.location; | |
189 | ||
190 | return getNextEvent(context); | |
191 | } | |
192 | ||
193 | public TmfEvent getEvent(StreamContext context, int position) { | |
194 | ||
195 | // Position the stream and update the context object | |
196 | StreamContext ctx = seekEvent(position); | |
197 | context.location = ctx.location; | |
198 | ||
199 | return getNextEvent(context); | |
200 | } | |
201 | ||
202 | public synchronized TmfEvent getNextEvent(StreamContext context) { | |
203 | try { | |
204 | seekLocation(context.location); | |
205 | TmfEvent event = fParser.getNextEvent(this); | |
206 | context.location = getCurrentLocation(); | |
207 | return event; | |
208 | } catch (IOException e) { | |
209 | e.printStackTrace(); | |
210 | } | |
211 | return null; | |
212 | } | |
213 | ||
214 | public synchronized void addListener(TmfTrace listener) { | |
215 | fListeners.add(listener); | |
216 | } | |
217 | ||
218 | public synchronized void removeListener(TmfTrace listener) { | |
219 | fListeners.remove(listener); | |
220 | } | |
221 | ||
222 | private synchronized void notifyListeners() { | |
223 | for (TmfTrace listener : fListeners) { | |
224 | listener.handleEvent(new TmfStreamUpdateEvent(this)); | |
225 | } | |
226 | } | |
227 | ||
228 | /* (non-Javadoc) | |
229 | * @see org.eclipse.linuxtools.tmf.eventlog.ITmfEventStream#indexStream() | |
230 | */ | |
231 | public void indexStream(boolean waitForCompletion) { | |
232 | IndexingJob job = new IndexingJob(fName); | |
233 | job.schedule(); | |
234 | if (waitForCompletion) { | |
235 | try { | |
236 | job.join(); | |
237 | } catch (InterruptedException e) { | |
238 | // TODO Auto-generated catch block | |
239 | e.printStackTrace(); | |
240 | } | |
241 | } | |
242 | } | |
243 | ||
244 | private class IndexingJob extends Job { | |
245 | ||
246 | public IndexingJob(String name) { | |
247 | super(name); | |
248 | } | |
249 | ||
250 | /* (non-Javadoc) | |
251 | * @see org.eclipse.core.runtime.jobs.Job#run(org.eclipse.core.runtime.IProgressMonitor) | |
252 | */ | |
253 | @Override | |
254 | protected IStatus run(IProgressMonitor monitor) { | |
255 | ||
256 | int nbEvents = 0; | |
257 | TmfTimestamp startTime = new TmfTimestamp(); | |
258 | TmfTimestamp lastTime = new TmfTimestamp(); | |
259 | ||
260 | monitor.beginTask("Indexing " + fName, IProgressMonitor.UNKNOWN); | |
261 | ||
262 | try { | |
263 | StreamContext nextEventContext; | |
264 | synchronized(this) { | |
265 | nextEventContext = seekLocation(null); | |
266 | } | |
267 | StreamContext currentEventContext = new StreamContext(nextEventContext.location); | |
268 | TmfEvent event = getNextEvent(nextEventContext); | |
269 | if (event != null) { | |
270 | startTime = event.getTimestamp(); | |
271 | lastTime = event.getTimestamp(); | |
272 | } | |
273 | ||
274 | while (event != null) { | |
275 | lastTime = event.getTimestamp(); | |
276 | if ((nbEvents++ % fCacheSize) == 0) { | |
277 | TmfStreamCheckpoint bookmark = new TmfStreamCheckpoint(lastTime, currentEventContext.location); | |
278 | synchronized(this) { | |
279 | fCheckpoints.add(bookmark); | |
280 | fNbEvents = nbEvents; | |
281 | fTimeRange = new TmfTimeRange(startTime, lastTime); | |
282 | } | |
283 | notifyListeners(); | |
284 | monitor.worked(1); | |
285 | // Check monitor *after* fCheckpoints has been updated | |
286 | if (monitor.isCanceled()) { | |
287 | return Status.CANCEL_STATUS; | |
288 | } | |
289 | } | |
290 | ||
291 | currentEventContext.location = nextEventContext.location; | |
292 | event = getNextEvent(nextEventContext); | |
293 | } | |
294 | } | |
295 | finally { | |
296 | synchronized(this) { | |
297 | fNbEvents = nbEvents; | |
298 | fTimeRange = new TmfTimeRange(startTime, lastTime); | |
299 | } | |
300 | notifyListeners(); | |
301 | monitor.done(); | |
302 | } | |
303 | ||
304 | return Status.OK_STATUS; | |
305 | } | |
306 | } | |
307 | ||
308 | public void indexStream(final String filename) { | |
309 | ||
310 | // ProgressMonitorDialog dialog = new ProgressMonitorDialog(null); | |
311 | // try { | |
312 | // dialog.run(true, true, new IRunnableWithProgress() { | |
313 | // @Override | |
314 | // public void run(IProgressMonitor monitor) | |
315 | // throws InvocationTargetException, InterruptedException { | |
316 | // monitor.beginTask("Indexing " + filename, | |
317 | // IProgressMonitor.UNKNOWN); | |
318 | // | |
319 | // try { | |
320 | // seekLocation(null); | |
321 | // TmfTimestamp startTime = new TmfTimestamp(); | |
322 | // TmfTimestamp lastTime = new TmfTimestamp(); | |
323 | // Object location = getCurrentLocation(); | |
324 | // | |
325 | // TmfEvent event = getNextEvent(); | |
326 | // if (event != null) { | |
327 | // startTime = event.getTimestamp(); | |
328 | // while (event != null) { | |
329 | // lastTime = event.getTimestamp(); | |
330 | // if ((fNbEvents++ % fCacheSize) == 0) { | |
331 | // if (monitor.isCanceled()) { | |
332 | // throw new CancellationException(); | |
333 | // } | |
334 | // TmfStreamCheckpoint bookmark = new TmfStreamCheckpoint( | |
335 | // lastTime, location); | |
336 | // fCheckpoints.add(bookmark); | |
337 | // monitor.worked(1); | |
338 | // } | |
339 | // location = getCurrentLocation(); | |
340 | // event = getNextEvent(); | |
341 | // } | |
342 | // fTimeRange = new TmfTimeRange(startTime, lastTime); | |
343 | // } | |
344 | // seekLocation(null); | |
345 | // } catch (IOException e) { | |
346 | // } finally { | |
347 | // monitor.done(); | |
348 | // } | |
349 | // } | |
350 | // | |
351 | // }); | |
352 | // } catch (InvocationTargetException e1) { | |
353 | // // TODO Auto-generated catch block | |
354 | // e1.printStackTrace(); | |
355 | // } catch (InterruptedException e1) { | |
356 | // // TODO Auto-generated catch block | |
357 | // e1.printStackTrace(); | |
358 | // } | |
359 | } | |
360 | ||
361 | } |