1 /*******************************************************************************
2 * Copyright (c) 2009, 2014 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation, replace background
11 * requests by preemptable requests
12 * Alexandre Montplaisir - Merge with TmfDataProvider
13 * Bernd Hufmann - Add timer based coalescing for background requests
14 *******************************************************************************/
16 package org
.eclipse
.tracecompass
.tmf
.core
.component
;
18 import java
.util
.ArrayList
;
19 import java
.util
.Collections
;
20 import java
.util
.Iterator
;
21 import java
.util
.LinkedList
;
22 import java
.util
.List
;
23 import java
.util
.Timer
;
24 import java
.util
.TimerTask
;
26 import org
.eclipse
.jdt
.annotation
.NonNull
;
27 import org
.eclipse
.tracecompass
.common
.core
.NonNullUtils
;
28 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.TmfCoreTracer
;
29 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.component
.TmfEventThread
;
30 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.component
.TmfProviderManager
;
31 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.request
.TmfCoalescedEventRequest
;
32 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.request
.TmfRequestExecutor
;
33 import org
.eclipse
.tracecompass
.tmf
.core
.event
.ITmfEvent
;
34 import org
.eclipse
.tracecompass
.tmf
.core
.request
.ITmfEventRequest
;
35 import org
.eclipse
.tracecompass
.tmf
.core
.request
.ITmfEventRequest
.ExecutionType
;
36 import org
.eclipse
.tracecompass
.tmf
.core
.signal
.TmfEndSynchSignal
;
37 import org
.eclipse
.tracecompass
.tmf
.core
.signal
.TmfSignalHandler
;
38 import org
.eclipse
.tracecompass
.tmf
.core
.signal
.TmfStartSynchSignal
;
39 import org
.eclipse
.tracecompass
.tmf
.core
.timestamp
.ITmfTimestamp
;
40 import org
.eclipse
.tracecompass
.tmf
.core
.trace
.ITmfContext
;
43 * An abstract base class that implements ITmfEventProvider.
45 * This abstract class implements the housekeeping methods to register/
46 * de-register the event provider and to handle generically the event requests.
49 * @author Francois Chouinard
52 public abstract class TmfEventProvider
extends TmfComponent
implements ITmfEventProvider
{
54 // ------------------------------------------------------------------------
56 // ------------------------------------------------------------------------
58 /** Default amount of events per request "chunk" */
// Public constant: default number of events requested per "chunk".
60 public static final int DEFAULT_BLOCK_SIZE
= 50000;
62 /** Delay for coalescing background requests (in milli-seconds) */
// Passed to Timer.schedule() in sendRequest() as the delay of the coalescing task.
63 private static final long DELAY
= 1000;
65 // ------------------------------------------------------------------------
67 // ------------------------------------------------------------------------
69 /** List of coalesced requests */
// Filled by newCoalescedEventRequest() and traversed by fireRequest(),
// coalesceEventRequest(), sendIfCompatible() and coalesceCompatibleRequests().
70 private final List
<TmfCoalescedEventRequest
> fPendingCoalescedRequests
= new LinkedList
<>();
72 /** The type of event handled by this provider */
// Used as the key when registering/deregistering with TmfProviderManager.
73 private Class
<?
extends ITmfEvent
> fType
;
// Executor that runs the queued requests; created in the default constructor
// and used by queueRequest() and the executorIs*() pass-throughs.
75 private final TmfRequestExecutor fExecutor
;
// Monitor object used in the synchronized (fLock) sections of this class.
77 private final Object fLock
= new Object();
// Tested against zero in sendRequest() (coalesce foreground requests while > 0)
// and in endSynch().
// NOTE(review): the statements that modify this counter are not visible in this view.
79 private int fSignalDepth
= 0;
// Incremented/decremented by notifyPendingRequest(); while it is > 0,
// sendRequest() coalesces foreground requests instead of queueing them.
81 private int fRequestPendingCounter
= 0;
// NOTE(review): the declaration of fTimer (used by init(), dispose() and
// sendRequest()) is not visible in this view.
85 /** Current timer task */
// Starts as a no-op task so that sendRequest() can always call cancel() on it
// before installing a new one.
86 @NonNull private TimerTask fCurrentTask
= new TimerTask() { @Override public void run() {} };
// Set through setTimerEnabled(); sendRequest() only schedules the coalescing
// timer task when this flag is true.
88 private boolean fIsTimerEnabled
;
91 * The parent event provider.
// Parent provider, or null when none has been set. Assigned by setParent()
// while holding fLock.
93 private TmfEventProvider fParent
= null;
95 * The list of children event providers.
// Children are kept in a Collections.synchronizedList wrapper: single calls
// (add/get/size) are used directly, while every iteration over the list in
// this class is wrapped in synchronized (fChildren).
97 private final List
<TmfEventProvider
> fChildren
= Collections
.synchronizedList(new ArrayList
<TmfEventProvider
>());
99 // ------------------------------------------------------------------------
101 // ------------------------------------------------------------------------
104 * Default constructor
// Default constructor: turns the coalescing timer flag on and creates the
// request executor.
// NOTE(review): the statement on original line 107 is not visible in this view.
106 public TmfEventProvider() {
// Private (non-overridable) setter; sets fIsTimerEnabled to true.
108 setTimerEnabled(true);
109 fExecutor
= new TmfRequestExecutor();
113 * Standard constructor. Instantiate and initialize at the same time.
116 * Name of the provider
118 * The type of events that will be handled
// Constructor taking the provider name and the handled event type.
// NOTE(review): the constructor body is not visible in this view; the
// preceding description ("Instantiate and initialize at the same time")
// suggests it delegates to the default constructor and init() — confirm.
120 public TmfEventProvider(String name
, Class
<?
extends ITmfEvent
> type
) {
126 * Initialize this data provider
129 * Name of the provider
131 * The type of events that will be handled
// Initializes this provider: creates the coalescing timer and registers the
// provider with TmfProviderManager under its event type.
// NOTE(review): original lines 134-139 are not visible in this view;
// presumably they store `name`/`type` and prepare the executor — confirm.
133 public void init(String name
, Class
<?
extends ITmfEvent
> type
) {
// fTimer is created under fLock, the same monitor dispose() and
// sendRequest() hold when they read it.
140 synchronized (fLock
) {
141 fTimer
= new Timer();
// Registration uses the fType field, not the `type` parameter directly.
144 TmfProviderManager
.register(fType
, this);
// Tears the provider down: deregisters it from TmfProviderManager, handles
// the timer under fLock, visits every child and drops the pending coalesced
// requests.
// NOTE(review): several statements (original lines 150, 153-155, 160, 165)
// are not visible in this view; what is done with the timer and with each
// child is therefore not documented here — confirm against the full file.
148 public void dispose() {
149 TmfProviderManager
.deregister(fType
, this);
151 synchronized (fLock
) {
// Only touch the timer if init() created one.
152 if (fTimer
!= null) {
// Iteration over the synchronized list requires holding its monitor.
158 synchronized (fChildren
) {
159 for (TmfEventProvider child
: fChildren
) {
164 clearPendingRequests();
168 // ------------------------------------------------------------------------
170 // ------------------------------------------------------------------------
173 * Get the event type this provider handles
175 * @return The type of ITmfEvent
// Accessor for the event type handled by this provider.
// NOTE(review): the method body is not visible in this view; presumably it
// returns fType — confirm.
177 public Class
<?
extends ITmfEvent
> getType() {
181 // ------------------------------------------------------------------------
182 // ITmfRequestHandler
183 // ------------------------------------------------------------------------
// Entry point for event requests. Under fLock it decides whether the request
// is taken over by the parent provider, queued immediately, or coalesced with
// other pending requests.
// NOTE(review): the return/else lines, the closing braces and the header of
// the TimerTask's run() method are not visible in this view; the branch
// structure described below is inferred from the visible calls — confirm.
189 public void sendRequest(final ITmfEventRequest request
) {
190 synchronized (fLock
) {
// Optional tracing of the incoming request.
192 if (TmfCoreTracer
.isRequestTraced()) {
193 TmfCoreTracer
.traceRequest(request
.getRequestId(), "SENT to provider " + getName()); //$NON-NLS-1$
// Attach this provider to the request if none has been set yet.
196 if (request
.getEventProvider() == null) {
197 request
.setEventProvider(this);
// Let a parent with a compatible pending request handle it (see sendWithParent).
200 if (sendWithParent(request
)) {
// Foreground requests: coalesced while fSignalDepth or fRequestPendingCounter
// is positive, queued for execution otherwise.
204 if (request
.getExecType() == ExecutionType
.FOREGROUND
) {
205 if ((fSignalDepth
> 0) || (fRequestPendingCounter
> 0)) {
206 coalesceEventRequest(request
);
208 queueRequest(request
);
214 * Dispatch request in case timer is not running.
// No timer available (fTimer == null): queue the request directly.
216 if (fTimer
== null) {
217 queueRequest(request
);
// Remaining requests are coalesced and fired later by the timer task.
221 coalesceEventRequest(request
);
// Restart the coalescing delay: cancel the previously scheduled task and
// schedule a fresh one DELAY milliseconds from now.
223 if (fIsTimerEnabled
) {
224 fCurrentTask
.cancel();
225 fCurrentTask
= new TimerTask() {
// NOTE(review): the statement executed under the lock by the task is not
// visible in this view; presumably fireRequest(true) — confirm.
228 synchronized (fLock
) {
233 fTimer
.schedule(fCurrentTask
, DELAY
);
// Queues the pending coalesced requests whose execution type matches the
// trigger: BACKGROUND when isTimeout is true, FOREGROUND otherwise.
238 private void fireRequest(boolean isTimeout
) {
239 synchronized (fLock
) {
// NOTE(review): the body of this guard is not visible in this view;
// presumably nothing is fired while requests are still being held back — confirm.
240 if (fRequestPendingCounter
> 0) {
244 if (fPendingCoalescedRequests
.size() > 0) {
245 Iterator
<TmfCoalescedEventRequest
> iter
= fPendingCoalescedRequests
.iterator();
246 while (iter
.hasNext()) {
// The wanted type does not depend on the element; it is recomputed per iteration.
247 ExecutionType type
= (isTimeout ? ExecutionType
.BACKGROUND
: ExecutionType
.FOREGROUND
);
248 ITmfEventRequest request
= iter
.next();
249 if (type
== request
.getExecType()) {
// NOTE(review): removal of the queued request from the pending list is not
// visible in this view — confirm.
250 queueRequest(request
);
259 * Increments/decrements the pending requests counters and fires the request
260 * if necessary (counter == 0). Used for coalescing requests across multiple
264 * Should we increment (true) or decrement (false) the pending
// Adjusts fRequestPendingCounter under fLock and reacts when it reaches zero.
// NOTE(review): the lines testing isIncrement (original lines 270 and 272)
// and the body of the final "== 0" check are not visible in this view; per
// the preceding description, the counter is incremented when isIncrement is
// true, decremented otherwise, and the request is fired at zero — confirm.
268 public void notifyPendingRequest(boolean isIncrement
) {
269 synchronized (fLock
) {
271 fRequestPendingCounter
++;
// The decrement is guarded so the counter never drops below zero.
273 if (fRequestPendingCounter
> 0) {
274 fRequestPendingCounter
--;
277 // fire request if all pending requests are received
278 if (fRequestPendingCounter
== 0) {
286 // ------------------------------------------------------------------------
288 // ------------------------------------------------------------------------
291 * Create a new request from an existing one, and add it to the coalesced
295 * The request to copy
// Builds a new TmfCoalescedEventRequest from the given request, attaches the
// request to it, offers it to the children for merging and appends it to the
// pending list. Everything happens under fLock.
298 protected void newCoalescedEventRequest(ITmfEventRequest request
) {
299 synchronized (fLock
) {
// NOTE(review): two constructor arguments (original lines 302-303) are not
// visible in this view.
300 TmfCoalescedEventRequest coalescedRequest
= new TmfCoalescedEventRequest(
301 request
.getDataType(),
304 request
.getNbRequested(),
305 request
.getExecType());
306 coalescedRequest
.addRequest(request
);
307 coalescedRequest
.setEventProvider(this);
// Optional tracing of the merge.
308 if (TmfCoreTracer
.isRequestTraced()) {
309 TmfCoreTracer
.traceRequest(request
.getRequestId(), "COALESCED with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
310 TmfCoreTracer
.traceRequest(coalescedRequest
.getRequestId(), "now contains " + coalescedRequest
.getSubRequestIds()); //$NON-NLS-1$
// Let each child fold its compatible pending requests into the new request
// before it is added to this provider's pending list.
312 coalesceChildrenRequests(coalescedRequest
);
313 fPendingCoalescedRequests
.add(coalescedRequest
);
318 * Add an existing request to the list of coalesced ones
321 * The request to add to the list
// Merges the request into a compatible pending coalesced request when one
// exists; otherwise creates a new coalesced request for it. Runs under fLock.
// NOTE(review): the early exit after a successful merge is not visible in
// this view; presumably the method returns before reaching
// newCoalescedEventRequest() in that case — confirm.
324 protected void coalesceEventRequest(ITmfEventRequest request
) {
325 synchronized (fLock
) {
326 for (TmfCoalescedEventRequest coalescedRequest
: getPendingRequests()) {
327 if (coalescedRequest
.isCompatible(request
)) {
328 coalescedRequest
.addRequest(request
);
// Optional tracing of the merge.
329 if (TmfCoreTracer
.isRequestTraced()) {
330 TmfCoreTracer
.traceRequest(request
.getRequestId(), "COALESCED with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
331 TmfCoreTracer
.traceRequest(coalescedRequest
.getRequestId(), "now contains " + coalescedRequest
.getSubRequestIds()); //$NON-NLS-1$
// Give the children a chance to merge their pending requests as well.
333 coalesceChildrenRequests(coalescedRequest
);
// No compatible pending request: start a new coalesced request.
337 newCoalescedEventRequest(request
);
342 * Sends a request with the parent if compatible.
// Delegates to the parent's sendIfCompatible() when the parent is a
// TmfEventProvider and returns its result.
// NOTE(review): the fall-through return (no such parent) is not visible in
// this view; presumably false — confirm.
344 private boolean sendWithParent(final ITmfEventRequest request
) {
345 ITmfEventProvider parent
= getParent();
// instanceof is also false for a null parent.
346 if (parent
instanceof TmfEventProvider
) {
347 return ((TmfEventProvider
) parent
).sendIfCompatible(request
);
353 * Sends a request if compatible with a pending coalesced request.
// Checks, under fLock, whether this provider has a pending coalesced request
// compatible with the given one. If so the request is sent through this
// provider's sendRequest(); otherwise the question is passed to this
// provider's own parent.
// NOTE(review): the statement following sendRequest() inside the loop is not
// visible in this view; presumably "return true" — confirm.
355 private boolean sendIfCompatible(ITmfEventRequest request
) {
356 synchronized (fLock
) {
357 for (TmfCoalescedEventRequest coalescedRequest
: getPendingRequests()) {
358 if (coalescedRequest
.isCompatible(request
)) {
359 // Send so it can be coalesced with the parent(s)
360 sendRequest(request
);
// Nothing compatible here: continue up the provider hierarchy.
365 return sendWithParent(request
);
369 * Coalesces children requests with given request if compatible.
371 private void coalesceChildrenRequests(final TmfCoalescedEventRequest request
) {
372 synchronized (fChildren
) {
373 for (TmfEventProvider child
: fChildren
) {
374 child
.coalesceCompatibleRequests(request
);
381 * Coalesces all pending requests that are compatible with coalesced request.
// Adds every pending coalesced request of this provider that the given
// request reports as compatible to that request.
// The visible lines do not acquire this provider's fLock; the caller seen in
// this file (coalesceChildrenRequests) invokes it on each child.
// NOTE(review): the end of the loop body (original lines 392-394) is not
// visible in this view; presumably the merged request is removed from the
// pending list through the iterator — confirm.
383 private void coalesceCompatibleRequests(TmfCoalescedEventRequest request
) {
384 Iterator
<TmfCoalescedEventRequest
> iter
= getPendingRequests().iterator();
385 while (iter
.hasNext()) {
386 TmfCoalescedEventRequest pendingRequest
= iter
.next();
// Compatibility is asked of the incoming request, not of the pending one.
387 if (request
.isCompatible(pendingRequest
)) {
388 request
.addRequest(pendingRequest
);
// Optional tracing of the merge.
389 if (TmfCoreTracer
.isRequestTraced()) {
390 TmfCoreTracer
.traceRequest(pendingRequest
.getRequestId(), "COALESCED with " + request
.getRequestId()); //$NON-NLS-1$
391 TmfCoreTracer
.traceRequest(request
.getRequestId(), "now contains " + request
.getSubRequestIds()); //$NON-NLS-1$
398 // ------------------------------------------------------------------------
399 // Request processing
400 // ------------------------------------------------------------------------
// Wraps the request in a TmfEventThread and hands it to the executor.
409 protected void queueRequest(final ITmfEventRequest request
) {
// NOTE(review): the body of the shutdown branch (original lines 412-413) is
// not visible in this view; presumably the request is rejected and the
// method returns — confirm.
411 if (fExecutor
.isShutdown()) {
416 TmfEventThread thread
= new TmfEventThread(this, request
);
// Optional tracing of the queueing step.
418 if (TmfCoreTracer
.isRequestTraced()) {
419 TmfCoreTracer
.traceRequest(request
.getRequestId(), "QUEUED"); //$NON-NLS-1$
422 fExecutor
.execute(thread
);
426 * Initialize the provider based on the request. The context is provider
427 * specific and will be updated by getNext().
431 * @return An application specific context; null if request can't be
// Abstract hook implemented by concrete providers: prepares the provider for
// the given request and returns the ITmfContext to use for it. According to
// the preceding description the context is provider specific, is updated by
// getNext(), and may be null when the request cannot be serviced.
435 public abstract ITmfContext
armRequest(ITmfEventRequest request
);
438 * Checks if the data meets the request completion criteria.
445 * The number of events read so far
446 * @return true if completion criteria is met
449 public boolean isCompleted(ITmfEventRequest request
, ITmfEvent event
, int nbRead
) {
450 boolean requestCompleted
= isCompleted2(request
, nbRead
);
451 if (!requestCompleted
) {
452 ITmfTimestamp endTime
= request
.getRange().getEndTime();
453 return event
.getTimestamp().compareTo(endTime
) > 0;
455 return requestCompleted
;
458 private static boolean isCompleted2(ITmfEventRequest request
,int nbRead
) {
459 return request
.isCompleted() || nbRead
>= request
.getNbRequested();
462 // ------------------------------------------------------------------------
463 // Pass-through's to the request executor
464 // ------------------------------------------------------------------------
467 * @return the shutdown state (i.e. if it is accepting new requests)
470 protected boolean executorIsShutdown() {
471 return fExecutor
.isShutdown();
475 * @return the termination state
478 protected boolean executorIsTerminated() {
479 return fExecutor
.isTerminated();
482 // ------------------------------------------------------------------------
484 // ------------------------------------------------------------------------
487 * Handler for the start synch signal
// Handler for the start synch signal; its work is done under fLock.
// NOTE(review): the statement inside the synchronized block is not visible
// in this view; presumably it increments fSignalDepth — confirm.
493 public void startSynch(TmfStartSynchSignal signal
) {
494 synchronized (fLock
) {
500 * Handler for the end synch signal
// Handler for the end synch signal; its work is done under fLock.
// NOTE(review): the statement on original line 508 and the body of the
// "== 0" check are not visible in this view; presumably fSignalDepth is
// decremented and the coalesced foreground requests are fired once it is
// back to zero — confirm.
506 public void endSynch(TmfEndSynchSignal signal
) {
507 synchronized (fLock
) {
509 if (fSignalDepth
== 0) {
// Accessor for the parent provider, read under fLock (the same monitor
// setParent() holds when writing fParent).
// NOTE(review): the return statement is not visible in this view;
// presumably it returns fParent — confirm.
516 public ITmfEventProvider
getParent() {
517 synchronized (fLock
) {
523 public void setParent(ITmfEventProvider parent
) {
524 if (!(parent
instanceof TmfEventProvider
)) {
525 throw new IllegalArgumentException();
528 synchronized (fLock
) {
529 fParent
= (TmfEventProvider
) parent
;
// Copies the children into a new ArrayList while holding the fChildren
// monitor, so the caller does not receive the internal list.
// NOTE(review): the return statement is not visible in this view;
// presumably the copy is returned — confirm.
534 public List
<ITmfEventProvider
> getChildren() {
535 synchronized (fChildren
) {
536 List
<ITmfEventProvider
> list
= new ArrayList
<>();
537 list
.addAll(fChildren
);
// Collects, into a new list, the children whose runtime class is assignable
// to clazz, cast to that type. The traversal holds the fChildren monitor.
// NOTE(review): the return statement is not visible in this view;
// presumably the collected list is returned — confirm.
543 public <T
extends ITmfEventProvider
> List
<T
> getChildren(Class
<T
> clazz
) {
544 List
<T
> list
= new ArrayList
<>();
545 synchronized (fChildren
) {
546 for (TmfEventProvider child
: fChildren
) {
// isAssignableFrom guards the cast below.
547 if (clazz
.isAssignableFrom(child
.getClass())) {
548 list
.add(clazz
.cast(child
));
// Looks a child up by name, comparing with child.getName().equals(name)
// while holding the fChildren monitor.
// NOTE(review): both return statements are not visible in this view;
// presumably the matching child is returned, and null when none matches — confirm.
556 public ITmfEventProvider
getChild(String name
) {
557 synchronized (fChildren
) {
558 for (TmfEventProvider child
: fChildren
) {
559 if (child
.getName().equals(name
)) {
568 public ITmfEventProvider
getChild(int index
) {
569 return NonNullUtils
.checkNotNull(fChildren
.get(index
));
573 public void addChild(ITmfEventProvider child
) {
574 if (!(child
instanceof TmfEventProvider
)) {
575 throw new IllegalArgumentException();
577 child
.setParent(this);
578 fChildren
.add((TmfEventProvider
)child
);
582 public int getNbChildren() {
583 return fChildren
.size();
// Tells whether the event belongs to this provider or to one of its
// descendants: first an identity check of event.getTrace() against this
// provider, then a recursive check of each TmfEventProvider child.
// NOTE(review): none of the return statements are visible in this view;
// presumably true on a match and false otherwise — confirm.
587 public boolean providesEvent(ITmfEvent event
) {
// Reference comparison: the event's trace must be this very object.
588 if ((event
.getTrace() == this)) {
591 if (fChildren
.size() > 0) {
// The children are snapshotted through getChildren(Class) and visited while
// holding fLock.
592 synchronized (fLock
) {
593 List
<TmfEventProvider
> children
= getChildren(TmfEventProvider
.class);
594 for (TmfEventProvider child
: children
) {
595 if (child
.providesEvent(event
)) {
604 // ------------------------------------------------------------------------
605 // Debug code (will also be used in tests using reflection)
606 // ------------------------------------------------------------------------
609 * Gets a list of all pending requests. Debug code.
611 * @return a list of all pending requests
613 private List
<TmfCoalescedEventRequest
> getPendingRequests() {
614 return fPendingCoalescedRequests
;
618 * Clears all pending requests. Debug code.
620 private void clearPendingRequests() {
621 fPendingCoalescedRequests
.clear();
625 * Enables/disables the timer. Debug code.
628 * the enable flag to set
630 private void setTimerEnabled(Boolean enabled
) {
631 fIsTimerEnabled
= enabled
;