Commit | Line | Data |
---|---|---|
79e0a1df AM |
1 | /******************************************************************************* |
2 | * Copyright (c) 2012 Ericsson | |
3 | * | |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Alexandre Montplaisir - Initial API and implementation | |
11 | *******************************************************************************/ | |
12 | ||
13 | package org.eclipse.linuxtools.tmf.core.statesystem; | |
14 | ||
15 | import java.util.concurrent.ArrayBlockingQueue; | |
16 | import java.util.concurrent.BlockingQueue; | |
17 | ||
18 | import org.eclipse.linuxtools.tmf.core.event.ITmfEvent; | |
19 | import org.eclipse.linuxtools.tmf.core.exceptions.TimeRangeException; | |
20 | import org.eclipse.linuxtools.tmf.core.trace.ITmfTrace; | |
21 | ||
22 | ||
23 | /** | |
24 | * Instead of using IStateChangeInput directly, one can extend this class, which | |
25 | * defines a lot of the common functions of the state change input plugin. | |
26 | * | |
27 | * It will handle the state-system-processing in a separate thread, which is | |
28 | * normally not a bad idea for traces of some size. | |
29 | * | |
30 | * processEvent() is replaced with eventHandle(), so that all the multi-thread | |
31 | * logic is abstracted away. | |
32 | * | |
33 | * @author Alexandre Montplaisir | |
34 | * @since 2.0 | |
35 | */ | |
36 | public abstract class AbstractStateChangeInput implements IStateChangeInput { | |
37 | ||
38 | private static final int DEFAULT_EVENTS_QUEUE_SIZE = 10000; | |
39 | ||
79e0a1df | 40 | private final ITmfTrace trace; |
79044a66 AM |
41 | private final Class<? extends ITmfEvent> eventType; |
42 | private final BlockingQueue<ITmfEvent> eventsQueue; | |
79e0a1df AM |
43 | private final Thread eventHandlerThread; |
44 | ||
45 | private boolean ssAssigned; | |
f1f86dfb | 46 | protected ITmfStateSystemBuilder ss; |
79e0a1df AM |
47 | private ITmfEvent currentEvent; |
48 | ||
49 | /** | |
50 | * Instantiate a new state provider plugin. | |
51 | * | |
52 | * @param trace | |
53 | * The LTTng 2.0 kernel trace directory | |
79044a66 AM |
54 | * @param eventType |
55 | * The specific class for the event type that will be used within | |
56 | * the subclass | |
71f2da63 AM |
57 | * @param id |
58 | * Name given to this state change input. Only used internally. | |
79e0a1df | 59 | */ |
71f2da63 AM |
60 | public AbstractStateChangeInput(ITmfTrace trace, |
61 | Class<? extends ITmfEvent> eventType, String id) { | |
79e0a1df | 62 | this.trace = trace; |
79044a66 AM |
63 | this.eventType = eventType; |
64 | eventsQueue = new ArrayBlockingQueue<ITmfEvent>(DEFAULT_EVENTS_QUEUE_SIZE); | |
79044a66 | 65 | ssAssigned = false; |
71f2da63 AM |
66 | |
67 | String id2 = (id == null ? "Unamed" : id); //$NON-NLS-1$ | |
68 | eventHandlerThread = new Thread(new EventProcessor(), id2 + " Event Handler"); //$NON-NLS-1$ | |
69 | ||
79e0a1df AM |
70 | } |
71 | ||
72 | @Override | |
73 | public ITmfTrace getTrace() { | |
74 | return trace; | |
75 | } | |
76 | ||
77 | @Override | |
78 | public long getStartTime() { | |
79 | return trace.getStartTime().getValue(); | |
80 | } | |
81 | ||
82 | @Override | |
f1f86dfb | 83 | public void assignTargetStateSystem(ITmfStateSystemBuilder ssb) { |
79e0a1df AM |
84 | ss = ssb; |
85 | ssAssigned = true; | |
86 | eventHandlerThread.start(); | |
87 | } | |
88 | ||
89 | @Override | |
90 | public void dispose() { | |
91 | /* Insert a null event in the queue to stop the event handler's thread. */ | |
92 | try { | |
93 | eventsQueue.put(org.eclipse.linuxtools.tmf.core.ctfadaptor.CtfTmfEvent.getNullEvent()); | |
94 | eventHandlerThread.join(); | |
95 | } catch (InterruptedException e) { | |
96 | e.printStackTrace(); | |
97 | } | |
98 | ssAssigned = false; | |
99 | ss = null; | |
100 | } | |
101 | ||
102 | @Override | |
79044a66 AM |
103 | public final Class<? extends ITmfEvent> getExpectedEventType() { |
104 | return eventType; | |
105 | } | |
106 | ||
107 | @Override | |
108 | public final void processEvent(ITmfEvent event) { | |
79e0a1df AM |
109 | /* Make sure the target state system has been assigned */ |
110 | if (!ssAssigned) { | |
111 | System.err.println("Cannot process event without a target state system"); //$NON-NLS-1$ | |
112 | return; | |
113 | } | |
114 | ||
115 | /* Insert the event we're received into the events queue */ | |
116 | ITmfEvent curEvent = event; | |
117 | try { | |
118 | eventsQueue.put(curEvent); | |
119 | } catch (InterruptedException e) { | |
120 | e.printStackTrace(); | |
121 | } | |
122 | } | |
123 | ||
79e0a1df AM |
124 | /** |
125 | * This is the runner class for the second thread, which will take the | |
126 | * events from the queue and pass them through the state system. | |
127 | */ | |
128 | private class EventProcessor implements Runnable { | |
129 | ||
130 | @Override | |
131 | public void run() { | |
132 | if (ss == null) { | |
133 | System.err.println("Cannot run event manager without assigning a target state system first!"); //$NON-NLS-1$ | |
134 | return; | |
135 | } | |
136 | ITmfEvent event; | |
137 | ||
138 | try { | |
139 | event = eventsQueue.take(); | |
140 | while (event.getTimestamp().getValue() != -1) { | |
141 | currentEvent = event; | |
79044a66 AM |
142 | |
143 | /* Make sure this is an event the sub-class can process */ | |
144 | if (eventType.isInstance(event)) { | |
145 | eventHandle(event); | |
146 | } | |
79e0a1df AM |
147 | event = eventsQueue.take(); |
148 | } | |
149 | /* We've received the last event, clean up */ | |
150 | closeStateSystem(); | |
151 | return; | |
152 | } catch (InterruptedException e) { | |
153 | /* We've been interrupted abnormally */ | |
154 | System.out.println("Event handler interrupted!"); //$NON-NLS-1$ | |
155 | e.printStackTrace(); | |
156 | } | |
157 | } | |
158 | ||
159 | private void closeStateSystem() { | |
160 | /* Close the History system, if there is one */ | |
161 | if (currentEvent == null) { | |
162 | return; | |
163 | } | |
164 | try { | |
165 | ss.closeHistory(currentEvent.getTimestamp().getValue()); | |
166 | } catch (TimeRangeException e) { | |
167 | /* | |
168 | * Since we're using currentEvent.getTimestamp, this shouldn't | |
169 | * cause any problem | |
170 | */ | |
171 | e.printStackTrace(); | |
172 | } | |
173 | } | |
174 | } | |
175 | ||
176 | // ------------------------------------------------------------------------ | |
177 | // Abstract methods | |
178 | // ------------------------------------------------------------------------ | |
179 | ||
79e0a1df AM |
180 | /** |
181 | * Handle the given event and send the appropriate state transitions into | |
182 | * the the state system. | |
183 | * | |
184 | * This is basically the same thing as IStateChangeInput.processEvent(), | |
185 | * except here processEvent() and eventHandle() are run in two different | |
186 | * threads (and the AbstractStateChangeInput takes care of processEvent() | |
187 | * already). | |
188 | * | |
189 | * @param event | |
190 | * The event to process. If you need a specific event type, you | |
191 | * should check for its instance right at the beginning. | |
192 | */ | |
193 | protected abstract void eventHandle(ITmfEvent event); | |
194 | } |