Add Iterator support to TMF traces
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfDataProvider.java
CommitLineData
8c8bf09f 1/*******************************************************************************
8584dc20 2 * Copyright (c) 2009, 2010, 2012 Ericsson
0283f7ff 3 *
8c8bf09f
ASL
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
0283f7ff 8 *
8c8bf09f
ASL
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
8584dc20
FC
11 * Francois Chouinard - Replace background requests by pre-emptible requests
12 * Francois Chouinard - Rebased on TmfCoalescedRequest:s
8c8bf09f
ASL
13 *******************************************************************************/
14
6c13869b 15package org.eclipse.linuxtools.tmf.core.component;
8c8bf09f 16
8c8bf09f
ASL
17import java.util.Vector;
18import java.util.concurrent.BlockingQueue;
19import java.util.concurrent.LinkedBlockingQueue;
9b635e61 20import java.util.concurrent.SynchronousQueue;
8c8bf09f 21
5500a7f0 22import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer;
96b353c5 23import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread;
8fd82db5 24import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager;
8584dc20 25import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedRequest;
8fd82db5 26import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
72f1e62a 27import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
8584dc20
FC
28import org.eclipse.linuxtools.tmf.core.request.ITmfRequest;
29import org.eclipse.linuxtools.tmf.core.request.ITmfRequest.TmfRequestPriority;
6c13869b
FC
30import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
31import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
32import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
33import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
8c8bf09f
ASL
34
35/**
8584dc20 36 * An abstract base class that implements ITmfEventProvider.
8c8bf09f 37 * <p>
8fd82db5
FC
38 * This abstract class implements the housekeeping methods to register/
39 * de-register the event provider and to handle generically the event requests.
8c8bf09f 40 * <p>
8fd82db5
FC
41 * The concrete class can either re-implement processRequest() entirely or just
42 * implement the hooks (initializeContext() and getNext()).
8c8bf09f 43 * <p>
0283f7ff 44 *
8fd82db5 45 * @author Francois Chouinard
96b353c5 46 * @version 1.1
8c8bf09f 47 */
8584dc20 48public abstract class TmfDataProvider extends TmfComponent implements ITmfEventProvider {
8c8bf09f 49
948b0607
FC
50 // ------------------------------------------------------------------------
51 // Constants
52 // ------------------------------------------------------------------------
550d787e 53
063f0d27 54 /** Default amount of events per request "chunk" */
00641a97 55 public static final int DEFAULT_BLOCK_SIZE = 50000;
063f0d27
AM
56
57 /** Default size of the queue */
00641a97
FC
58 public static final int DEFAULT_QUEUE_SIZE = 1000;
59
948b0607 60 // ------------------------------------------------------------------------
12c155f5 61 // Attributes
948b0607 62 // ------------------------------------------------------------------------
8c8bf09f 63
6f4e8ec0 64 /** The type of event handled by this provider */
6256d8ad 65 protected Class<? extends ITmfEvent> fType;
6f4e8ec0
AM
66
67 /** Is there some data being logged? */
12c155f5 68 protected boolean fLogData;
6f4e8ec0
AM
69
70 /** Are errors being logged? */
12c155f5 71 protected boolean fLogError;
f6b14ce2 72
6f4e8ec0 73 /** Queue of events */
6256d8ad 74 protected BlockingQueue<ITmfEvent> fDataQueue;
6f4e8ec0
AM
75
76 /** Size of the fDataQueue */
77 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
78
fc7cd0be 79 private TmfRequestExecutor fExecutor;
948b0607
FC
80
81 private int fSignalDepth = 0;
045df77d 82 private final Object fLock = new Object();
951d134a 83
c1c69938
FC
84 private int fRequestPendingCounter = 0;
85
8584dc20
FC
86 /** List of coalesced requests */
87 protected Vector<TmfCoalescedRequest> fPendingCoalescedRequests = new Vector<TmfCoalescedRequest>();
88
948b0607
FC
89 // ------------------------------------------------------------------------
90 // Constructors
91 // ------------------------------------------------------------------------
92
063f0d27
AM
93 /**
94 * Default constructor
95 */
12c155f5 96 public TmfDataProvider() {
00641a97 97 super();
12c155f5 98 fQueueSize = DEFAULT_QUEUE_SIZE;
6256d8ad 99 fDataQueue = new LinkedBlockingQueue<ITmfEvent>(fQueueSize);
12c155f5
FC
100 fExecutor = new TmfRequestExecutor();
101 }
102
063f0d27
AM
103 /**
104 * Initialize this data provider
105 *
106 * @param name
107 * Name of the provider
108 * @param type
109 * The type of events that will be handled
110 */
6256d8ad 111 public void init(String name, Class<? extends ITmfEvent> type) {
12c155f5 112 super.init(name);
3791b5df 113 fType = type;
6256d8ad 114 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<ITmfEvent>(fQueueSize) : new SynchronousQueue<ITmfEvent>();
12c155f5
FC
115
116 fExecutor = new TmfRequestExecutor();
117 fSignalDepth = 0;
118
5500a7f0 119 fLogData = TmfCoreTracer.isEventTraced();
12c155f5
FC
120
121 TmfProviderManager.register(fType, this);
122 }
123
6f4e8ec0
AM
124 /**
125 * Constructor specifying the event type and the queue size.
126 *
127 * @param name
128 * Name of the provider
129 * @param type
130 * Type of event that will be handled
131 * @param queueSize
132 * Size of the event queue
133 */
6256d8ad 134 protected TmfDataProvider(String name, Class<? extends ITmfEvent> type, int queueSize) {
00641a97 135 this();
948b0607 136 fQueueSize = queueSize;
00641a97 137 init(name, type);
948b0607
FC
138 }
139
063f0d27
AM
140 /**
141 * Copy constructor
142 *
143 * @param other
144 * The other object to copy
145 */
6256d8ad 146 public TmfDataProvider(TmfDataProvider other) {
00641a97
FC
147 this();
148 init(other.getName(), other.fType);
149 }
550d787e 150
063f0d27
AM
151 /**
152 * Standard constructor. Instantiate and initialize at the same time.
153 *
154 * @param name
155 * Name of the provider
156 * @param type
157 * The type of events that will be handled
158 */
6256d8ad 159 public TmfDataProvider(String name, Class<? extends ITmfEvent> type) {
00641a97 160 this(name, type, DEFAULT_QUEUE_SIZE);
948b0607
FC
161 }
162
163 @Override
164 public void dispose() {
165 TmfProviderManager.deregister(fType, this);
166 fExecutor.stop();
167 super.dispose();
948b0607
FC
168 }
169
00641a97
FC
170 // ------------------------------------------------------------------------
171 // Accessors
172 // ------------------------------------------------------------------------
173
063f0d27
AM
174 /**
175 * Get the queue size of this provider
176 *
177 * @return The size of the queue
178 */
948b0607
FC
179 public int getQueueSize() {
180 return fQueueSize;
181 }
182
063f0d27
AM
183 /**
184 * Get the event type this provider handles
185 *
186 * @return The type of ITmfEvent
187 */
948b0607
FC
188 public Class<?> getType() {
189 return fType;
190 }
191
192 // ------------------------------------------------------------------------
193 // ITmfRequestHandler
194 // ------------------------------------------------------------------------
195
8584dc20
FC
196 /* (non-Javadoc)
197 * @see org.eclipse.linuxtools.tmf.core.component.ITmfEventProvider#sendRequest(org.eclipse.linuxtools.tmf.core.request.ITmfRequest)
198 */
199 /**
200 * @since 2.0
201 */
948b0607 202 @Override
8584dc20 203 public void sendRequest(final ITmfRequest request) {
948b0607
FC
204 synchronized (fLock) {
205 if (fSignalDepth > 0) {
206 coalesceDataRequest(request);
207 } else {
208 dispatchRequest(request);
209 }
210 }
211 }
212
8584dc20
FC
213 /* (non-Javadoc)
214 * @see org.eclipse.linuxtools.tmf.core.component.ITmfEventProvider#fireRequest()
215 */
b6cfa2bb
FC
216 /**
217 * @since 2.0
218 */
948b0607 219 @Override
c1c69938 220 public void fireRequest() {
948b0607
FC
221 synchronized (fLock) {
222 if (fRequestPendingCounter > 0) {
223 return;
224 }
225 if (fPendingCoalescedRequests.size() > 0) {
8584dc20 226 for (ITmfRequest request : fPendingCoalescedRequests) {
948b0607
FC
227 dispatchRequest(request);
228 }
229 fPendingCoalescedRequests.clear();
230 }
231 }
232 }
233
234 /**
063f0d27 235 * Increments/decrements the pending requests counters and fires the request
0283f7ff
FC
236 * if necessary (counter == 0). Used for coalescing requests across multiple
237 * TmfDataProvider's.
063f0d27 238 *
948b0607 239 * @param isIncrement
0283f7ff
FC
240 * Should we increment (true) or decrement (false) the pending
241 * counter
948b0607 242 */
c1c69938
FC
243 @Override
244 public void notifyPendingRequest(boolean isIncrement) {
948b0607 245 synchronized (fLock) {
c1c69938
FC
246 if (isIncrement) {
247 if (fSignalDepth > 0) {
248 fRequestPendingCounter++;
249 }
250 } else {
251 if (fRequestPendingCounter > 0) {
252 fRequestPendingCounter--;
253 }
948b0607 254
c1c69938
FC
255 // fire request if all pending requests are received
256 if (fRequestPendingCounter == 0) {
257 fireRequest();
258 }
259 }
260 }
948b0607
FC
261 }
262
263 // ------------------------------------------------------------------------
264 // Coalescing (primitive test...)
265 // ------------------------------------------------------------------------
266
6f4e8ec0
AM
267 /**
268 * Create a new request from an existing one, and add it to the coalesced
269 * requests
270 *
8584dc20
FC
271 * @param request The request to copy
272 * @since 2.0
6f4e8ec0 273 */
8584dc20 274 protected void newCoalescedDataRequest(ITmfRequest request) {
948b0607 275 synchronized (fLock) {
8584dc20 276 TmfCoalescedRequest coalescedRequest = new TmfCoalescedRequest(request);
5500a7f0
FC
277 if (TmfCoreTracer.isRequestTraced()) {
278 TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
279 TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
280 }
281 fPendingCoalescedRequests.add(coalescedRequest);
282 }
283 }
284
6f4e8ec0
AM
285 /**
286 * Add an existing requests to the list of coalesced ones
287 *
8584dc20
FC
288 * @param request The request to add to the list
289 * @since 2.0
6f4e8ec0 290 */
8584dc20 291 protected void coalesceDataRequest(ITmfRequest request) {
948b0607 292 synchronized (fLock) {
8584dc20 293 for (TmfCoalescedRequest coalescedRequest : fPendingCoalescedRequests) {
948b0607
FC
294 if (coalescedRequest.isCompatible(request)) {
295 coalescedRequest.addRequest(request);
5500a7f0
FC
296 if (TmfCoreTracer.isRequestTraced()) {
297 TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
298 TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
299 }
300 return;
301 }
302 }
303 newCoalescedDataRequest(request);
304 }
305 }
306
307 // ------------------------------------------------------------------------
308 // Request processing
309 // ------------------------------------------------------------------------
310
8584dc20
FC
311 private void dispatchRequest(final ITmfRequest request) {
312 if (request.getRequestPriority() == TmfRequestPriority.HIGH) {
948b0607 313 queueRequest(request);
0283f7ff 314 } else {
8584dc20 315 queueBackgroundRequest(request, true);
0283f7ff 316 }
948b0607
FC
317 }
318
6f4e8ec0
AM
319 /**
320 * Queue a request.
321 *
8584dc20
FC
322 * @param request The data request
323 * @since 2.0
6f4e8ec0 324 */
8584dc20 325 protected void queueRequest(final ITmfRequest request) {
948b0607
FC
326
327 if (fExecutor.isShutdown()) {
328 request.cancel();
329 return;
330 }
331
96b353c5 332 TmfEventThread thread = new TmfEventThread(this, request);
948b0607 333
5500a7f0
FC
334 if (TmfCoreTracer.isRequestTraced()) {
335 TmfCoreTracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
0283f7ff 336 }
96b353c5 337
948b0607 338 fExecutor.execute(thread);
948b0607
FC
339 }
340
6f4e8ec0
AM
341 /**
342 * Queue a background request
343 *
8584dc20
FC
344 * @param request The request
345 * @param indexing Should we index the chunks
b6cfa2bb 346<<<<<<< Upstream, based on master
8584dc20 347 *
b6cfa2bb
FC
348=======
349>>>>>>> f5b88da Refactor TmfRequest
8584dc20 350 * @since 2.0
6f4e8ec0 351 */
8584dc20 352 protected void queueBackgroundRequest(final ITmfRequest request, final boolean indexing) {
96b353c5 353 queueRequest(request);
948b0607
FC
354 }
355
356 /**
d337369a
FC
357 * Initialize the provider based on the request. The context is provider
358 * specific and will be updated by getNext().
0283f7ff 359 *
948b0607 360 * @param request
6f4e8ec0 361 * The request
80b237e6 362 * @return An application specific context; null if request can't be
6f4e8ec0 363 * serviced
80b237e6 364 * @since 2.0
948b0607 365 */
8584dc20 366 public abstract ITmfContext armRequest(ITmfRequest request);
948b0607 367
c32744d6
FC
368// /**
369// * Return the next event based on the context supplied. The context
370// * will be updated for the subsequent read.
0283f7ff 371// *
c32744d6
FC
372// * @param context the trace read context (updated)
373// * @return the event referred to by context
374// */
375// public abstract T getNext(ITmfContext context);
948b0607
FC
376
377 /**
378 * Checks if the data meets the request completion criteria.
0283f7ff 379 *
0d9a6d76 380 * @param request the request
8584dc20 381 * @param event the event to check
0d9a6d76
FC
382 * @param nbRead the number of events read so far
383 * @return true if completion criteria is met
b6cfa2bb 384<<<<<<< Upstream, based on master
8584dc20 385 *
b6cfa2bb
FC
386=======
387>>>>>>> f5b88da Refactor TmfRequest
8584dc20 388 * @since 2.0
948b0607 389 */
8584dc20
FC
390 public boolean isCompleted(ITmfRequest request, ITmfEvent event, int nbRead) {
391 return request.isCompleted() ||
392 nbRead >= request.getNbRequested() ||
393 request.getTimeRange().getEndTime().compareTo(event.getTimestamp()) < 0;
948b0607
FC
394 }
395
fc7cd0be
AM
396 // ------------------------------------------------------------------------
397 // Pass-through's to the request executor
398 // ------------------------------------------------------------------------
399
400 /**
401 * @return the shutdown state (i.e. if it is accepting new requests)
402 * @since 2.0
403 */
404 protected boolean executorIsShutdown() {
405 return fExecutor.isShutdown();
406 }
407
408 /**
409 * @return the termination state
410 * @since 2.0
411 */
412 protected boolean executorIsTerminated() {
413 return fExecutor.isTerminated();
414 }
415
948b0607
FC
416 // ------------------------------------------------------------------------
417 // Signal handlers
418 // ------------------------------------------------------------------------
419
063f0d27
AM
420 /**
421 * Handler for the start synch signal
422 *
423 * @param signal
424 * Incoming signal
425 */
948b0607
FC
426 @TmfSignalHandler
427 public void startSynch(TmfStartSynchSignal signal) {
428 synchronized (fLock) {
429 fSignalDepth++;
430 }
431 }
432
063f0d27
AM
433 /**
434 * Handler for the end synch signal
435 *
436 * @param signal
437 * Incoming signal
438 */
948b0607
FC
439 @TmfSignalHandler
440 public void endSynch(TmfEndSynchSignal signal) {
045df77d 441 synchronized (fLock) {
948b0607
FC
442 fSignalDepth--;
443 if (fSignalDepth == 0) {
444 fireRequest();
445 }
045df77d 446 }
948b0607 447 }
8c8bf09f
ASL
448
449}
This page took 0.066692 seconds and 5 git commands to generate.