8a5fd6903f071d833bc20102c0fddb39dfcf0e76
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / statesystem / AbstractTmfStateProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2012, 2014 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Alexandre Montplaisir - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.core.statesystem;
14
15 import java.util.concurrent.ArrayBlockingQueue;
16 import java.util.concurrent.BlockingQueue;
17
18 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
19 import org.eclipse.linuxtools.tmf.core.event.TmfEvent;
20 import org.eclipse.linuxtools.tmf.core.timestamp.ITmfTimestamp;
21 import org.eclipse.linuxtools.tmf.core.trace.ITmfTrace;
22
23
24 /**
25 * Instead of using IStateChangeInput directly, one can extend this class, which
26 * defines a lot of the common functions of the state change input plugin.
27 *
28 * It will handle the state-system-processing in a separate thread, which is
29 * normally not a bad idea for traces of some size.
30 *
31 * processEvent() is replaced with eventHandle(), so that all the multi-thread
32 * logic is abstracted away.
33 *
34 * @author Alexandre Montplaisir
35 * @since 2.0
36 */
37 public abstract class AbstractTmfStateProvider implements ITmfStateProvider {
38
39 private static final int DEFAULT_EVENTS_QUEUE_SIZE = 10000;
40
41 private final ITmfTrace trace;
42 private final Class<? extends ITmfEvent> eventType;
43 private final BlockingQueue<ITmfEvent> eventsQueue;
44 private final Thread eventHandlerThread;
45
46 private boolean ssAssigned;
47
48 /** State system in which to insert the state changes */
49 protected ITmfStateSystemBuilder ss = null;
50
51 /**
52 * Instantiate a new state provider plugin.
53 *
54 * @param trace
55 * The LTTng 2.0 kernel trace directory
56 * @param eventType
57 * The specific class for the event type that will be used within
58 * the subclass
59 * @param id
60 * Name given to this state change input. Only used internally.
61 */
62 public AbstractTmfStateProvider(ITmfTrace trace,
63 Class<? extends ITmfEvent> eventType, String id) {
64 this.trace = trace;
65 this.eventType = eventType;
66 eventsQueue = new ArrayBlockingQueue<>(DEFAULT_EVENTS_QUEUE_SIZE);
67 ssAssigned = false;
68
69 String id2 = (id == null ? "Unamed" : id); //$NON-NLS-1$
70 eventHandlerThread = new Thread(new EventProcessor(), id2 + " Event Handler"); //$NON-NLS-1$
71 }
72
73 @Override
74 public ITmfTrace getTrace() {
75 return trace;
76 }
77
78 @Override
79 public long getStartTime() {
80 return trace.getStartTime().normalize(0, ITmfTimestamp.NANOSECOND_SCALE).getValue();
81 }
82
83 @Override
84 public void assignTargetStateSystem(ITmfStateSystemBuilder ssb) {
85 ss = ssb;
86 ssAssigned = true;
87 eventHandlerThread.start();
88 }
89
90 @Override
91 public ITmfStateSystem getAssignedStateSystem() {
92 return ss;
93 }
94
95 @Override
96 public void dispose() {
97 /* Insert a null event in the queue to stop the event handler's thread. */
98 try {
99 eventsQueue.put(END_EVENT);
100 eventHandlerThread.join();
101 } catch (InterruptedException e) {
102 e.printStackTrace();
103 }
104 ssAssigned = false;
105 ss = null;
106 }
107
108 @Override
109 public final Class<? extends ITmfEvent> getExpectedEventType() {
110 return eventType;
111 }
112
113 @Override
114 public final void processEvent(ITmfEvent event) {
115 /* Make sure the target state system has been assigned */
116 if (!ssAssigned) {
117 System.err.println("Cannot process event without a target state system"); //$NON-NLS-1$
118 return;
119 }
120
121 /* Insert the event we're received into the events queue */
122 ITmfEvent curEvent = event;
123 try {
124 eventsQueue.put(curEvent);
125 } catch (InterruptedException e) {
126 e.printStackTrace();
127 }
128 }
129
130 /**
131 * Block the caller until the events queue is empty.
132 */
133 public void waitForEmptyQueue() {
134 /*
135 * We will first insert a dummy event that is guaranteed to not modify
136 * the state. That way, when that event leaves the queue, we will know
137 * for sure that the state system processed the preceding real event.
138 */
139 try {
140 eventsQueue.put(EMPTY_QUEUE_EVENT);
141 while (!eventsQueue.isEmpty()) {
142 Thread.sleep(100);
143 }
144 } catch (InterruptedException e) {
145 e.printStackTrace();
146 }
147 }
148
149 // ------------------------------------------------------------------------
150 // Special event types
151 // ------------------------------------------------------------------------
152
153 /** Fake event indicating the build is over, and the provider should close */
154 private static class EndEvent extends TmfEvent {}
155 /** Fake event indicating we want to clear the current queue */
156 private static class EmptyQueueEvent extends TmfEvent {}
157
158 private static final EndEvent END_EVENT = new EndEvent();
159 private static final EmptyQueueEvent EMPTY_QUEUE_EVENT = new EmptyQueueEvent();
160
161 // ------------------------------------------------------------------------
162 // Inner classes
163 // ------------------------------------------------------------------------
164
165 /**
166 * This is the runner class for the second thread, which will take the
167 * events from the queue and pass them through the state system.
168 */
169 private class EventProcessor implements Runnable {
170
171 private ITmfEvent currentEvent;
172
173 @Override
174 public void run() {
175 if (ss == null) {
176 System.err.println("Cannot run event manager without assigning a target state system first!"); //$NON-NLS-1$
177 return;
178 }
179 ITmfEvent event;
180
181 try {
182 event = eventsQueue.take();
183 /* This is a singleton, we want to do != instead of !x.equals */
184 while (event != END_EVENT) {
185 if (event == EMPTY_QUEUE_EVENT) {
186 /* Synchronization event, should be ignored */
187 event = eventsQueue.take();
188 continue;
189 }
190
191 currentEvent = event;
192
193 /* Make sure this is an event the sub-class can process */
194 if (eventType.isInstance(event) && event.getType() != null) {
195 eventHandle(event);
196 }
197 event = eventsQueue.take();
198 }
199 /* We've received the last event, clean up */
200 closeStateSystem();
201 } catch (InterruptedException e) {
202 /* We've been interrupted abnormally */
203 System.out.println("Event handler interrupted!"); //$NON-NLS-1$
204 e.printStackTrace();
205 }
206 }
207
208 private void closeStateSystem() {
209 final long endTime = (currentEvent == null) ? 0 :
210 currentEvent.getTimestamp().normalize(0, ITmfTimestamp.NANOSECOND_SCALE).getValue();
211 ss.closeHistory(endTime);
212 }
213 }
214
215 // ------------------------------------------------------------------------
216 // Abstract methods
217 // ------------------------------------------------------------------------
218
219 /**
220 * Handle the given event and send the appropriate state transitions into
221 * the the state system.
222 *
223 * This is basically the same thing as IStateChangeInput.processEvent(),
224 * except here processEvent() and eventHandle() are run in two different
225 * threads (and the AbstractStateChangeInput takes care of processEvent()
226 * already).
227 *
228 * @param event
229 * The event to process. If you need a specific event type, you
230 * should check for its instance right at the beginning.
231 */
232 protected abstract void eventHandle(ITmfEvent event);
233 }
This page took 0.036263 seconds and 4 git commands to generate.