f55141f01cf2412db93487f5caa155750c825062
[deliverable/tracecompass.git] / statesystem / org.eclipse.tracecompass.statesystem.core / src / org / eclipse / tracecompass / internal / statesystem / core / backend / historytree / ThreadedHistoryTreeBackend.java
1 /*******************************************************************************
2 * Copyright (c) 2012, 2016 Ericsson
3 * Copyright (c) 2010, 2011 École Polytechnique de Montréal
4 * Copyright (c) 2010, 2011 Alexandre Montplaisir <alexandre.montplaisir@gmail.com>
5 *
6 * All rights reserved. This program and the accompanying materials are
7 * made available under the terms of the Eclipse Public License v1.0 which
8 * accompanies this distribution, and is available at
9 * http://www.eclipse.org/legal/epl-v10.html
10 *
11 * Contributors:
12 * Alexandre Montplaisir - Initial API and implementation
13 *******************************************************************************/
14
15 package org.eclipse.tracecompass.internal.statesystem.core.backend.historytree;
16
17 import java.io.File;
18 import java.io.IOException;
19 import java.util.List;
20
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue;
23 import org.eclipse.tracecompass.internal.statesystem.core.Activator;
24 import org.eclipse.tracecompass.statesystem.core.exceptions.StateSystemDisposedException;
25 import org.eclipse.tracecompass.statesystem.core.exceptions.TimeRangeException;
26 import org.eclipse.tracecompass.statesystem.core.interval.ITmfStateInterval;
27 import org.eclipse.tracecompass.statesystem.core.statevalue.ITmfStateValue;
28 import org.eclipse.tracecompass.statesystem.core.statevalue.TmfStateValue;
29
30 /**
31 * Variant of the HistoryTreeBackend which runs all the interval-insertion logic
32 * in a separate thread.
33 *
34 * @author Alexandre Montplaisir
35 */
36 public final class ThreadedHistoryTreeBackend extends HistoryTreeBackend
37 implements Runnable {
38
39 private static final int CHUNK_SIZE = 127;
40 private final @NonNull BufferedBlockingQueue<HTInterval> intervalQueue;
41 private final @NonNull Thread shtThread;
42 /**
43 * The backend tracks its end time separately from the tree, to take into
44 * consideration intervals in the queue.
45 */
46 private long fEndTime;
47
48 /**
49 * New state history constructor
50 *
51 * Note that it usually doesn't make sense to use a Threaded HT if you're
52 * opening an existing state-file, but you know what you're doing...
53 *
54 * @param ssid
55 * The state system's id
56 * @param newStateFile
57 * The name of the history file that will be created. Should end
58 * in ".ht"
59 * @param providerVersion
60 * Version of of the state provider. We will only try to reopen
61 * existing files if this version matches the one in the
62 * framework.
63 * @param startTime
64 * The earliest timestamp stored in the history
65 * @param queueSize
66 * The size of the interval insertion queue. 2000 - 10000 usually
67 * works well
68 * @param blockSize
69 * The size of the blocks in the file
70 * @param maxChildren
71 * The maximum number of children allowed for each core node
72 * @throws IOException
73 * If there was a problem opening the history file for writing
74 */
75 public ThreadedHistoryTreeBackend(@NonNull String ssid,
76 File newStateFile,
77 int providerVersion,
78 long startTime,
79 int queueSize,
80 int blockSize,
81 int maxChildren)
82 throws IOException {
83 super(ssid, newStateFile, providerVersion, startTime, blockSize, maxChildren);
84 fEndTime = startTime;
85
86 intervalQueue = new BufferedBlockingQueue<>(queueSize / CHUNK_SIZE, CHUNK_SIZE);
87 shtThread = new Thread(this, "History Tree Thread"); //$NON-NLS-1$
88 shtThread.start();
89 }
90
91 /**
92 * New State History constructor. This version provides default values for
93 * blockSize and maxChildren.
94 *
95 * @param ssid
96 * The state system's id
97 * @param newStateFile
98 * The name of the history file that will be created. Should end
99 * in ".ht"
100 * @param providerVersion
101 * Version of of the state provider. We will only try to reopen
102 * existing files if this version matches the one in the
103 * framework.
104 * @param startTime
105 * The earliest timestamp stored in the history
106 * @param queueSize
107 * The size of the interval insertion queue. 2000 - 10000 usually
108 * works well
109 * @throws IOException
110 * If there was a problem opening the history file for writing
111 */
112 public ThreadedHistoryTreeBackend(@NonNull String ssid,
113 File newStateFile,
114 int providerVersion,
115 long startTime,
116 int queueSize)
117 throws IOException {
118 super(ssid, newStateFile, providerVersion, startTime);
119 fEndTime = startTime;
120
121 intervalQueue = new BufferedBlockingQueue<>(queueSize / CHUNK_SIZE, CHUNK_SIZE);
122 shtThread = new Thread(this, "History Tree Thread"); //$NON-NLS-1$
123 shtThread.start();
124 }
125
/*
 * The Threaded version does not provide an "existing file" constructor:
 * when reopening a file the history is already built, and the extra thread
 * is only used while building. Just use a plain HistoryTreeBackend in that
 * case.
 *
 * TODO but what about streaming??
 */
133
134 @Override
135 public void insertPastState(long stateStartTime, long stateEndTime,
136 int quark, ITmfStateValue value) throws TimeRangeException {
137 /*
138 * Here, instead of directly inserting the elements in the History Tree
139 * underneath, we'll put them in the Queue. They will then be taken and
140 * processed by the other thread executing the run() method.
141 */
142 HTInterval interval = new HTInterval(stateStartTime, stateEndTime,
143 quark, (TmfStateValue) value);
144 intervalQueue.put(interval);
145 fEndTime = Math.max(fEndTime, stateEndTime);
146 }
147
148 @Override
149 public long getEndTime() {
150 return fEndTime;
151 }
152
153 @Override
154 public void finishedBuilding(long endTime) {
155 /*
156 * We need to commit everything in the History Tree and stop the
157 * standalone thread before returning to the StateHistorySystem. (SHS
158 * will then write the Attribute Tree to the file, that must not happen
159 * at the same time we are writing the last nodes!)
160 */
161
162 stopRunningThread(endTime);
163 setFinishedBuilding(true);
164 return;
165 }
166
167 @Override
168 public void dispose() {
169 if (!isFinishedBuilding()) {
170 stopRunningThread(Long.MAX_VALUE);
171 }
172 /*
173 * isFinishedBuilding remains false, so the superclass will ask the
174 * back-end to delete the file.
175 */
176 super.dispose();
177 }
178
179 private void stopRunningThread(long endTime) {
180 if (!shtThread.isAlive()) {
181 return;
182 }
183
184 /*
185 * Send a "poison pill" in the queue, then wait for the HT to finish its
186 * closeTree()
187 */
188 try {
189 HTInterval pill = new HTInterval(-1, endTime, -1, TmfStateValue.nullValue());
190 intervalQueue.put(pill);
191 intervalQueue.flushInputBuffer();
192 shtThread.join();
193 } catch (TimeRangeException e) {
194 Activator.getDefault().logError("Error closing state system", e); //$NON-NLS-1$
195 } catch (InterruptedException e) {
196 Activator.getDefault().logError("State system interrupted", e); //$NON-NLS-1$
197 }
198 }
199
200 @Override
201 public void run() {
202 try {
203 HTInterval currentInterval = intervalQueue.blockingPeek();
204 while (currentInterval.getStartTime() != -1) {
205 /* Send the interval to the History Tree */
206 getSHT().insertInterval(currentInterval);
207 /* Actually remove the interval from the queue */
208 // FIXME Replace with remove() once it is implemented.
209 intervalQueue.take();
210 currentInterval = intervalQueue.blockingPeek();
211 }
212 if (currentInterval.getAttribute() != -1) {
213 /* Make sure this is the "poison pill" we are waiting for */
214 throw new IllegalStateException();
215 }
216 /*
217 * We've been told we're done, let's write down everything and quit.
218 * The end time of this "signal interval" is actually correct.
219 */
220 getSHT().closeTree(currentInterval.getEndTime());
221 return;
222 } catch (TimeRangeException e) {
223 /* This should not happen */
224 Activator.getDefault().logError("Error starting the state system", e); //$NON-NLS-1$
225 }
226 }
227
228 // ------------------------------------------------------------------------
229 // Query methods
230 // ------------------------------------------------------------------------
231
232 @Override
233 public void doQuery(List<ITmfStateInterval> currentStateInfo, long t)
234 throws TimeRangeException, StateSystemDisposedException {
235 super.doQuery(currentStateInfo, t);
236
237 if (isFinishedBuilding()) {
238 /*
239 * The history tree is the only place to look for intervals once
240 * construction is finished.
241 */
242 return;
243 }
244
245 /*
246 * It is possible we may have missed some intervals due to them being in
247 * the queue while the query was ongoing. Go over the results to see if
248 * we missed any.
249 */
250 for (int i = 0; i < currentStateInfo.size(); i++) {
251 if (currentStateInfo.get(i) == null) {
252 /* Query the missing interval via "unicast" */
253 ITmfStateInterval interval = doSingularQuery(t, i);
254 currentStateInfo.set(i, interval);
255 }
256 }
257 }
258
259 @Override
260 public ITmfStateInterval doSingularQuery(long t, int attributeQuark)
261 throws TimeRangeException, StateSystemDisposedException {
262 ITmfStateInterval ret = super.doSingularQuery(t, attributeQuark);
263 if (ret != null) {
264 return ret;
265 }
266
267 /*
268 * We couldn't find the interval in the history tree. It's possible that
269 * it is currently in the intervalQueue. Look for it there. Note that
270 * BufferedBlockingQueue's iterator() is thread-safe (no need to lock
271 * the queue).
272 */
273 for (ITmfStateInterval interval : intervalQueue) {
274 if (interval.getAttribute() == attributeQuark && interval.intersects(t)) {
275 return interval;
276 }
277 }
278
279 /*
280 * If we missed it again, it's because it got inserted in the tree
281 * *while we were iterating* on the queue. One last pass in the tree
282 * should find it.
283 *
284 * This case is really rare, which is why we do a second pass at the end
285 * if needed, instead of systematically checking in the queue first
286 * (which is slow).
287 */
288 return super.doSingularQuery(t, attributeQuark);
289 }
290
291 }
This page took 0.03694 seconds and 4 git commands to generate.