Patch for Bug287563 (signal handling)
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / stream / TmfEventStream.java
1 /*******************************************************************************
2 * Copyright (c) 2009 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.stream;
14
15 import java.io.FileNotFoundException;
16 import java.io.IOException;
17 import java.util.Collections;
18 import java.util.Vector;
19
20 import org.eclipse.core.runtime.IProgressMonitor;
21 import org.eclipse.core.runtime.IStatus;
22 import org.eclipse.core.runtime.Status;
23 import org.eclipse.core.runtime.jobs.Job;
24 import org.eclipse.linuxtools.tmf.event.TmfEvent;
25 import org.eclipse.linuxtools.tmf.event.TmfTimeRange;
26 import org.eclipse.linuxtools.tmf.event.TmfTimestamp;
27 import org.eclipse.linuxtools.tmf.signal.TmfSignalManager;
28
29 /**
30 * <b><u>TmfEventStream</u></b>
31 * <p>
32 * TODO: Implement me. Please.
33 */
34 public abstract class TmfEventStream implements ITmfEventStream {
35
36 // ========================================================================
37 // Constants
38 // ========================================================================
39
40 // The default number of events to cache
41 public static final int DEFAULT_CACHE_SIZE = 1000;
42
43 // ========================================================================
44 // Attributes
45 // ========================================================================
46
47 // The stream name
48 private final String fName;
49
50 // The stream parser
51 private final ITmfEventParser fParser;
52
53 // The cache size
54 private final int fCacheSize;
55
56 // The set of event stream checkpoints (for random access)
57 private Vector<TmfStreamCheckpoint> fCheckpoints = new Vector<TmfStreamCheckpoint>();
58
59 // The number of events collected
60 private int fNbEvents = 0;
61
62 // The time span of the event stream
63 private TmfTimeRange fTimeRange = new TmfTimeRange(TmfTimestamp.BigBang, TmfTimestamp.BigBang);
64
65 // ========================================================================
66 // Constructors
67 // ========================================================================
68
69 /**
70 * @param filename
71 * @param parser
72 * @param cacheSize
73 * @throws FileNotFoundException
74 */
75 protected TmfEventStream(String filename, ITmfEventParser parser, int cacheSize) throws FileNotFoundException {
76 fName = filename;
77 fParser = parser;
78 fCacheSize = cacheSize;
79 }
80
81 /**
82 * @param filename
83 * @param parser
84 * @throws FileNotFoundException
85 */
86 protected TmfEventStream(String filename, ITmfEventParser parser) throws FileNotFoundException {
87 this(filename, parser, DEFAULT_CACHE_SIZE);
88 }
89
90 // ========================================================================
91 // Accessors
92 // ========================================================================
93
94 /**
95 * @return
96 */
97 public int getCacheSize() {
98 return fCacheSize;
99 }
100
101 /**
102 * @return
103 */
104 public String getName() {
105 return fName;
106 }
107
108 /**
109 * @return
110 */
111 public synchronized int getNbEvents() {
112 return fNbEvents;
113 }
114
115 /**
116 * @return
117 */
118 public synchronized TmfTimeRange getTimeRange() {
119 return fTimeRange;
120 }
121
122 // ========================================================================
123 // Operators
124 // ========================================================================
125
126 public StreamContext seekEvent(TmfTimestamp timestamp) {
127
128 // First, find the right checkpoint
129 int index = Collections.binarySearch(fCheckpoints, new TmfStreamCheckpoint(timestamp, 0));
130
131 // In the very likely event that the checkpoint was not found, bsearch
132 // returns its negated would-be location (not an offset...). From that
133 // index, we can then position the stream and get the event.
134 if (index < 0) {
135 index = Math.max(0, -(index + 2));
136 }
137
138 // Position the stream at the checkpoint
139 Object location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null;
140 StreamContext nextEventContext;
141 synchronized(this) {
142 nextEventContext = seekLocation(location);
143 }
144 StreamContext currentEventContext = new StreamContext(nextEventContext.location);
145
146 // And get the event
147 TmfEvent event = getNextEvent(nextEventContext);
148 while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) {
149 currentEventContext.location = nextEventContext.location;
150 event = getNextEvent(nextEventContext);
151 }
152
153 return currentEventContext;
154 }
155
156 public StreamContext seekEvent(int position) {
157
158 // Position the stream at the previous checkpoint
159 int index = position / fCacheSize;
160 Object location = (index < fCheckpoints.size()) ? fCheckpoints.elementAt(index).getLocation() : null;
161 StreamContext nextEventContext;
162 synchronized(this) {
163 nextEventContext = seekLocation(location);
164 }
165 StreamContext currentEventContext = new StreamContext(nextEventContext.location);
166
167 // And locate the event (if it exists)
168 int current = index * fCacheSize;
169 TmfEvent event = getNextEvent(nextEventContext);
170 while (event != null && current < position) {
171 currentEventContext.location = nextEventContext.location;
172 event = getNextEvent(nextEventContext);
173 current++;
174 }
175
176 return currentEventContext;
177 }
178
179 public TmfEvent getEvent(StreamContext context, TmfTimestamp timestamp) {
180
181 // Position the stream and update the context object
182 StreamContext ctx = seekEvent(timestamp);
183 context.location = ctx.location;
184
185 return getNextEvent(context);
186 }
187
188 public TmfEvent getEvent(StreamContext context, int position) {
189
190 // Position the stream and update the context object
191 StreamContext ctx = seekEvent(position);
192 context.location = ctx.location;
193
194 return getNextEvent(context);
195 }
196
197 public synchronized TmfEvent getNextEvent(StreamContext context) {
198 try {
199 seekLocation(context.location);
200 TmfEvent event = fParser.getNextEvent(this);
201 context.location = getCurrentLocation();
202 return event;
203 } catch (IOException e) {
204 e.printStackTrace();
205 }
206 return null;
207 }
208
209 private synchronized void notifyListeners() {
210 TmfSignalManager.dispatchSignal(new TmfStreamUpdateSignal(this, this));
211 }
212
213 /* (non-Javadoc)
214 * @see org.eclipse.linuxtools.tmf.eventlog.ITmfEventStream#indexStream()
215 */
216 public void indexStream(boolean waitForCompletion) {
217 IndexingJob job = new IndexingJob(fName);
218 job.schedule();
219 if (waitForCompletion) {
220 try {
221 job.join();
222 } catch (InterruptedException e) {
223 // TODO Auto-generated catch block
224 e.printStackTrace();
225 }
226 }
227 }
228
229 private class IndexingJob extends Job {
230
231 public IndexingJob(String name) {
232 super(name);
233 }
234
235 /* (non-Javadoc)
236 * @see org.eclipse.core.runtime.jobs.Job#run(org.eclipse.core.runtime.IProgressMonitor)
237 */
238 @Override
239 protected IStatus run(IProgressMonitor monitor) {
240
241 int nbEvents = 0;
242 TmfTimestamp startTime = new TmfTimestamp();
243 TmfTimestamp lastTime = new TmfTimestamp();
244
245 monitor.beginTask("Indexing " + fName, IProgressMonitor.UNKNOWN);
246
247 try {
248 StreamContext nextEventContext;
249 synchronized(this) {
250 nextEventContext = seekLocation(null);
251 }
252 StreamContext currentEventContext = new StreamContext(nextEventContext.location);
253 TmfEvent event = getNextEvent(nextEventContext);
254 if (event != null) {
255 startTime = event.getTimestamp();
256 lastTime = event.getTimestamp();
257 }
258
259 while (event != null) {
260 lastTime = event.getTimestamp();
261 if ((nbEvents++ % fCacheSize) == 0) {
262 TmfStreamCheckpoint bookmark = new TmfStreamCheckpoint(lastTime, currentEventContext.location);
263 synchronized(this) {
264 fCheckpoints.add(bookmark);
265 fNbEvents = nbEvents;
266 fTimeRange = new TmfTimeRange(startTime, lastTime);
267 }
268 notifyListeners();
269 monitor.worked(1);
270 // Check monitor *after* fCheckpoints has been updated
271 if (monitor.isCanceled()) {
272 return Status.CANCEL_STATUS;
273 }
274 }
275
276 currentEventContext.location = nextEventContext.location;
277 event = getNextEvent(nextEventContext);
278 }
279 }
280 finally {
281 synchronized(this) {
282 fNbEvents = nbEvents;
283 fTimeRange = new TmfTimeRange(startTime, lastTime);
284 }
285 notifyListeners();
286 monitor.done();
287 }
288
289 return Status.OK_STATUS;
290 }
291 }
292
293 }
This page took 0.036374 seconds and 5 git commands to generate.