Commit | Line | Data |
---|---|---|
8c8bf09f | 1 | /******************************************************************************* |
61759503 | 2 | * Copyright (c) 2009, 2013 Ericsson |
0283f7ff | 3 | * |
8c8bf09f ASL |
4 | * All rights reserved. This program and the accompanying materials are |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
0283f7ff | 8 | * |
8c8bf09f | 9 | * Contributors: |
fd3f1eff AM |
10 | * Francois Chouinard - Initial API and implementation, replace background |
11 | * requests by preemptable requests | |
12 | * Alexandre Montplaisir - Merge with TmfDataProvider | |
8c8bf09f ASL |
13 | *******************************************************************************/ |
14 | ||
6c13869b | 15 | package org.eclipse.linuxtools.tmf.core.component; |
8c8bf09f | 16 | |
fd3f1eff AM |
17 | import java.util.ArrayList; |
18 | import java.util.List; | |
19 | import java.util.concurrent.BlockingQueue; | |
20 | import java.util.concurrent.LinkedBlockingQueue; | |
21 | import java.util.concurrent.SynchronousQueue; | |
22 | ||
5419a136 | 23 | import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer; |
fd3f1eff AM |
24 | import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread; |
25 | import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager; | |
5419a136 | 26 | import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedEventRequest; |
fd3f1eff | 27 | import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor; |
72f1e62a | 28 | import org.eclipse.linuxtools.tmf.core.event.ITmfEvent; |
5419a136 | 29 | import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest; |
fd3f1eff AM |
30 | import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest.ExecutionType; |
31 | import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal; | |
32 | import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler; | |
33 | import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal; | |
3bd46eef | 34 | import org.eclipse.linuxtools.tmf.core.timestamp.ITmfTimestamp; |
fd3f1eff | 35 | import org.eclipse.linuxtools.tmf.core.trace.ITmfContext; |
8c8bf09f ASL |
36 | |
37 | /** | |
fd3f1eff AM |
38 | * An abstract base class that implements ITmfEventProvider. |
39 | * <p> | |
40 | * This abstract class implements the housekeeping methods to register/ | |
41 | * de-register the event provider and to handle generically the event requests. | |
42 | * </p> | |
0283f7ff | 43 | * |
8fd82db5 | 44 | * @author Francois Chouinard |
c4767854 | 45 | * @since 3.0 |
8c8bf09f | 46 | */ |
fd3f1eff AM |
47 | public abstract class TmfEventProvider extends TmfComponent implements ITmfEventProvider { |
48 | ||
49 | // ------------------------------------------------------------------------ | |
50 | // Constants | |
51 | // ------------------------------------------------------------------------ | |
52 | ||
c4767854 AM |
53 | /** Default amount of events per request "chunk" |
54 | * @since 3.0 */ | |
fd3f1eff AM |
55 | public static final int DEFAULT_BLOCK_SIZE = 50000; |
56 | ||
c4767854 AM |
57 | /** Default size of the queue |
58 | * @since 3.0 */ | |
fd3f1eff AM |
59 | public static final int DEFAULT_QUEUE_SIZE = 1000; |
60 | ||
61 | // ------------------------------------------------------------------------ | |
62 | // Attributes | |
63 | // ------------------------------------------------------------------------ | |
64 | ||
c4767854 AM |
65 | /** List of coalesced requests |
66 | * @since 3.0*/ | |
a4524c1b | 67 | protected final List<TmfCoalescedEventRequest> fPendingCoalescedRequests = new ArrayList<>(); |
fd3f1eff | 68 | |
c4767854 AM |
69 | /** The type of event handled by this provider |
70 | * @since 3.0*/ | |
fd3f1eff AM |
71 | protected Class<? extends ITmfEvent> fType; |
72 | ||
c4767854 AM |
73 | /** Queue of events |
74 | * @since 3.0*/ | |
fd3f1eff AM |
75 | protected BlockingQueue<ITmfEvent> fDataQueue; |
76 | ||
c4767854 AM |
77 | /** Size of the fDataQueue |
78 | * @since 3.0*/ | |
fd3f1eff AM |
79 | protected int fQueueSize = DEFAULT_QUEUE_SIZE; |
80 | ||
81 | private final TmfRequestExecutor fExecutor; | |
82 | ||
83 | private final Object fLock = new Object(); | |
84 | ||
85 | private int fSignalDepth = 0; | |
86 | ||
87 | private int fRequestPendingCounter = 0; | |
8c8bf09f | 88 | |
00641a97 FC |
89 | // ------------------------------------------------------------------------ |
90 | // Constructors | |
91 | // ------------------------------------------------------------------------ | |
92 | ||
063f0d27 AM |
93 | /** |
94 | * Default constructor | |
95 | */ | |
12c155f5 | 96 | public TmfEventProvider() { |
00641a97 | 97 | super(); |
fd3f1eff | 98 | fQueueSize = DEFAULT_QUEUE_SIZE; |
a4524c1b | 99 | fDataQueue = new LinkedBlockingQueue<>(fQueueSize); |
fd3f1eff | 100 | fExecutor = new TmfRequestExecutor(); |
12c155f5 | 101 | } |
8c8bf09f | 102 | |
063f0d27 | 103 | /** |
fd3f1eff | 104 | * Initialize this data provider |
063f0d27 AM |
105 | * |
106 | * @param name | |
fd3f1eff | 107 | * Name of the provider |
063f0d27 | 108 | * @param type |
fd3f1eff | 109 | * The type of events that will be handled |
063f0d27 | 110 | */ |
fd3f1eff AM |
111 | public void init(String name, Class<? extends ITmfEvent> type) { |
112 | super.init(name); | |
113 | fType = type; | |
114 | fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<ITmfEvent>(fQueueSize) : new SynchronousQueue<ITmfEvent>(); | |
115 | ||
116 | fExecutor.init(); | |
117 | fSignalDepth = 0; | |
118 | ||
119 | TmfProviderManager.register(fType, this); | |
12c155f5 FC |
120 | } |
121 | ||
063f0d27 | 122 | /** |
fd3f1eff | 123 | * Constructor specifying the event type and the queue size. |
063f0d27 AM |
124 | * |
125 | * @param name | |
fd3f1eff | 126 | * Name of the provider |
063f0d27 | 127 | * @param type |
fd3f1eff | 128 | * Type of event that will be handled |
063f0d27 | 129 | * @param queueSize |
fd3f1eff | 130 | * Size of the event queue |
063f0d27 | 131 | */ |
fd3f1eff AM |
132 | protected TmfEventProvider(String name, Class<? extends ITmfEvent> type, int queueSize) { |
133 | this(); | |
134 | fQueueSize = queueSize; | |
135 | init(name, type); | |
12c155f5 FC |
136 | } |
137 | ||
063f0d27 AM |
138 | /** |
139 | * Copy constructor | |
140 | * | |
141 | * @param other | |
fd3f1eff | 142 | * The other object to copy |
063f0d27 | 143 | */ |
6256d8ad | 144 | public TmfEventProvider(TmfEventProvider other) { |
fd3f1eff AM |
145 | this(); |
146 | init(other.getName(), other.fType); | |
147 | } | |
148 | ||
149 | /** | |
150 | * Standard constructor. Instantiate and initialize at the same time. | |
151 | * | |
152 | * @param name | |
153 | * Name of the provider | |
154 | * @param type | |
155 | * The type of events that will be handled | |
156 | */ | |
157 | public TmfEventProvider(String name, Class<? extends ITmfEvent> type) { | |
158 | this(name, type, DEFAULT_QUEUE_SIZE); | |
159 | } | |
160 | ||
161 | @Override | |
162 | public void dispose() { | |
163 | TmfProviderManager.deregister(fType, this); | |
164 | fExecutor.stop(); | |
165 | super.dispose(); | |
12c155f5 FC |
166 | } |
167 | ||
00641a97 | 168 | // ------------------------------------------------------------------------ |
fd3f1eff AM |
169 | // Accessors |
170 | // ------------------------------------------------------------------------ | |
171 | ||
172 | /** | |
173 | * Get the queue size of this provider | |
174 | * | |
175 | * @return The size of the queue | |
176 | */ | |
177 | public int getQueueSize() { | |
178 | return fQueueSize; | |
179 | } | |
180 | ||
181 | /** | |
182 | * Get the event type this provider handles | |
183 | * | |
184 | * @return The type of ITmfEvent | |
185 | */ | |
0f89d4ba | 186 | public Class<? extends ITmfEvent> getType() { |
fd3f1eff AM |
187 | return fType; |
188 | } | |
189 | ||
190 | // ------------------------------------------------------------------------ | |
191 | // ITmfRequestHandler | |
00641a97 FC |
192 | // ------------------------------------------------------------------------ |
193 | ||
c4767854 AM |
194 | /** |
195 | * @since 3.0 | |
196 | */ | |
5419a136 | 197 | @Override |
fd3f1eff AM |
198 | public void sendRequest(final ITmfEventRequest request) { |
199 | synchronized (fLock) { | |
200 | if (fSignalDepth > 0) { | |
201 | coalesceEventRequest(request); | |
202 | } else { | |
203 | dispatchRequest(request); | |
204 | } | |
205 | } | |
206 | } | |
207 | ||
208 | @Override | |
209 | public void fireRequest() { | |
210 | synchronized (fLock) { | |
211 | if (fRequestPendingCounter > 0) { | |
212 | return; | |
213 | } | |
214 | if (fPendingCoalescedRequests.size() > 0) { | |
215 | for (ITmfEventRequest request : fPendingCoalescedRequests) { | |
216 | dispatchRequest(request); | |
217 | } | |
218 | fPendingCoalescedRequests.clear(); | |
219 | } | |
5419a136 | 220 | } |
5419a136 AM |
221 | } |
222 | ||
fd3f1eff AM |
223 | /** |
224 | * Increments/decrements the pending requests counters and fires the request | |
225 | * if necessary (counter == 0). Used for coalescing requests across multiple | |
226 | * TmfDataProvider's. | |
227 | * | |
228 | * @param isIncrement | |
229 | * Should we increment (true) or decrement (false) the pending | |
230 | * counter | |
231 | */ | |
5419a136 | 232 | @Override |
fd3f1eff AM |
233 | public void notifyPendingRequest(boolean isIncrement) { |
234 | synchronized (fLock) { | |
235 | if (isIncrement) { | |
236 | if (fSignalDepth > 0) { | |
237 | fRequestPendingCounter++; | |
238 | } | |
239 | } else { | |
240 | if (fRequestPendingCounter > 0) { | |
241 | fRequestPendingCounter--; | |
242 | } | |
243 | ||
244 | // fire request if all pending requests are received | |
245 | if (fRequestPendingCounter == 0) { | |
246 | fireRequest(); | |
247 | } | |
248 | } | |
249 | } | |
250 | } | |
251 | ||
252 | // ------------------------------------------------------------------------ | |
253 | // Coalescing | |
254 | // ------------------------------------------------------------------------ | |
255 | ||
256 | /** | |
257 | * Create a new request from an existing one, and add it to the coalesced | |
258 | * requests | |
259 | * | |
260 | * @param request | |
261 | * The request to copy | |
c4767854 | 262 | * @since 3.0 |
fd3f1eff AM |
263 | */ |
264 | protected synchronized void newCoalescedEventRequest(ITmfEventRequest request) { | |
672a642a | 265 | TmfCoalescedEventRequest coalescedRequest = new TmfCoalescedEventRequest( |
fd3f1eff AM |
266 | request.getDataType(), |
267 | request.getRange(), | |
268 | request.getIndex(), | |
269 | request.getNbRequested(), | |
270 | request.getExecType()); | |
271 | coalescedRequest.addRequest(request); | |
5419a136 AM |
272 | if (TmfCoreTracer.isRequestTraced()) { |
273 | TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$ | |
274 | TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$ | |
275 | } | |
276 | fPendingCoalescedRequests.add(coalescedRequest); | |
fd3f1eff AM |
277 | } |
278 | ||
279 | /** | |
280 | * Add an existing requests to the list of coalesced ones | |
281 | * | |
282 | * @param request | |
283 | * The request to add to the list | |
c4767854 | 284 | * @since 3.0 |
fd3f1eff AM |
285 | */ |
286 | protected void coalesceEventRequest(ITmfEventRequest request) { | |
287 | synchronized (fLock) { | |
288 | for (TmfCoalescedEventRequest coalescedRequest : fPendingCoalescedRequests) { | |
289 | if (coalescedRequest.isCompatible(request)) { | |
290 | coalescedRequest.addRequest(request); | |
291 | if (TmfCoreTracer.isRequestTraced()) { | |
292 | TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$ | |
293 | TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$ | |
294 | } | |
295 | return; | |
296 | } | |
297 | } | |
298 | newCoalescedEventRequest(request); | |
299 | } | |
300 | } | |
301 | ||
302 | // ------------------------------------------------------------------------ | |
303 | // Request processing | |
304 | // ------------------------------------------------------------------------ | |
305 | ||
306 | private void dispatchRequest(final ITmfEventRequest request) { | |
307 | if (request.getExecType() == ExecutionType.FOREGROUND) { | |
308 | queueRequest(request); | |
5419a136 | 309 | } else { |
fd3f1eff AM |
310 | queueBackgroundRequest(request, true); |
311 | } | |
312 | } | |
313 | ||
314 | /** | |
315 | * Queue a request. | |
316 | * | |
317 | * @param request | |
318 | * The data request | |
c4767854 | 319 | * @since 3.0 |
fd3f1eff AM |
320 | */ |
321 | protected void queueRequest(final ITmfEventRequest request) { | |
322 | ||
323 | if (fExecutor.isShutdown()) { | |
324 | request.cancel(); | |
325 | return; | |
326 | } | |
327 | ||
328 | TmfEventThread thread = new TmfEventThread(this, request); | |
329 | ||
330 | if (TmfCoreTracer.isRequestTraced()) { | |
331 | TmfCoreTracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$ | |
332 | } | |
333 | ||
334 | fExecutor.execute(thread); | |
335 | } | |
336 | ||
337 | /** | |
338 | * Queue a background request | |
339 | * | |
340 | * @param request | |
341 | * The request | |
342 | * @param indexing | |
343 | * Should we index the chunks | |
344 | * @since 3.0 | |
345 | */ | |
346 | protected void queueBackgroundRequest(final ITmfEventRequest request, final boolean indexing) { | |
347 | queueRequest(request); | |
348 | } | |
349 | ||
350 | /** | |
351 | * Initialize the provider based on the request. The context is provider | |
352 | * specific and will be updated by getNext(). | |
353 | * | |
354 | * @param request | |
355 | * The request | |
356 | * @return An application specific context; null if request can't be | |
357 | * serviced | |
c4767854 | 358 | * @since 3.0 |
fd3f1eff AM |
359 | */ |
360 | public abstract ITmfContext armRequest(ITmfEventRequest request); | |
361 | ||
362 | /** | |
363 | * Checks if the data meets the request completion criteria. | |
364 | * | |
365 | * @param request | |
366 | * The request | |
367 | * @param event | |
368 | * The data to verify | |
369 | * @param nbRead | |
370 | * The number of events read so far | |
371 | * @return true if completion criteria is met | |
c4767854 | 372 | * @since 3.0 |
fd3f1eff AM |
373 | */ |
374 | public boolean isCompleted(ITmfEventRequest request, ITmfEvent event, int nbRead) { | |
375 | boolean requestCompleted = isCompleted2(request, nbRead); | |
376 | if (!requestCompleted) { | |
377 | ITmfTimestamp endTime = request.getRange().getEndTime(); | |
378 | return event.getTimestamp().compareTo(endTime, false) > 0; | |
379 | } | |
380 | return requestCompleted; | |
381 | } | |
382 | ||
383 | private static boolean isCompleted2(ITmfEventRequest request,int nbRead) { | |
384 | return request.isCompleted() || nbRead >= request.getNbRequested(); | |
385 | } | |
386 | ||
387 | // ------------------------------------------------------------------------ | |
388 | // Pass-through's to the request executor | |
389 | // ------------------------------------------------------------------------ | |
390 | ||
391 | /** | |
392 | * @return the shutdown state (i.e. if it is accepting new requests) | |
393 | * @since 2.0 | |
394 | */ | |
395 | protected boolean executorIsShutdown() { | |
396 | return fExecutor.isShutdown(); | |
397 | } | |
398 | ||
399 | /** | |
400 | * @return the termination state | |
401 | * @since 2.0 | |
402 | */ | |
403 | protected boolean executorIsTerminated() { | |
404 | return fExecutor.isTerminated(); | |
405 | } | |
406 | ||
407 | // ------------------------------------------------------------------------ | |
408 | // Signal handlers | |
409 | // ------------------------------------------------------------------------ | |
410 | ||
411 | /** | |
412 | * Handler for the start synch signal | |
413 | * | |
414 | * @param signal | |
415 | * Incoming signal | |
416 | */ | |
417 | @TmfSignalHandler | |
418 | public void startSynch(TmfStartSynchSignal signal) { | |
419 | synchronized (fLock) { | |
420 | fSignalDepth++; | |
421 | } | |
422 | } | |
423 | ||
424 | /** | |
425 | * Handler for the end synch signal | |
426 | * | |
427 | * @param signal | |
428 | * Incoming signal | |
429 | */ | |
430 | @TmfSignalHandler | |
431 | public void endSynch(TmfEndSynchSignal signal) { | |
432 | synchronized (fLock) { | |
433 | fSignalDepth--; | |
434 | if (fSignalDepth == 0) { | |
435 | fireRequest(); | |
436 | } | |
5419a136 AM |
437 | } |
438 | } | |
951d134a | 439 | |
8c8bf09f | 440 | } |