Commit | Line | Data |
---|---|---|
8c8bf09f | 1 | /******************************************************************************* |
8584dc20 | 2 | * Copyright (c) 2009, 2010, 2012 Ericsson |
0283f7ff | 3 | * |
8c8bf09f ASL |
4 | * All rights reserved. This program and the accompanying materials are |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
0283f7ff | 8 | * |
8c8bf09f ASL |
9 | * Contributors: |
10 | * Francois Chouinard - Initial API and implementation | |
8584dc20 FC |
11 | * Francois Chouinard - Replace background requests by pre-emptible requests |
12 | * Francois Chouinard - Rebased on TmfCoalescedRequest:s | |
8c8bf09f ASL |
13 | *******************************************************************************/ |
14 | ||
6c13869b | 15 | package org.eclipse.linuxtools.tmf.core.component; |
8c8bf09f | 16 | |
8c8bf09f ASL |
17 | import java.util.Vector; |
18 | import java.util.concurrent.BlockingQueue; | |
19 | import java.util.concurrent.LinkedBlockingQueue; | |
9b635e61 | 20 | import java.util.concurrent.SynchronousQueue; |
8c8bf09f | 21 | |
5500a7f0 | 22 | import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer; |
96b353c5 | 23 | import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread; |
8fd82db5 | 24 | import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager; |
8584dc20 | 25 | import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedRequest; |
8fd82db5 | 26 | import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor; |
72f1e62a | 27 | import org.eclipse.linuxtools.tmf.core.event.ITmfEvent; |
8584dc20 FC |
28 | import org.eclipse.linuxtools.tmf.core.request.ITmfRequest; |
29 | import org.eclipse.linuxtools.tmf.core.request.ITmfRequest.TmfRequestPriority; | |
6c13869b FC |
30 | import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal; |
31 | import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler; | |
32 | import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal; | |
33 | import org.eclipse.linuxtools.tmf.core.trace.ITmfContext; | |
8c8bf09f ASL |
34 | |
35 | /** | |
8584dc20 | 36 | * An abstract base class that implements ITmfEventProvider. |
8c8bf09f | 37 | * <p> |
8fd82db5 FC |
38 | * This abstract class implements the housekeeping methods to register/ |
39 | * de-register the event provider and to handle generically the event requests. | |
8c8bf09f | 40 | * <p> |
8fd82db5 FC |
41 | * The concrete class can either re-implement processRequest() entirely or just |
42 | * implement the hooks (initializeContext() and getNext()). | |
8c8bf09f | 43 | * <p> |
0283f7ff | 44 | * |
8fd82db5 | 45 | * @author Francois Chouinard |
96b353c5 | 46 | * @version 1.1 |
8c8bf09f | 47 | */ |
8584dc20 | 48 | public abstract class TmfDataProvider extends TmfComponent implements ITmfEventProvider { |
8c8bf09f | 49 | |
948b0607 FC |
50 | // ------------------------------------------------------------------------ |
51 | // Constants | |
52 | // ------------------------------------------------------------------------ | |
550d787e | 53 | |
063f0d27 | 54 | /** Default amount of events per request "chunk" */ |
00641a97 | 55 | public static final int DEFAULT_BLOCK_SIZE = 50000; |
063f0d27 AM |
56 | |
57 | /** Default size of the queue */ | |
00641a97 FC |
58 | public static final int DEFAULT_QUEUE_SIZE = 1000; |
59 | ||
948b0607 | 60 | // ------------------------------------------------------------------------ |
12c155f5 | 61 | // Attributes |
948b0607 | 62 | // ------------------------------------------------------------------------ |
8c8bf09f | 63 | |
6f4e8ec0 | 64 | /** The type of event handled by this provider */ |
6256d8ad | 65 | protected Class<? extends ITmfEvent> fType; |
6f4e8ec0 AM |
66 | |
67 | /** Is there some data being logged? */ | |
12c155f5 | 68 | protected boolean fLogData; |
6f4e8ec0 AM |
69 | |
70 | /** Are errors being logged? */ | |
12c155f5 | 71 | protected boolean fLogError; |
f6b14ce2 | 72 | |
6f4e8ec0 | 73 | /** Queue of events */ |
6256d8ad | 74 | protected BlockingQueue<ITmfEvent> fDataQueue; |
6f4e8ec0 AM |
75 | |
76 | /** Size of the fDataQueue */ | |
77 | protected int fQueueSize = DEFAULT_QUEUE_SIZE; | |
78 | ||
fc7cd0be | 79 | private TmfRequestExecutor fExecutor; |
948b0607 FC |
80 | |
81 | private int fSignalDepth = 0; | |
045df77d | 82 | private final Object fLock = new Object(); |
951d134a | 83 | |
c1c69938 FC |
84 | private int fRequestPendingCounter = 0; |
85 | ||
8584dc20 FC |
86 | /** List of coalesced requests */ |
87 | protected Vector<TmfCoalescedRequest> fPendingCoalescedRequests = new Vector<TmfCoalescedRequest>(); | |
88 | ||
948b0607 FC |
89 | // ------------------------------------------------------------------------ |
90 | // Constructors | |
91 | // ------------------------------------------------------------------------ | |
92 | ||
063f0d27 AM |
93 | /** |
94 | * Default constructor | |
95 | */ | |
12c155f5 | 96 | public TmfDataProvider() { |
00641a97 | 97 | super(); |
12c155f5 | 98 | fQueueSize = DEFAULT_QUEUE_SIZE; |
6256d8ad | 99 | fDataQueue = new LinkedBlockingQueue<ITmfEvent>(fQueueSize); |
12c155f5 FC |
100 | fExecutor = new TmfRequestExecutor(); |
101 | } | |
102 | ||
063f0d27 AM |
103 | /** |
104 | * Initialize this data provider | |
105 | * | |
106 | * @param name | |
107 | * Name of the provider | |
108 | * @param type | |
109 | * The type of events that will be handled | |
110 | */ | |
6256d8ad | 111 | public void init(String name, Class<? extends ITmfEvent> type) { |
12c155f5 | 112 | super.init(name); |
3791b5df | 113 | fType = type; |
6256d8ad | 114 | fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<ITmfEvent>(fQueueSize) : new SynchronousQueue<ITmfEvent>(); |
12c155f5 FC |
115 | |
116 | fExecutor = new TmfRequestExecutor(); | |
117 | fSignalDepth = 0; | |
118 | ||
5500a7f0 | 119 | fLogData = TmfCoreTracer.isEventTraced(); |
12c155f5 FC |
120 | |
121 | TmfProviderManager.register(fType, this); | |
122 | } | |
123 | ||
6f4e8ec0 AM |
124 | /** |
125 | * Constructor specifying the event type and the queue size. | |
126 | * | |
127 | * @param name | |
128 | * Name of the provider | |
129 | * @param type | |
130 | * Type of event that will be handled | |
131 | * @param queueSize | |
132 | * Size of the event queue | |
133 | */ | |
6256d8ad | 134 | protected TmfDataProvider(String name, Class<? extends ITmfEvent> type, int queueSize) { |
00641a97 | 135 | this(); |
948b0607 | 136 | fQueueSize = queueSize; |
00641a97 | 137 | init(name, type); |
948b0607 FC |
138 | } |
139 | ||
063f0d27 AM |
140 | /** |
141 | * Copy constructor | |
142 | * | |
143 | * @param other | |
144 | * The other object to copy | |
145 | */ | |
6256d8ad | 146 | public TmfDataProvider(TmfDataProvider other) { |
00641a97 FC |
147 | this(); |
148 | init(other.getName(), other.fType); | |
149 | } | |
550d787e | 150 | |
063f0d27 AM |
151 | /** |
152 | * Standard constructor. Instantiate and initialize at the same time. | |
153 | * | |
154 | * @param name | |
155 | * Name of the provider | |
156 | * @param type | |
157 | * The type of events that will be handled | |
158 | */ | |
6256d8ad | 159 | public TmfDataProvider(String name, Class<? extends ITmfEvent> type) { |
00641a97 | 160 | this(name, type, DEFAULT_QUEUE_SIZE); |
948b0607 FC |
161 | } |
162 | ||
163 | @Override | |
164 | public void dispose() { | |
165 | TmfProviderManager.deregister(fType, this); | |
166 | fExecutor.stop(); | |
167 | super.dispose(); | |
948b0607 FC |
168 | } |
169 | ||
00641a97 FC |
170 | // ------------------------------------------------------------------------ |
171 | // Accessors | |
172 | // ------------------------------------------------------------------------ | |
173 | ||
063f0d27 AM |
174 | /** |
175 | * Get the queue size of this provider | |
176 | * | |
177 | * @return The size of the queue | |
178 | */ | |
948b0607 FC |
179 | public int getQueueSize() { |
180 | return fQueueSize; | |
181 | } | |
182 | ||
063f0d27 AM |
183 | /** |
184 | * Get the event type this provider handles | |
185 | * | |
186 | * @return The type of ITmfEvent | |
187 | */ | |
948b0607 FC |
188 | public Class<?> getType() { |
189 | return fType; | |
190 | } | |
191 | ||
192 | // ------------------------------------------------------------------------ | |
193 | // ITmfRequestHandler | |
194 | // ------------------------------------------------------------------------ | |
195 | ||
8584dc20 FC |
196 | /* (non-Javadoc) |
197 | * @see org.eclipse.linuxtools.tmf.core.component.ITmfEventProvider#sendRequest(org.eclipse.linuxtools.tmf.core.request.ITmfRequest) | |
198 | */ | |
199 | /** | |
200 | * @since 2.0 | |
201 | */ | |
948b0607 | 202 | @Override |
8584dc20 | 203 | public void sendRequest(final ITmfRequest request) { |
948b0607 FC |
204 | synchronized (fLock) { |
205 | if (fSignalDepth > 0) { | |
206 | coalesceDataRequest(request); | |
207 | } else { | |
208 | dispatchRequest(request); | |
209 | } | |
210 | } | |
211 | } | |
212 | ||
8584dc20 FC |
213 | /* (non-Javadoc) |
214 | * @see org.eclipse.linuxtools.tmf.core.component.ITmfEventProvider#fireRequest() | |
215 | */ | |
b6cfa2bb FC |
216 | /** |
217 | * @since 2.0 | |
218 | */ | |
948b0607 | 219 | @Override |
c1c69938 | 220 | public void fireRequest() { |
948b0607 FC |
221 | synchronized (fLock) { |
222 | if (fRequestPendingCounter > 0) { | |
223 | return; | |
224 | } | |
225 | if (fPendingCoalescedRequests.size() > 0) { | |
8584dc20 | 226 | for (ITmfRequest request : fPendingCoalescedRequests) { |
948b0607 FC |
227 | dispatchRequest(request); |
228 | } | |
229 | fPendingCoalescedRequests.clear(); | |
230 | } | |
231 | } | |
232 | } | |
233 | ||
234 | /** | |
063f0d27 | 235 | * Increments/decrements the pending requests counters and fires the request |
0283f7ff FC |
236 | * if necessary (counter == 0). Used for coalescing requests across multiple |
237 | * TmfDataProvider's. | |
063f0d27 | 238 | * |
948b0607 | 239 | * @param isIncrement |
0283f7ff FC |
240 | * Should we increment (true) or decrement (false) the pending |
241 | * counter | |
948b0607 | 242 | */ |
c1c69938 FC |
243 | @Override |
244 | public void notifyPendingRequest(boolean isIncrement) { | |
948b0607 | 245 | synchronized (fLock) { |
c1c69938 FC |
246 | if (isIncrement) { |
247 | if (fSignalDepth > 0) { | |
248 | fRequestPendingCounter++; | |
249 | } | |
250 | } else { | |
251 | if (fRequestPendingCounter > 0) { | |
252 | fRequestPendingCounter--; | |
253 | } | |
948b0607 | 254 | |
c1c69938 FC |
255 | // fire request if all pending requests are received |
256 | if (fRequestPendingCounter == 0) { | |
257 | fireRequest(); | |
258 | } | |
259 | } | |
260 | } | |
948b0607 FC |
261 | } |
262 | ||
263 | // ------------------------------------------------------------------------ | |
264 | // Coalescing (primitive test...) | |
265 | // ------------------------------------------------------------------------ | |
266 | ||
6f4e8ec0 AM |
267 | /** |
268 | * Create a new request from an existing one, and add it to the coalesced | |
269 | * requests | |
270 | * | |
8584dc20 FC |
271 | * @param request The request to copy |
272 | * @since 2.0 | |
6f4e8ec0 | 273 | */ |
8584dc20 | 274 | protected void newCoalescedDataRequest(ITmfRequest request) { |
948b0607 | 275 | synchronized (fLock) { |
8584dc20 | 276 | TmfCoalescedRequest coalescedRequest = new TmfCoalescedRequest(request); |
5500a7f0 FC |
277 | if (TmfCoreTracer.isRequestTraced()) { |
278 | TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$ | |
279 | TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$ | |
948b0607 FC |
280 | } |
281 | fPendingCoalescedRequests.add(coalescedRequest); | |
282 | } | |
283 | } | |
284 | ||
6f4e8ec0 AM |
285 | /** |
286 | * Add an existing requests to the list of coalesced ones | |
287 | * | |
8584dc20 FC |
288 | * @param request The request to add to the list |
289 | * @since 2.0 | |
6f4e8ec0 | 290 | */ |
8584dc20 | 291 | protected void coalesceDataRequest(ITmfRequest request) { |
948b0607 | 292 | synchronized (fLock) { |
8584dc20 | 293 | for (TmfCoalescedRequest coalescedRequest : fPendingCoalescedRequests) { |
948b0607 FC |
294 | if (coalescedRequest.isCompatible(request)) { |
295 | coalescedRequest.addRequest(request); | |
5500a7f0 FC |
296 | if (TmfCoreTracer.isRequestTraced()) { |
297 | TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$ | |
298 | TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$ | |
948b0607 FC |
299 | } |
300 | return; | |
301 | } | |
302 | } | |
303 | newCoalescedDataRequest(request); | |
304 | } | |
305 | } | |
306 | ||
307 | // ------------------------------------------------------------------------ | |
308 | // Request processing | |
309 | // ------------------------------------------------------------------------ | |
310 | ||
8584dc20 FC |
311 | private void dispatchRequest(final ITmfRequest request) { |
312 | if (request.getRequestPriority() == TmfRequestPriority.HIGH) { | |
948b0607 | 313 | queueRequest(request); |
0283f7ff | 314 | } else { |
8584dc20 | 315 | queueBackgroundRequest(request, true); |
0283f7ff | 316 | } |
948b0607 FC |
317 | } |
318 | ||
6f4e8ec0 AM |
319 | /** |
320 | * Queue a request. | |
321 | * | |
8584dc20 FC |
322 | * @param request The data request |
323 | * @since 2.0 | |
6f4e8ec0 | 324 | */ |
8584dc20 | 325 | protected void queueRequest(final ITmfRequest request) { |
948b0607 FC |
326 | |
327 | if (fExecutor.isShutdown()) { | |
328 | request.cancel(); | |
329 | return; | |
330 | } | |
331 | ||
96b353c5 | 332 | TmfEventThread thread = new TmfEventThread(this, request); |
948b0607 | 333 | |
5500a7f0 FC |
334 | if (TmfCoreTracer.isRequestTraced()) { |
335 | TmfCoreTracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$ | |
0283f7ff | 336 | } |
96b353c5 | 337 | |
948b0607 | 338 | fExecutor.execute(thread); |
948b0607 FC |
339 | } |
340 | ||
6f4e8ec0 AM |
341 | /** |
342 | * Queue a background request | |
343 | * | |
8584dc20 FC |
344 | * @param request The request |
345 | * @param indexing Should we index the chunks | |
b6cfa2bb | 346 | <<<<<<< Upstream, based on master |
8584dc20 | 347 | * |
b6cfa2bb FC |
348 | ======= |
349 | >>>>>>> f5b88da Refactor TmfRequest | |
8584dc20 | 350 | * @since 2.0 |
6f4e8ec0 | 351 | */ |
8584dc20 | 352 | protected void queueBackgroundRequest(final ITmfRequest request, final boolean indexing) { |
96b353c5 | 353 | queueRequest(request); |
948b0607 FC |
354 | } |
355 | ||
356 | /** | |
d337369a FC |
357 | * Initialize the provider based on the request. The context is provider |
358 | * specific and will be updated by getNext(). | |
0283f7ff | 359 | * |
948b0607 | 360 | * @param request |
6f4e8ec0 | 361 | * The request |
80b237e6 | 362 | * @return An application specific context; null if request can't be |
6f4e8ec0 | 363 | * serviced |
80b237e6 | 364 | * @since 2.0 |
948b0607 | 365 | */ |
8584dc20 | 366 | public abstract ITmfContext armRequest(ITmfRequest request); |
948b0607 | 367 | |
c32744d6 FC |
368 | // /** |
369 | // * Return the next event based on the context supplied. The context | |
370 | // * will be updated for the subsequent read. | |
0283f7ff | 371 | // * |
c32744d6 FC |
372 | // * @param context the trace read context (updated) |
373 | // * @return the event referred to by context | |
374 | // */ | |
375 | // public abstract T getNext(ITmfContext context); | |
948b0607 FC |
376 | |
377 | /** | |
378 | * Checks if the data meets the request completion criteria. | |
0283f7ff | 379 | * |
0d9a6d76 | 380 | * @param request the request |
8584dc20 | 381 | * @param event the event to check |
0d9a6d76 FC |
382 | * @param nbRead the number of events read so far |
383 | * @return true if completion criteria is met | |
b6cfa2bb | 384 | <<<<<<< Upstream, based on master |
8584dc20 | 385 | * |
b6cfa2bb FC |
386 | ======= |
387 | >>>>>>> f5b88da Refactor TmfRequest | |
8584dc20 | 388 | * @since 2.0 |
948b0607 | 389 | */ |
8584dc20 FC |
390 | public boolean isCompleted(ITmfRequest request, ITmfEvent event, int nbRead) { |
391 | return request.isCompleted() || | |
392 | nbRead >= request.getNbRequested() || | |
393 | request.getTimeRange().getEndTime().compareTo(event.getTimestamp()) < 0; | |
948b0607 FC |
394 | } |
395 | ||
fc7cd0be AM |
396 | // ------------------------------------------------------------------------ |
397 | // Pass-through's to the request executor | |
398 | // ------------------------------------------------------------------------ | |
399 | ||
400 | /** | |
401 | * @return the shutdown state (i.e. if it is accepting new requests) | |
402 | * @since 2.0 | |
403 | */ | |
404 | protected boolean executorIsShutdown() { | |
405 | return fExecutor.isShutdown(); | |
406 | } | |
407 | ||
408 | /** | |
409 | * @return the termination state | |
410 | * @since 2.0 | |
411 | */ | |
412 | protected boolean executorIsTerminated() { | |
413 | return fExecutor.isTerminated(); | |
414 | } | |
415 | ||
948b0607 FC |
416 | // ------------------------------------------------------------------------ |
417 | // Signal handlers | |
418 | // ------------------------------------------------------------------------ | |
419 | ||
063f0d27 AM |
420 | /** |
421 | * Handler for the start synch signal | |
422 | * | |
423 | * @param signal | |
424 | * Incoming signal | |
425 | */ | |
948b0607 FC |
426 | @TmfSignalHandler |
427 | public void startSynch(TmfStartSynchSignal signal) { | |
428 | synchronized (fLock) { | |
429 | fSignalDepth++; | |
430 | } | |
431 | } | |
432 | ||
063f0d27 AM |
433 | /** |
434 | * Handler for the end synch signal | |
435 | * | |
436 | * @param signal | |
437 | * Incoming signal | |
438 | */ | |
948b0607 FC |
439 | @TmfSignalHandler |
440 | public void endSynch(TmfEndSynchSignal signal) { | |
045df77d | 441 | synchronized (fLock) { |
948b0607 FC |
442 | fSignalDepth--; |
443 | if (fSignalDepth == 0) { | |
444 | fireRequest(); | |
445 | } | |
045df77d | 446 | } |
948b0607 | 447 | } |
8c8bf09f ASL |
448 | |
449 | } |