Commit | Line | Data |
---|---|---|
8c8bf09f ASL |
1 | /******************************************************************************* |
2 | * Copyright (c) 2009, 2010 Ericsson | |
3 | * | |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Francois Chouinard - Initial API and implementation | |
11 | *******************************************************************************/ | |
12 | ||
6c13869b | 13 | package org.eclipse.linuxtools.tmf.core.component; |
8c8bf09f | 14 | |
8c8bf09f ASL |
15 | import java.util.Vector; |
16 | import java.util.concurrent.BlockingQueue; | |
17 | import java.util.concurrent.LinkedBlockingQueue; | |
9b635e61 | 18 | import java.util.concurrent.SynchronousQueue; |
8c8bf09f | 19 | |
4918b8f2 | 20 | import org.eclipse.linuxtools.internal.tmf.core.Tracer; |
8fd82db5 FC |
21 | import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager; |
22 | import org.eclipse.linuxtools.internal.tmf.core.component.TmfThread; | |
23 | import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedDataRequest; | |
24 | import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor; | |
72f1e62a | 25 | import org.eclipse.linuxtools.tmf.core.event.ITmfEvent; |
6c13869b | 26 | import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest; |
4c564a2d | 27 | import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType; |
6c13869b | 28 | import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest; |
6c13869b FC |
29 | import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal; |
30 | import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler; | |
31 | import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal; | |
32 | import org.eclipse.linuxtools.tmf.core.trace.ITmfContext; | |
8c8bf09f ASL |
33 | |
34 | /** | |
8fd82db5 | 35 | * An abstract base class that implements ITmfDataProvider. |
8c8bf09f | 36 | * <p> |
8fd82db5 FC |
37 | * This abstract class implements the housekeeping methods to register/ |
38 | * de-register the event provider and to handle generically the event requests. | |
8c8bf09f | 39 | * <p> |
8fd82db5 FC |
40 | * The concrete class can either re-implement processRequest() entirely or just |
41 | * implement the hooks (initializeContext() and getNext()). | |
8c8bf09f ASL |
42 | * <p> |
43 | * TODO: Add support for providing multiple data types. | |
8fd82db5 FC |
44 | * |
45 | * @version 1.0 | |
46 | * @author Francois Chouinard | |
8c8bf09f | 47 | */ |
72f1e62a | 48 | public abstract class TmfDataProvider<T extends ITmfEvent> extends TmfComponent implements ITmfDataProvider<T> { |
8c8bf09f | 49 | |
948b0607 FC |
50 | // ------------------------------------------------------------------------ |
51 | // Constants | |
52 | // ------------------------------------------------------------------------ | |
550d787e | 53 | |
00641a97 FC |
54 | public static final int DEFAULT_BLOCK_SIZE = 50000; |
55 | public static final int DEFAULT_QUEUE_SIZE = 1000; | |
56 | ||
948b0607 | 57 | // ------------------------------------------------------------------------ |
12c155f5 | 58 | // Attributes |
948b0607 | 59 | // ------------------------------------------------------------------------ |
8c8bf09f | 60 | |
12c155f5 FC |
61 | protected Class<T> fType; |
62 | protected boolean fLogData; | |
63 | protected boolean fLogError; | |
f6b14ce2 | 64 | |
00641a97 | 65 | protected int fQueueSize = DEFAULT_QUEUE_SIZE; |
12c155f5 FC |
66 | protected BlockingQueue<T> fDataQueue; |
67 | protected TmfRequestExecutor fExecutor; | |
948b0607 FC |
68 | |
69 | private int fSignalDepth = 0; | |
045df77d | 70 | private final Object fLock = new Object(); |
951d134a | 71 | |
c1c69938 FC |
72 | private int fRequestPendingCounter = 0; |
73 | ||
948b0607 FC |
74 | // ------------------------------------------------------------------------ |
75 | // Constructors | |
76 | // ------------------------------------------------------------------------ | |
77 | ||
/**
 * Default constructor: sets up a default-capacity data queue and a fresh
 * request executor. The provider is not registered until init() is called.
 */
public TmfDataProvider() {
    super();
    this.fQueueSize = DEFAULT_QUEUE_SIZE;
    this.fExecutor = new TmfRequestExecutor();
    this.fDataQueue = new LinkedBlockingQueue<T>(this.fQueueSize);
}
84 | ||
3791b5df | 85 | public void init(String name, Class<T> type) { |
12c155f5 | 86 | super.init(name); |
3791b5df | 87 | fType = type; |
12c155f5 FC |
88 | fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>(); |
89 | ||
90 | fExecutor = new TmfRequestExecutor(); | |
91 | fSignalDepth = 0; | |
92 | ||
93 | fLogData = Tracer.isEventTraced(); | |
94 | fLogError = Tracer.isErrorTraced(); | |
95 | ||
96 | TmfProviderManager.register(fType, this); | |
97 | } | |
98 | ||
/**
 * Full constructor.
 *
 * @param name the component name
 * @param type the data type served by this provider
 * @param queueSize the data queue capacity (applied before init() builds the queue)
 */
protected TmfDataProvider(String name, Class<T> type, int queueSize) {
    this();
    this.fQueueSize = queueSize;
    this.init(name, type);
}
104 | ||
/**
 * Copy constructor. Creates a new provider with the same name, data type
 * and queue size as the other one; the new provider gets its own data
 * queue and executor.
 *
 * @param other the provider to copy
 */
public TmfDataProvider(TmfDataProvider<T> other) {
    this();
    // Carry over the queue size so that init() builds a queue of the same
    // capacity as the original instead of silently reverting to the default
    fQueueSize = other.fQueueSize;
    init(other.getName(), other.fType);
}
550d787e | 109 | |
00641a97 FC |
/**
 * Convenience constructor that uses the default queue size.
 *
 * @param name the component name
 * @param type the data type served by this provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
113 | ||
114 | @Override | |
115 | public void dispose() { | |
116 | TmfProviderManager.deregister(fType, this); | |
117 | fExecutor.stop(); | |
118 | super.dispose(); | |
12c155f5 | 119 | // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped"); |
948b0607 FC |
120 | } |
121 | ||
00641a97 FC |
122 | // ------------------------------------------------------------------------ |
123 | // Accessors | |
124 | // ------------------------------------------------------------------------ | |
125 | ||
948b0607 FC |
126 | public int getQueueSize() { |
127 | return fQueueSize; | |
128 | } | |
129 | ||
130 | public Class<?> getType() { | |
131 | return fType; | |
132 | } | |
133 | ||
134 | // ------------------------------------------------------------------------ | |
135 | // ITmfRequestHandler | |
136 | // ------------------------------------------------------------------------ | |
137 | ||
138 | @Override | |
139 | public void sendRequest(final ITmfDataRequest<T> request) { | |
140 | synchronized (fLock) { | |
141 | if (fSignalDepth > 0) { | |
142 | coalesceDataRequest(request); | |
143 | } else { | |
144 | dispatchRequest(request); | |
145 | } | |
146 | } | |
147 | } | |
148 | ||
149 | /** | |
150 | * This method queues the coalesced requests. | |
948b0607 FC |
151 | */ |
152 | @Override | |
c1c69938 | 153 | public void fireRequest() { |
948b0607 FC |
154 | synchronized (fLock) { |
155 | if (fRequestPendingCounter > 0) { | |
156 | return; | |
157 | } | |
158 | if (fPendingCoalescedRequests.size() > 0) { | |
159 | for (TmfDataRequest<T> request : fPendingCoalescedRequests) { | |
160 | dispatchRequest(request); | |
161 | } | |
162 | fPendingCoalescedRequests.clear(); | |
163 | } | |
164 | } | |
165 | } | |
166 | ||
167 | /** | |
12c155f5 FC |
168 | * Increments/decrements the pending requests counters and fires the request if necessary (counter == 0). Used for |
169 | * coalescing requests accross multiple TmfDataProvider. | |
948b0607 FC |
170 | * |
171 | * @param isIncrement | |
172 | */ | |
c1c69938 FC |
173 | @Override |
174 | public void notifyPendingRequest(boolean isIncrement) { | |
948b0607 | 175 | synchronized (fLock) { |
c1c69938 FC |
176 | if (isIncrement) { |
177 | if (fSignalDepth > 0) { | |
178 | fRequestPendingCounter++; | |
179 | } | |
180 | } else { | |
181 | if (fRequestPendingCounter > 0) { | |
182 | fRequestPendingCounter--; | |
183 | } | |
948b0607 | 184 | |
c1c69938 FC |
185 | // fire request if all pending requests are received |
186 | if (fRequestPendingCounter == 0) { | |
187 | fireRequest(); | |
188 | } | |
189 | } | |
190 | } | |
948b0607 FC |
191 | } |
192 | ||
193 | // ------------------------------------------------------------------------ | |
194 | // Coalescing (primitive test...) | |
195 | // ------------------------------------------------------------------------ | |
196 | ||
197 | protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>(); | |
198 | ||
199 | protected void newCoalescedDataRequest(ITmfDataRequest<T> request) { | |
200 | synchronized (fLock) { | |
1b70b6dc | 201 | TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(request.getDataType(), request.getIndex(), |
12c155f5 | 202 | request.getNbRequested(), request.getBlockSize(), request.getExecType()); |
948b0607 FC |
203 | coalescedRequest.addRequest(request); |
204 | if (Tracer.isRequestTraced()) { | |
90891c08 FC |
205 | Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$ |
206 | Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$ | |
948b0607 FC |
207 | } |
208 | fPendingCoalescedRequests.add(coalescedRequest); | |
209 | } | |
210 | } | |
211 | ||
212 | protected void coalesceDataRequest(ITmfDataRequest<T> request) { | |
213 | synchronized (fLock) { | |
214 | for (TmfCoalescedDataRequest<T> coalescedRequest : fPendingCoalescedRequests) { | |
215 | if (coalescedRequest.isCompatible(request)) { | |
216 | coalescedRequest.addRequest(request); | |
217 | if (Tracer.isRequestTraced()) { | |
90891c08 FC |
218 | Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$ |
219 | Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$ | |
948b0607 FC |
220 | } |
221 | return; | |
222 | } | |
223 | } | |
224 | newCoalescedDataRequest(request); | |
225 | } | |
226 | } | |
227 | ||
228 | // ------------------------------------------------------------------------ | |
229 | // Request processing | |
230 | // ------------------------------------------------------------------------ | |
231 | ||
232 | private void dispatchRequest(final ITmfDataRequest<T> request) { | |
233 | if (request.getExecType() == ExecutionType.FOREGROUND) | |
234 | queueRequest(request); | |
235 | else | |
236 | queueBackgroundRequest(request, request.getBlockSize(), true); | |
237 | } | |
238 | ||
239 | protected void queueRequest(final ITmfDataRequest<T> request) { | |
240 | ||
241 | if (fExecutor.isShutdown()) { | |
242 | request.cancel(); | |
243 | return; | |
244 | } | |
245 | ||
246 | final TmfDataProvider<T> provider = this; | |
247 | ||
248 | // Process the request | |
249 | TmfThread thread = new TmfThread(request.getExecType()) { | |
475743b7 | 250 | |
948b0607 FC |
251 | @Override |
252 | public void run() { | |
253 | ||
4cf201de | 254 | if (Tracer.isRequestTraced()) { |
90891c08 | 255 | Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$ |
4cf201de | 256 | } |
948b0607 FC |
257 | |
258 | // Extract the generic information | |
259 | request.start(); | |
260 | int nbRequested = request.getNbRequested(); | |
261 | int nbRead = 0; | |
262 | ||
263 | // Initialize the execution | |
264 | ITmfContext context = armRequest(request); | |
265 | if (context == null) { | |
266 | request.cancel(); | |
267 | return; | |
268 | } | |
269 | ||
270 | try { | |
271 | // Get the ordered events | |
272 | T data = getNext(context); | |
273 | if (Tracer.isRequestTraced()) | |
90891c08 | 274 | Tracer.traceRequest(request, "read first event"); //$NON-NLS-1$ |
948b0607 | 275 | while (data != null && !isCompleted(request, data, nbRead)) { |
408e65d2 FC |
276 | if (fLogData) { |
277 | Tracer.traceEvent(provider, request, data); | |
278 | } | |
1b70b6dc PT |
279 | if (request.getDataType().isInstance(data)) { |
280 | request.handleData(data); | |
281 | } | |
948b0607 FC |
282 | |
283 | // To avoid an unnecessary read passed the last data | |
284 | // requested | |
285 | if (++nbRead < nbRequested) { | |
286 | data = getNext(context); | |
287 | } | |
288 | } | |
289 | if (Tracer.isRequestTraced()) | |
90891c08 | 290 | Tracer.traceRequest(request, "COMPLETED"); //$NON-NLS-1$ |
948b0607 FC |
291 | |
292 | if (request.isCancelled()) { | |
293 | request.cancel(); | |
294 | } else { | |
295 | request.done(); | |
296 | } | |
297 | } catch (Exception e) { | |
298 | request.fail(); | |
299 | } | |
300 | ||
301 | // Cleanup | |
302 | context.dispose(); | |
303 | } | |
475743b7 FC |
304 | |
305 | @Override | |
306 | public void cancel() { | |
8a0edc79 FC |
307 | if (!request.isCompleted()) { |
308 | request.cancel(); | |
475743b7 FC |
309 | } |
310 | } | |
948b0607 FC |
311 | }; |
312 | ||
313 | if (Tracer.isRequestTraced()) | |
90891c08 | 314 | Tracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$ |
948b0607 FC |
315 | fExecutor.execute(thread); |
316 | ||
317 | } | |
318 | ||
319 | protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean indexing) { | |
320 | ||
4cf201de FC |
321 | final TmfDataProvider<T> provider = this; |
322 | ||
948b0607 FC |
323 | Thread thread = new Thread() { |
324 | @Override | |
325 | public void run() { | |
4cf201de FC |
326 | |
327 | if (Tracer.isRequestTraced()) { | |
328 | Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$ | |
329 | } | |
330 | ||
948b0607 FC |
331 | request.start(); |
332 | ||
333 | final Integer[] CHUNK_SIZE = new Integer[1]; | |
334 | CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0)); | |
335 | ||
336 | final Integer[] nbRead = new Integer[1]; | |
337 | nbRead[0] = 0; | |
338 | ||
339 | final Boolean[] isFinished = new Boolean[1]; | |
340 | isFinished[0] = Boolean.FALSE; | |
341 | ||
342 | while (!isFinished[0]) { | |
343 | ||
12c155f5 FC |
344 | TmfDataRequest<T> subRequest = new TmfDataRequest<T>(request.getDataType(), request.getIndex() |
345 | + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) { | |
11a2fdf0 PT |
346 | |
347 | @Override | |
348 | public synchronized boolean isCompleted() { | |
349 | return super.isCompleted() || request.isCompleted(); | |
350 | } | |
351 | ||
948b0607 FC |
352 | @Override |
353 | public void handleData(T data) { | |
354 | super.handleData(data); | |
1b70b6dc PT |
355 | if (request.getDataType().isInstance(data)) { |
356 | request.handleData(data); | |
357 | } | |
948b0607 FC |
358 | if (getNbRead() > CHUNK_SIZE[0]) { |
359 | System.out.println("ERROR - Read too many events"); //$NON-NLS-1$ | |
360 | } | |
361 | } | |
362 | ||
363 | @Override | |
364 | public void handleCompleted() { | |
365 | nbRead[0] += getNbRead(); | |
366 | if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) { | |
90de83da | 367 | if (this.isCancelled()) { |
948b0607 | 368 | request.cancel(); |
90de83da | 369 | } else if (this.isFailed()) { |
12c155f5 | 370 | request.fail(); |
948b0607 FC |
371 | } else { |
372 | request.done(); | |
373 | } | |
374 | isFinished[0] = Boolean.TRUE; | |
375 | } | |
376 | super.handleCompleted(); | |
377 | } | |
378 | }; | |
379 | ||
380 | if (!isFinished[0]) { | |
381 | queueRequest(subRequest); | |
382 | ||
383 | try { | |
384 | subRequest.waitForCompletion(); | |
11a2fdf0 PT |
385 | if (request.isCompleted()) { |
386 | isFinished[0] = Boolean.TRUE; | |
387 | } | |
948b0607 FC |
388 | } catch (InterruptedException e) { |
389 | e.printStackTrace(); | |
390 | } | |
391 | ||
392 | CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize); | |
393 | } | |
394 | } | |
395 | } | |
396 | }; | |
397 | ||
398 | thread.start(); | |
399 | } | |
400 | ||
401 | /** | |
d337369a FC |
402 | * Initialize the provider based on the request. The context is provider |
403 | * specific and will be updated by getNext(). | |
948b0607 FC |
404 | * |
405 | * @param request | |
12c155f5 | 406 | * @return an application specific context; null if request can't be serviced |
948b0607 | 407 | */ |
9e0640dc | 408 | protected abstract ITmfContext armRequest(ITmfDataRequest<T> request); |
948b0607 | 409 | |
c32744d6 FC |
410 | // /** |
411 | // * Return the next event based on the context supplied. The context | |
412 | // * will be updated for the subsequent read. | |
413 | // * | |
414 | // * @param context the trace read context (updated) | |
415 | // * @return the event referred to by context | |
416 | // */ | |
417 | // public abstract T getNext(ITmfContext context); | |
948b0607 FC |
418 | |
419 | /** | |
420 | * Checks if the data meets the request completion criteria. | |
421 | * | |
0d9a6d76 FC |
422 | * @param request the request |
423 | * @param data the data to verify | |
424 | * @param nbRead the number of events read so far | |
425 | * @return true if completion criteria is met | |
948b0607 FC |
426 | */ |
427 | public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) { | |
b6be1c3e | 428 | return request.isCompleted() || nbRead >= request.getNbRequested(); |
948b0607 FC |
429 | } |
430 | ||
431 | // ------------------------------------------------------------------------ | |
432 | // Signal handlers | |
433 | // ------------------------------------------------------------------------ | |
434 | ||
435 | @TmfSignalHandler | |
436 | public void startSynch(TmfStartSynchSignal signal) { | |
437 | synchronized (fLock) { | |
438 | fSignalDepth++; | |
439 | } | |
440 | } | |
441 | ||
442 | @TmfSignalHandler | |
443 | public void endSynch(TmfEndSynchSignal signal) { | |
045df77d | 444 | synchronized (fLock) { |
948b0607 FC |
445 | fSignalDepth--; |
446 | if (fSignalDepth == 0) { | |
447 | fireRequest(); | |
448 | } | |
045df77d | 449 | } |
948b0607 | 450 | } |
8c8bf09f ASL |
451 | |
452 | } |