tmf: Add an attribute pool for state system analyses
[deliverable/tracecompass.git] / tmf / org.eclipse.tracecompass.tmf.core / src / org / eclipse / tracecompass / tmf / core / statesystem / AbstractTmfStateProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2012, 2015 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Alexandre Montplaisir - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.tracecompass.tmf.core.statesystem;
14
15 import org.eclipse.jdt.annotation.NonNull;
16 import org.eclipse.jdt.annotation.Nullable;
17 import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue;
18 import org.eclipse.tracecompass.internal.tmf.core.Activator;
19 import org.eclipse.tracecompass.statesystem.core.ITmfStateSystem;
20 import org.eclipse.tracecompass.statesystem.core.ITmfStateSystemBuilder;
21 import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
22 import org.eclipse.tracecompass.tmf.core.event.TmfEvent;
23 import org.eclipse.tracecompass.tmf.core.trace.ITmfContext;
24 import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace;
25
26 /**
27 * Instead of using IStateChangeInput directly, one can extend this class, which
28 * defines a lot of the common functions of the state change input plugin.
29 *
30 * It will handle the state-system-processing in a separate thread, which is
31 * normally not a bad idea for traces of some size.
32 *
33 * processEvent() is replaced with eventHandle(), so that all the multi-thread
34 * logic is abstracted away.
35 *
36 * @author Alexandre Montplaisir
37 */
38 public abstract class AbstractTmfStateProvider implements ITmfStateProvider {
39
40 private static final int DEFAULT_EVENTS_QUEUE_SIZE = 127;
41 private static final int DEFAULT_EVENTS_CHUNK_SIZE = 127;
42
43 private final ITmfTrace fTrace;
44 private final BufferedBlockingQueue<ITmfEvent> fEventsQueue;
45 private final Thread fEventHandlerThread;
46
47 private boolean fStateSystemAssigned;
48
49 /** State system in which to insert the state changes */
50 private @Nullable ITmfStateSystemBuilder fSS = null;
51
52 /**
53 * Instantiate a new state provider plugin.
54 *
55 * @param trace
56 * The LTTng 2.0 kernel trace directory
57 * @param id
58 * Name given to this state change input. Only used internally.
59 */
60 public AbstractTmfStateProvider(ITmfTrace trace, String id) {
61 fTrace = trace;
62 fEventsQueue = new BufferedBlockingQueue<>(DEFAULT_EVENTS_QUEUE_SIZE, DEFAULT_EVENTS_CHUNK_SIZE);
63 fStateSystemAssigned = false;
64
65 fEventHandlerThread = new Thread(new EventProcessor(), id + " Event Handler"); //$NON-NLS-1$
66 }
67
68 /**
69 * Get the state system builder of this provider (to insert states in).
70 *
71 * @return The state system object to be filled
72 */
73 protected @Nullable ITmfStateSystemBuilder getStateSystemBuilder() {
74 return fSS;
75 }
76
77 @Override
78 public ITmfTrace getTrace() {
79 return fTrace;
80 }
81
82 @Override
83 public long getStartTime() {
84 return fTrace.getStartTime().toNanos();
85 }
86
87 @Override
88 public void assignTargetStateSystem(ITmfStateSystemBuilder ssb) {
89 fSS = ssb;
90 fStateSystemAssigned = true;
91 fEventHandlerThread.start();
92 }
93
94 @Override
95 public @Nullable ITmfStateSystem getAssignedStateSystem() {
96 return fSS;
97 }
98
99 @Override
100 public void dispose() {
101 /* Insert a null event in the queue to stop the event handler's thread. */
102 try {
103 fEventsQueue.put(END_EVENT);
104 fEventsQueue.flushInputBuffer();
105 fEventHandlerThread.join();
106 } catch (InterruptedException e) {
107 e.printStackTrace();
108 }
109 fStateSystemAssigned = false;
110 fSS = null;
111 }
112
113 @Override
114 public final void processEvent(ITmfEvent event) {
115 /* Make sure the target state system has been assigned */
116 if (!fStateSystemAssigned) {
117 Activator.logError("Cannot process event without a target state system"); //$NON-NLS-1$
118 return;
119 }
120
121 /* Insert the event we're received into the events queue */
122 ITmfEvent curEvent = event;
123 fEventsQueue.put(curEvent);
124 }
125
126 /**
127 * Block the caller until the events queue is empty.
128 */
129 public void waitForEmptyQueue() {
130 /*
131 * We will first insert a dummy event that is guaranteed to not modify
132 * the state. That way, when that event leaves the queue, we will know
133 * for sure that the state system processed the preceding real event.
134 */
135 try {
136 fEventsQueue.put(EMPTY_QUEUE_EVENT);
137 fEventsQueue.flushInputBuffer();
138 while (!fEventsQueue.isEmpty()) {
139 Thread.sleep(100);
140 }
141 } catch (InterruptedException e) {
142 e.printStackTrace();
143 }
144 }
145
146 // ------------------------------------------------------------------------
147 // Special event types
148 // ------------------------------------------------------------------------
149
150 /** Fake event indicating the build is over, and the provider should close */
151 private static class EndEvent extends TmfEvent {
152 public EndEvent() {
153 super(null, ITmfContext.UNKNOWN_RANK, null, null, null);
154 }
155 }
156
157 /** Fake event indicating we want to clear the current queue */
158 private static class EmptyQueueEvent extends TmfEvent {
159 public EmptyQueueEvent() {
160 super(null, ITmfContext.UNKNOWN_RANK, null, null, null);
161 }
162 }
163
164 private static final EndEvent END_EVENT = new EndEvent();
165 private static final EmptyQueueEvent EMPTY_QUEUE_EVENT = new EmptyQueueEvent();
166
167 // ------------------------------------------------------------------------
168 // Inner classes
169 // ------------------------------------------------------------------------
170
171 /**
172 * This is the runner class for the second thread, which will take the
173 * events from the queue and pass them through the state system.
174 */
175 private class EventProcessor implements Runnable {
176
177 private @Nullable ITmfEvent currentEvent;
178
179 @Override
180 public void run() {
181 if (!fStateSystemAssigned) {
182 Activator.logError("Cannot run event manager without assigning a target state system first!"); //$NON-NLS-1$
183 return;
184 }
185
186
187 /*
188 * We never insert null in the queue. Cannot be checked at
189 * compile-time until Java 8 annotations...
190 */
191 @NonNull ITmfEvent event = fEventsQueue.take();
192 /* This is a singleton, we want to do != instead of !x.equals */
193 while (event != END_EVENT) {
194 if (event == EMPTY_QUEUE_EVENT) {
195 /* Synchronization event, should be ignored */
196 event = fEventsQueue.take();
197 continue;
198 }
199 currentEvent = event;
200 eventHandle(event);
201 event = fEventsQueue.take();
202 }
203 /* We've received the last event, clean up */
204 closeStateSystem();
205 }
206
207 private void closeStateSystem() {
208 ITmfEvent event = currentEvent;
209 final long endTime = (event == null) ? 0 :
210 event.getTimestamp().toNanos();
211
212 if (fSS != null) {
213 fSS.closeHistory(endTime);
214 }
215 }
216 }
217
218 // ------------------------------------------------------------------------
219 // Abstract methods
220 // ------------------------------------------------------------------------
221
222 /**
223 * Handle the given event and send the appropriate state transitions into
224 * the the state system.
225 *
226 * This is basically the same thing as IStateChangeInput.processEvent(),
227 * except here processEvent() and eventHandle() are run in two different
228 * threads (and the AbstractStateChangeInput takes care of processEvent()
229 * already).
230 *
231 * @param event
232 * The event to process. If you need a specific event type, you
233 * should check for its instance right at the beginning.
234 */
235 protected abstract void eventHandle(ITmfEvent event);
236
237 }
This page took 0.03673 seconds and 5 git commands to generate.