Fixed a concurrency issue with signal dispatching
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / stream / TmfEventStream.java
1 /*******************************************************************************
2 * Copyright (c) 2009 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.stream;
14
15 import java.io.FileNotFoundException;
16 import java.io.IOException;
17 import java.util.Collections;
18 import java.util.Vector;
19
20 import org.eclipse.core.runtime.IProgressMonitor;
21 import org.eclipse.core.runtime.IStatus;
22 import org.eclipse.core.runtime.Status;
23 import org.eclipse.core.runtime.jobs.Job;
24 import org.eclipse.linuxtools.tmf.event.TmfEvent;
25 import org.eclipse.linuxtools.tmf.event.TmfTimeRange;
26 import org.eclipse.linuxtools.tmf.event.TmfTimestamp;
27 import org.eclipse.linuxtools.tmf.signal.TmfSignalManager;
28
29 /**
30 * <b><u>TmfEventStream</u></b>
31 * <p>
32 * TODO: Implement me. Please.
33 */
34 public abstract class TmfEventStream implements ITmfEventStream {
35
36 // ========================================================================
37 // Constants
38 // ========================================================================
39
40 // The default number of events to cache
41 public static final int DEFAULT_CACHE_SIZE = 1000;
42
43 // ========================================================================
44 // Attributes
45 // ========================================================================
46
47 // The stream name
48 private final String fName;
49
50 // The stream parser
51 private final ITmfEventParser fParser;
52
53 // The cache size
54 private final int fCacheSize;
55
56 // The set of event stream checkpoints (for random access)
57 private Vector<TmfStreamCheckpoint> fCheckpoints = new Vector<TmfStreamCheckpoint>();
58
59 // The number of events collected
60 private int fNbEvents = 0;
61
62 // The time span of the event stream
63 private TmfTimeRange fTimeRange = new TmfTimeRange(TmfTimestamp.BigBang, TmfTimestamp.BigBang);
64
65 // ========================================================================
66 // Constructors
67 // ========================================================================
68
69 /**
70 * @param filename
71 * @param parser
72 * @param cacheSize
73 * @throws FileNotFoundException
74 */
75 protected TmfEventStream(String filename, ITmfEventParser parser, int cacheSize) throws FileNotFoundException {
76 fName = filename;
77 fParser = parser;
78 fCacheSize = cacheSize;
79 }
80
81 /**
82 * @param filename
83 * @param parser
84 * @throws FileNotFoundException
85 */
86 protected TmfEventStream(String filename, ITmfEventParser parser) throws FileNotFoundException {
87 this(filename, parser, DEFAULT_CACHE_SIZE);
88 }
89
90 // ========================================================================
91 // Accessors
92 // ========================================================================
93
94 /**
95 * @return
96 */
97 public int getCacheSize() {
98 return fCacheSize;
99 }
100
101 /**
102 * @return
103 */
104 public String getName() {
105 return fName;
106 }
107
108 /* (non-Javadoc)
109 * @see org.eclipse.linuxtools.tmf.stream.ITmfEventStream#getNbEvents()
110 */
111 public int getNbEvents() {
112 return fNbEvents;
113 }
114
115 /* (non-Javadoc)
116 * @see org.eclipse.linuxtools.tmf.stream.ITmfEventStream#getTimeRange()
117 */
118 public TmfTimeRange getTimeRange() {
119 return fTimeRange;
120 }
121
122 /* (non-Javadoc)
123 * @see org.eclipse.linuxtools.tmf.stream.ITmfEventStream#getIndex(org.eclipse.linuxtools.tmf.event.TmfTimestamp)
124 */
125 public int getIndex(TmfTimestamp timestamp) {
126 StreamContext context = seekEvent(timestamp);
127 return context.index;
128 }
129
130 // ========================================================================
131 // Operators
132 // ========================================================================
133
134 public StreamContext seekEvent(TmfTimestamp timestamp) {
135
136 // First, find the right checkpoint
137 int index = Collections.binarySearch(fCheckpoints, new TmfStreamCheckpoint(timestamp, 0));
138
139 // In the very likely event that the checkpoint was not found, bsearch
140 // returns its negated would-be location (not an offset...). From that
141 // index, we can then position the stream and get the event.
142 if (index < 0) {
143 index = Math.max(0, -(index + 2));
144 }
145
146 // Position the stream at the checkpoint
147 Object location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null;
148 StreamContext nextEventContext;
149 synchronized(this) {
150 nextEventContext = seekLocation(location);
151 }
152 StreamContext currentEventContext = new StreamContext(nextEventContext.location, index * fCacheSize);
153
154 // And get the event
155 TmfEvent event = getNextEvent(nextEventContext);
156 while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) {
157 currentEventContext.location = nextEventContext.location;
158 currentEventContext.index++;
159 event = getNextEvent(nextEventContext);
160 }
161
162 return currentEventContext;
163 }
164
165 public StreamContext seekEvent(int position) {
166
167 // Position the stream at the previous checkpoint
168 int index = position / fCacheSize;
169 Object location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null;
170 StreamContext nextEventContext;
171 synchronized(this) {
172 nextEventContext = seekLocation(location);
173 }
174 StreamContext currentEventContext = new StreamContext(nextEventContext);
175
176 // And locate the event (if it exists)
177 int current = index * fCacheSize;
178 TmfEvent event = getNextEvent(nextEventContext);
179 while (event != null && current < position) {
180 currentEventContext.location = nextEventContext.location;
181 event = getNextEvent(nextEventContext);
182 current++;
183 }
184
185 return currentEventContext;
186 }
187
188 public TmfEvent getEvent(StreamContext context, TmfTimestamp timestamp) {
189
190 // Position the stream and update the context object
191 StreamContext ctx = seekEvent(timestamp);
192 context.location = ctx.location;
193
194 return getNextEvent(context);
195 }
196
197 public TmfEvent getEvent(StreamContext context, int position) {
198
199 // Position the stream and update the context object
200 StreamContext ctx = seekEvent(position);
201 context.location = ctx.location;
202
203 return getNextEvent(context);
204 }
205
206 public synchronized TmfEvent getNextEvent(StreamContext context) {
207 try {
208 seekLocation(context.location);
209 TmfEvent event = fParser.getNextEvent(this);
210 context.location = getCurrentLocation();
211 return event;
212 } catch (IOException e) {
213 e.printStackTrace();
214 }
215 return null;
216 }
217
218 private void notifyListeners() {
219 TmfSignalManager.dispatchSignal(new TmfStreamUpdatedSignal(this, this));
220 }
221
222 /* (non-Javadoc)
223 * @see org.eclipse.linuxtools.tmf.eventlog.ITmfEventStream#indexStream()
224 */
225 public void indexStream(boolean waitForCompletion) {
226 IndexingJob job = new IndexingJob(fName);
227 job.schedule();
228 if (waitForCompletion) {
229 try {
230 job.join();
231 } catch (InterruptedException e) {
232 // TODO Auto-generated catch block
233 e.printStackTrace();
234 }
235 }
236 }
237
238 private class IndexingJob extends Job {
239
240 public IndexingJob(String name) {
241 super(name);
242 }
243
244 /* (non-Javadoc)
245 * @see org.eclipse.core.runtime.jobs.Job#run(org.eclipse.core.runtime.IProgressMonitor)
246 */
247 @Override
248 protected IStatus run(IProgressMonitor monitor) {
249
250 int nbEvents = 0;
251 TmfTimestamp startTime = new TmfTimestamp();
252 TmfTimestamp lastTime = new TmfTimestamp();
253
254 monitor.beginTask("Indexing " + fName, IProgressMonitor.UNKNOWN);
255
256 try {
257 StreamContext nextEventContext;
258 synchronized(this) {
259 nextEventContext = seekLocation(null);
260 }
261 StreamContext currentEventContext = new StreamContext(nextEventContext);
262 TmfEvent event = getNextEvent(nextEventContext);
263 if (event != null) {
264 startTime = event.getTimestamp();
265 lastTime = event.getTimestamp();
266 }
267
268 while (event != null) {
269 lastTime = event.getTimestamp();
270 if ((nbEvents++ % fCacheSize) == 0) {
271 TmfStreamCheckpoint bookmark = new TmfStreamCheckpoint(lastTime, currentEventContext.location);
272 synchronized(this) {
273 fCheckpoints.add(bookmark);
274 fNbEvents = nbEvents;
275 fTimeRange = new TmfTimeRange(startTime, lastTime);
276 }
277 notifyListeners();
278 monitor.worked(1);
279 // Check monitor *after* fCheckpoints has been updated
280 if (monitor.isCanceled()) {
281 return Status.CANCEL_STATUS;
282 }
283 }
284
285 currentEventContext.location = nextEventContext.location;
286 event = getNextEvent(nextEventContext);
287 }
288 }
289 finally {
290 synchronized(this) {
291 fNbEvents = nbEvents;
292 fTimeRange = new TmfTimeRange(startTime, lastTime);
293 }
294 notifyListeners();
295 monitor.done();
296 }
297
298 return Status.OK_STATUS;
299 }
300 }
301
302 }
This page took 0.039638 seconds and 5 git commands to generate.