1 /*******************************************************************************
2 * Copyright (c) 2012, 2015 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Alexandre Montplaisir - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.tracecompass
.tmf
.core
.statesystem
;
15 import org
.eclipse
.jdt
.annotation
.NonNull
;
16 import org
.eclipse
.jdt
.annotation
.Nullable
;
17 import org
.eclipse
.tracecompass
.common
.core
.collect
.BufferedBlockingQueue
;
18 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.Activator
;
19 import org
.eclipse
.tracecompass
.statesystem
.core
.ITmfStateSystem
;
20 import org
.eclipse
.tracecompass
.statesystem
.core
.ITmfStateSystemBuilder
;
21 import org
.eclipse
.tracecompass
.tmf
.core
.event
.ITmfEvent
;
22 import org
.eclipse
.tracecompass
.tmf
.core
.event
.TmfEvent
;
23 import org
.eclipse
.tracecompass
.tmf
.core
.trace
.ITmfContext
;
24 import org
.eclipse
.tracecompass
.tmf
.core
.trace
.ITmfTrace
;
27 * Instead of implementing ITmfStateProvider directly, one can extend this
28 * class, which defines a lot of the common functions of a state provider.
30 * It will handle the state-system-processing in a separate thread, which is
31 * normally not a bad idea for traces of some size.
33 * processEvent() is replaced with eventHandle(), so that all the multi-thread
34 * logic is abstracted away.
36 * @author Alexandre Montplaisir
38 public abstract class AbstractTmfStateProvider
implements ITmfStateProvider
{
40 private static final int DEFAULT_EVENTS_QUEUE_SIZE
= 127;
41 private static final int DEFAULT_EVENTS_CHUNK_SIZE
= 127;
43 private final ITmfTrace fTrace
;
44 private final BufferedBlockingQueue
<ITmfEvent
> fEventsQueue
;
45 private final Thread fEventHandlerThread
;
47 private boolean fStateSystemAssigned
;
48 /** State system in which to insert the state changes */
49 private @Nullable ITmfStateSystemBuilder fSS
= null;
51 /* The last safe time at which this state provider can be queried */
52 private volatile long fSafeTime
;
/**
 * Instantiate a new state provider plugin.
 *
 * The event handler thread is created here but not started; it is started
 * by {@link #assignTargetStateSystem}.
 *
 * @param trace
 *            The trace this provider works on. Its start time is used to
 *            initialize the latest safe time.
 * @param id
 *            Name given to this state change input. Only used internally,
 *            as the prefix of the event handler thread's name.
 */
public AbstractTmfStateProvider(ITmfTrace trace, String id) {
    fTrace = trace;
    fEventsQueue = new BufferedBlockingQueue<>(DEFAULT_EVENTS_QUEUE_SIZE, DEFAULT_EVENTS_CHUNK_SIZE);
    fStateSystemAssigned = false;
    /* Set the safe time to before the trace start, the analysis has not yet started */
    fSafeTime = trace.getStartTime().toNanos() - 1;

    fEventHandlerThread = new Thread(new EventProcessor(), id + " Event Handler"); //$NON-NLS-1$
}
73 * Get the state system builder of this provider (to insert states in).
75 * @return The state system object to be filled
77 protected @Nullable ITmfStateSystemBuilder
getStateSystemBuilder() {
82 public ITmfTrace
getTrace() {
87 public long getStartTime() {
88 return fTrace
.getStartTime().toNanos();
95 public long getLatestSafeTime() {
100 public void assignTargetStateSystem(ITmfStateSystemBuilder ssb
) {
102 fStateSystemAssigned
= true;
103 fEventHandlerThread
.start();
107 public @Nullable ITmfStateSystem
getAssignedStateSystem() {
112 public void dispose() {
113 /* Insert a null event in the queue to stop the event handler's thread. */
115 fEventsQueue
.put(END_EVENT
);
116 fEventsQueue
.flushInputBuffer();
117 fEventHandlerThread
.join();
118 } catch (InterruptedException e
) {
121 fStateSystemAssigned
= false;
126 public final void processEvent(ITmfEvent event
) {
127 /* Make sure the target state system has been assigned */
128 if (!fStateSystemAssigned
) {
129 Activator
.logError("Cannot process event without a target state system"); //$NON-NLS-1$
133 /* Insert the event we're received into the events queue */
134 ITmfEvent curEvent
= event
;
135 fEventsQueue
.put(curEvent
);
139 * Block the caller until the events queue is empty.
141 public void waitForEmptyQueue() {
143 * We will first insert a dummy event that is guaranteed to not modify
144 * the state. That way, when that event leaves the queue, we will know
145 * for sure that the state system processed the preceding real event.
148 fEventsQueue
.put(EMPTY_QUEUE_EVENT
);
149 fEventsQueue
.flushInputBuffer();
150 while (!fEventsQueue
.isEmpty()) {
153 } catch (InterruptedException e
) {
158 // ------------------------------------------------------------------------
159 // Special event types
160 // ------------------------------------------------------------------------
162 /** Fake event indicating the build is over, and the provider should close */
163 private static class EndEvent
extends TmfEvent
{
165 super(null, ITmfContext
.UNKNOWN_RANK
, null, null, null);
169 /** Fake event indicating we want to clear the current queue */
170 private static class EmptyQueueEvent
extends TmfEvent
{
171 public EmptyQueueEvent() {
172 super(null, ITmfContext
.UNKNOWN_RANK
, null, null, null);
176 private static final EndEvent END_EVENT
= new EndEvent();
177 private static final EmptyQueueEvent EMPTY_QUEUE_EVENT
= new EmptyQueueEvent();
179 // ------------------------------------------------------------------------
181 // ------------------------------------------------------------------------
184 * This is the runner class for the second thread, which will take the
185 * events from the queue and pass them through the state system.
187 private class EventProcessor
implements Runnable
{
189 private @Nullable ITmfEvent currentEvent
;
193 if (!fStateSystemAssigned
) {
194 Activator
.logError("Cannot run event manager without assigning a target state system first!"); //$NON-NLS-1$
200 * We never insert null in the queue. Cannot be checked at
201 * compile-time until Java 8 annotations...
203 @NonNull ITmfEvent event
= fEventsQueue
.take();
204 /* This is a singleton, we want to do != instead of !x.equals */
205 while (event
!= END_EVENT
) {
206 if (event
== EMPTY_QUEUE_EVENT
) {
207 /* Synchronization event, should be ignored */
208 event
= fEventsQueue
.take();
211 currentEvent
= event
;
212 fSafeTime
= event
.getTimestamp().toNanos() - 1;
214 event
= fEventsQueue
.take();
216 /* We've received the last event, clean up */
220 private void closeStateSystem() {
221 ITmfEvent event
= currentEvent
;
222 final long endTime
= (event
== null) ?
0 :
223 event
.getTimestamp().toNanos();
226 fSS
.closeHistory(endTime
);
231 // ------------------------------------------------------------------------
233 // ------------------------------------------------------------------------
236 * Handle the given event and send the appropriate state transitions into
237 * the the state system.
239 * This is basically the same thing as IStateChangeInput.processEvent(),
240 * except here processEvent() and eventHandle() are run in two different
241 * threads (and the AbstractStateChangeInput takes care of processEvent()
245 * The event to process. If you need a specific event type, you
246 * should check for its instance right at the beginning.
248 protected abstract void eventHandle(ITmfEvent event
);