Commit | Line | Data |
---|---|---|
8c8bf09f | 1 | /******************************************************************************* |
61759503 | 2 | * Copyright (c) 2009, 2013 Ericsson |
0283f7ff | 3 | * |
8c8bf09f ASL |
4 | * All rights reserved. This program and the accompanying materials are |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
0283f7ff | 8 | * |
8c8bf09f | 9 | * Contributors: |
fd3f1eff AM |
10 | * Francois Chouinard - Initial API and implementation, replace background |
11 | * requests by preemptable requests | |
12 | * Alexandre Montplaisir - Merge with TmfDataProvider | |
8c8bf09f ASL |
13 | *******************************************************************************/ |
14 | ||
6c13869b | 15 | package org.eclipse.linuxtools.tmf.core.component; |
8c8bf09f | 16 | |
fd3f1eff AM |
17 | import java.util.ArrayList; |
18 | import java.util.List; | |
19 | import java.util.concurrent.BlockingQueue; | |
20 | import java.util.concurrent.LinkedBlockingQueue; | |
21 | import java.util.concurrent.SynchronousQueue; | |
22 | ||
5419a136 | 23 | import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer; |
fd3f1eff AM |
24 | import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread; |
25 | import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager; | |
5419a136 | 26 | import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedEventRequest; |
fd3f1eff | 27 | import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor; |
72f1e62a | 28 | import org.eclipse.linuxtools.tmf.core.event.ITmfEvent; |
5419a136 | 29 | import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest; |
fd3f1eff AM |
30 | import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest.ExecutionType; |
31 | import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal; | |
32 | import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler; | |
33 | import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal; | |
3bd46eef | 34 | import org.eclipse.linuxtools.tmf.core.timestamp.ITmfTimestamp; |
fd3f1eff | 35 | import org.eclipse.linuxtools.tmf.core.trace.ITmfContext; |
8c8bf09f ASL |
36 | |
37 | /** | |
fd3f1eff AM |
38 | * An abstract base class that implements ITmfEventProvider. |
39 | * <p> | |
40 | * This abstract class implements the housekeeping methods to register/ | |
41 | * de-register the event provider and to handle generically the event requests. | |
42 | * </p> | |
0283f7ff | 43 | * |
8fd82db5 | 44 | * @author Francois Chouinard |
8c8bf09f | 45 | */ |
fd3f1eff AM |
46 | public abstract class TmfEventProvider extends TmfComponent implements ITmfEventProvider { |
47 | ||
48 | // ------------------------------------------------------------------------ | |
49 | // Constants | |
50 | // ------------------------------------------------------------------------ | |
51 | ||
52 | /** Default amount of events per request "chunk" */ | |
53 | public static final int DEFAULT_BLOCK_SIZE = 50000; | |
54 | ||
55 | /** Default size of the queue */ | |
56 | public static final int DEFAULT_QUEUE_SIZE = 1000; | |
57 | ||
58 | // ------------------------------------------------------------------------ | |
59 | // Attributes | |
60 | // ------------------------------------------------------------------------ | |
61 | ||
62 | /** List of coalesced requests */ | |
63 | protected final List<TmfCoalescedEventRequest> fPendingCoalescedRequests = | |
64 | new ArrayList<TmfCoalescedEventRequest>(); | |
65 | ||
66 | /** The type of event handled by this provider */ | |
67 | protected Class<? extends ITmfEvent> fType; | |
68 | ||
69 | /** Queue of events */ | |
70 | protected BlockingQueue<ITmfEvent> fDataQueue; | |
71 | ||
72 | /** Size of the fDataQueue */ | |
73 | protected int fQueueSize = DEFAULT_QUEUE_SIZE; | |
74 | ||
75 | private final TmfRequestExecutor fExecutor; | |
76 | ||
77 | private final Object fLock = new Object(); | |
78 | ||
79 | private int fSignalDepth = 0; | |
80 | ||
81 | private int fRequestPendingCounter = 0; | |
8c8bf09f | 82 | |
00641a97 FC |
83 | // ------------------------------------------------------------------------ |
84 | // Constructors | |
85 | // ------------------------------------------------------------------------ | |
86 | ||
063f0d27 AM |
/**
 * Default constructor. Builds a provider with the default queue size and a
 * fresh request executor; init() must still be called to set the name and
 * the event type.
 */
public TmfEventProvider() {
    super();
    fQueueSize = DEFAULT_QUEUE_SIZE;
    // Same capacity as fQueueSize, which was just set to the default
    fDataQueue = new LinkedBlockingQueue<ITmfEvent>(DEFAULT_QUEUE_SIZE);
    fExecutor = new TmfRequestExecutor();
}
8c8bf09f | 96 | |
063f0d27 | 97 | /** |
fd3f1eff | 98 | * Initialize this data provider |
063f0d27 AM |
99 | * |
100 | * @param name | |
fd3f1eff | 101 | * Name of the provider |
063f0d27 | 102 | * @param type |
fd3f1eff | 103 | * The type of events that will be handled |
063f0d27 | 104 | */ |
fd3f1eff AM |
105 | public void init(String name, Class<? extends ITmfEvent> type) { |
106 | super.init(name); | |
107 | fType = type; | |
108 | fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<ITmfEvent>(fQueueSize) : new SynchronousQueue<ITmfEvent>(); | |
109 | ||
110 | fExecutor.init(); | |
111 | fSignalDepth = 0; | |
112 | ||
113 | TmfProviderManager.register(fType, this); | |
12c155f5 FC |
114 | } |
115 | ||
/**
 * Constructor taking the event type and an explicit queue size. The queue
 * size is recorded before init() runs so that init() builds the queue with
 * the requested capacity.
 *
 * @param name
 *            Name of the provider
 * @param type
 *            Type of event that will be handled
 * @param queueSize
 *            Size of the event queue
 */
protected TmfEventProvider(String name, Class<? extends ITmfEvent> type, int queueSize) {
    this();
    // Must precede init(): init() reads fQueueSize to build fDataQueue
    fQueueSize = queueSize;
    init(name, type);
}
131 | ||
063f0d27 AM |
/**
 * Copy constructor. The new provider takes the name, event type and queue
 * size of the other one; it gets its own queue and its own request
 * executor, and pending requests are not copied.
 *
 * @param other
 *            The other object to copy (must not be null)
 */
public TmfEventProvider(TmfEventProvider other) {
    this();
    // Carry over the configured capacity before init() builds the queue;
    // otherwise the copy would silently fall back to DEFAULT_QUEUE_SIZE.
    fQueueSize = other.fQueueSize;
    init(other.getName(), other.fType);
}
142 | ||
/**
 * Standard constructor: instantiates and initializes the provider in one
 * step, using the default queue size.
 *
 * @param name
 *            Name of the provider
 * @param type
 *            The type of events that will be handled
 */
public TmfEventProvider(String name, Class<? extends ITmfEvent> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
154 | ||
155 | @Override | |
156 | public void dispose() { | |
157 | TmfProviderManager.deregister(fType, this); | |
158 | fExecutor.stop(); | |
159 | super.dispose(); | |
12c155f5 FC |
160 | } |
161 | ||
00641a97 | 162 | // ------------------------------------------------------------------------ |
fd3f1eff AM |
163 | // Accessors |
164 | // ------------------------------------------------------------------------ | |
165 | ||
166 | /** | |
167 | * Get the queue size of this provider | |
168 | * | |
169 | * @return The size of the queue | |
170 | */ | |
171 | public int getQueueSize() { | |
172 | return fQueueSize; | |
173 | } | |
174 | ||
175 | /** | |
176 | * Get the event type this provider handles | |
177 | * | |
178 | * @return The type of ITmfEvent | |
179 | */ | |
180 | public Class<?> getType() { | |
181 | return fType; | |
182 | } | |
183 | ||
184 | // ------------------------------------------------------------------------ | |
185 | // ITmfRequestHandler | |
00641a97 FC |
186 | // ------------------------------------------------------------------------ |
187 | ||
5419a136 | 188 | @Override |
fd3f1eff AM |
189 | public void sendRequest(final ITmfEventRequest request) { |
190 | synchronized (fLock) { | |
191 | if (fSignalDepth > 0) { | |
192 | coalesceEventRequest(request); | |
193 | } else { | |
194 | dispatchRequest(request); | |
195 | } | |
196 | } | |
197 | } | |
198 | ||
199 | @Override | |
200 | public void fireRequest() { | |
201 | synchronized (fLock) { | |
202 | if (fRequestPendingCounter > 0) { | |
203 | return; | |
204 | } | |
205 | if (fPendingCoalescedRequests.size() > 0) { | |
206 | for (ITmfEventRequest request : fPendingCoalescedRequests) { | |
207 | dispatchRequest(request); | |
208 | } | |
209 | fPendingCoalescedRequests.clear(); | |
210 | } | |
5419a136 | 211 | } |
5419a136 AM |
212 | } |
213 | ||
fd3f1eff AM |
214 | /** |
215 | * Increments/decrements the pending requests counters and fires the request | |
216 | * if necessary (counter == 0). Used for coalescing requests across multiple | |
217 | * TmfDataProvider's. | |
218 | * | |
219 | * @param isIncrement | |
220 | * Should we increment (true) or decrement (false) the pending | |
221 | * counter | |
222 | */ | |
5419a136 | 223 | @Override |
fd3f1eff AM |
224 | public void notifyPendingRequest(boolean isIncrement) { |
225 | synchronized (fLock) { | |
226 | if (isIncrement) { | |
227 | if (fSignalDepth > 0) { | |
228 | fRequestPendingCounter++; | |
229 | } | |
230 | } else { | |
231 | if (fRequestPendingCounter > 0) { | |
232 | fRequestPendingCounter--; | |
233 | } | |
234 | ||
235 | // fire request if all pending requests are received | |
236 | if (fRequestPendingCounter == 0) { | |
237 | fireRequest(); | |
238 | } | |
239 | } | |
240 | } | |
241 | } | |
242 | ||
243 | // ------------------------------------------------------------------------ | |
244 | // Coalescing | |
245 | // ------------------------------------------------------------------------ | |
246 | ||
247 | /** | |
248 | * Create a new request from an existing one, and add it to the coalesced | |
249 | * requests | |
250 | * | |
251 | * @param request | |
252 | * The request to copy | |
253 | */ | |
254 | protected synchronized void newCoalescedEventRequest(ITmfEventRequest request) { | |
672a642a | 255 | TmfCoalescedEventRequest coalescedRequest = new TmfCoalescedEventRequest( |
fd3f1eff AM |
256 | request.getDataType(), |
257 | request.getRange(), | |
258 | request.getIndex(), | |
259 | request.getNbRequested(), | |
260 | request.getExecType()); | |
261 | coalescedRequest.addRequest(request); | |
5419a136 AM |
262 | if (TmfCoreTracer.isRequestTraced()) { |
263 | TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$ | |
264 | TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$ | |
265 | } | |
266 | fPendingCoalescedRequests.add(coalescedRequest); | |
fd3f1eff AM |
267 | } |
268 | ||
269 | /** | |
270 | * Add an existing requests to the list of coalesced ones | |
271 | * | |
272 | * @param request | |
273 | * The request to add to the list | |
274 | */ | |
275 | protected void coalesceEventRequest(ITmfEventRequest request) { | |
276 | synchronized (fLock) { | |
277 | for (TmfCoalescedEventRequest coalescedRequest : fPendingCoalescedRequests) { | |
278 | if (coalescedRequest.isCompatible(request)) { | |
279 | coalescedRequest.addRequest(request); | |
280 | if (TmfCoreTracer.isRequestTraced()) { | |
281 | TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$ | |
282 | TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$ | |
283 | } | |
284 | return; | |
285 | } | |
286 | } | |
287 | newCoalescedEventRequest(request); | |
288 | } | |
289 | } | |
290 | ||
291 | // ------------------------------------------------------------------------ | |
292 | // Request processing | |
293 | // ------------------------------------------------------------------------ | |
294 | ||
295 | private void dispatchRequest(final ITmfEventRequest request) { | |
296 | if (request.getExecType() == ExecutionType.FOREGROUND) { | |
297 | queueRequest(request); | |
5419a136 | 298 | } else { |
fd3f1eff AM |
299 | queueBackgroundRequest(request, true); |
300 | } | |
301 | } | |
302 | ||
303 | /** | |
304 | * Queue a request. | |
305 | * | |
306 | * @param request | |
307 | * The data request | |
308 | */ | |
309 | protected void queueRequest(final ITmfEventRequest request) { | |
310 | ||
311 | if (fExecutor.isShutdown()) { | |
312 | request.cancel(); | |
313 | return; | |
314 | } | |
315 | ||
316 | TmfEventThread thread = new TmfEventThread(this, request); | |
317 | ||
318 | if (TmfCoreTracer.isRequestTraced()) { | |
319 | TmfCoreTracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$ | |
320 | } | |
321 | ||
322 | fExecutor.execute(thread); | |
323 | } | |
324 | ||
325 | /** | |
326 | * Queue a background request | |
327 | * | |
328 | * @param request | |
329 | * The request | |
330 | * @param indexing | |
331 | * Should we index the chunks | |
332 | * @since 3.0 | |
333 | */ | |
334 | protected void queueBackgroundRequest(final ITmfEventRequest request, final boolean indexing) { | |
335 | queueRequest(request); | |
336 | } | |
337 | ||
338 | /** | |
339 | * Initialize the provider based on the request. The context is provider | |
340 | * specific and will be updated by getNext(). | |
341 | * | |
342 | * @param request | |
343 | * The request | |
344 | * @return An application specific context; null if request can't be | |
345 | * serviced | |
346 | * @since 2.0 | |
347 | */ | |
348 | public abstract ITmfContext armRequest(ITmfEventRequest request); | |
349 | ||
350 | /** | |
351 | * Checks if the data meets the request completion criteria. | |
352 | * | |
353 | * @param request | |
354 | * The request | |
355 | * @param event | |
356 | * The data to verify | |
357 | * @param nbRead | |
358 | * The number of events read so far | |
359 | * @return true if completion criteria is met | |
360 | */ | |
361 | public boolean isCompleted(ITmfEventRequest request, ITmfEvent event, int nbRead) { | |
362 | boolean requestCompleted = isCompleted2(request, nbRead); | |
363 | if (!requestCompleted) { | |
364 | ITmfTimestamp endTime = request.getRange().getEndTime(); | |
365 | return event.getTimestamp().compareTo(endTime, false) > 0; | |
366 | } | |
367 | return requestCompleted; | |
368 | } | |
369 | ||
370 | private static boolean isCompleted2(ITmfEventRequest request,int nbRead) { | |
371 | return request.isCompleted() || nbRead >= request.getNbRequested(); | |
372 | } | |
373 | ||
374 | // ------------------------------------------------------------------------ | |
375 | // Pass-through's to the request executor | |
376 | // ------------------------------------------------------------------------ | |
377 | ||
378 | /** | |
379 | * @return the shutdown state (i.e. if it is accepting new requests) | |
380 | * @since 2.0 | |
381 | */ | |
382 | protected boolean executorIsShutdown() { | |
383 | return fExecutor.isShutdown(); | |
384 | } | |
385 | ||
386 | /** | |
387 | * @return the termination state | |
388 | * @since 2.0 | |
389 | */ | |
390 | protected boolean executorIsTerminated() { | |
391 | return fExecutor.isTerminated(); | |
392 | } | |
393 | ||
394 | // ------------------------------------------------------------------------ | |
395 | // Signal handlers | |
396 | // ------------------------------------------------------------------------ | |
397 | ||
398 | /** | |
399 | * Handler for the start synch signal | |
400 | * | |
401 | * @param signal | |
402 | * Incoming signal | |
403 | */ | |
404 | @TmfSignalHandler | |
405 | public void startSynch(TmfStartSynchSignal signal) { | |
406 | synchronized (fLock) { | |
407 | fSignalDepth++; | |
408 | } | |
409 | } | |
410 | ||
411 | /** | |
412 | * Handler for the end synch signal | |
413 | * | |
414 | * @param signal | |
415 | * Incoming signal | |
416 | */ | |
417 | @TmfSignalHandler | |
418 | public void endSynch(TmfEndSynchSignal signal) { | |
419 | synchronized (fLock) { | |
420 | fSignalDepth--; | |
421 | if (fSignalDepth == 0) { | |
422 | fireRequest(); | |
423 | } | |
5419a136 AM |
424 | } |
425 | } | |
951d134a | 426 | |
8c8bf09f | 427 | } |