ss: Also search in the ThreadedHistoryBackend's interval queue
[deliverable/tracecompass.git] / org.eclipse.tracecompass.statesystem.core / src / org / eclipse / tracecompass / statesystem / core / backend / historytree / ThreadedHistoryTreeBackend.java
1 /*******************************************************************************
2 * Copyright (c) 2012, 2013 Ericsson
3 * Copyright (c) 2010, 2011 École Polytechnique de Montréal
4 * Copyright (c) 2010, 2011 Alexandre Montplaisir <alexandre.montplaisir@gmail.com>
5 *
6 * All rights reserved. This program and the accompanying materials are
7 * made available under the terms of the Eclipse Public License v1.0 which
8 * accompanies this distribution, and is available at
9 * http://www.eclipse.org/legal/epl-v10.html
10 *
11 *******************************************************************************/
12
13 package org.eclipse.tracecompass.statesystem.core.backend.historytree;
14
15 import java.io.File;
16 import java.io.IOException;
17 import java.util.List;
18 import java.util.concurrent.ArrayBlockingQueue;
19 import java.util.concurrent.BlockingQueue;
20
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.eclipse.tracecompass.internal.statesystem.core.Activator;
23 import org.eclipse.tracecompass.internal.statesystem.core.backend.historytree.HTInterval;
24 import org.eclipse.tracecompass.statesystem.core.exceptions.StateSystemDisposedException;
25 import org.eclipse.tracecompass.statesystem.core.exceptions.TimeRangeException;
26 import org.eclipse.tracecompass.statesystem.core.interval.ITmfStateInterval;
27 import org.eclipse.tracecompass.statesystem.core.statevalue.ITmfStateValue;
28 import org.eclipse.tracecompass.statesystem.core.statevalue.TmfStateValue;
29
30 /**
31 * Variant of the HistoryTreeBackend which runs all the interval-insertion logic
32 * in a separate thread.
33 *
34 * @author Alexandre Montplaisir
35 * @since 3.0
36 */
37 public final class ThreadedHistoryTreeBackend extends HistoryTreeBackend
38 implements Runnable {
39
40 private final @NonNull BlockingQueue<HTInterval> intervalQueue;
41 private final @NonNull Thread shtThread;
42
43 /**
44 * New state history constructor
45 *
46 * Note that it usually doesn't make sense to use a Threaded HT if you're
47 * opening an existing state-file, but you know what you're doing...
48 *
49 * @param newStateFile
50 * The name of the history file that will be created. Should end
51 * in ".ht"
52 * @param blockSize
53 * The size of the blocks in the file
54 * @param maxChildren
55 * The maximum number of children allowed for each core node
56 * @param startTime
57 * The earliest timestamp stored in the history
58 * @param providerVersion
59 * Version of of the state provider. We will only try to reopen
60 * existing files if this version matches the one in the
61 * framework.
62 * @param queueSize
63 * The size of the interval insertion queue. 2000 - 10000 usually
64 * works well
65 * @throws IOException
66 * If there was a problem opening the history file for writing
67 */
68 public ThreadedHistoryTreeBackend(File newStateFile, int blockSize,
69 int maxChildren, long startTime, int providerVersion, int queueSize)
70 throws IOException {
71 super(newStateFile, blockSize, maxChildren, providerVersion, startTime);
72
73 intervalQueue = new ArrayBlockingQueue<>(queueSize);
74 shtThread = new Thread(this, "History Tree Thread"); //$NON-NLS-1$
75 shtThread.start();
76 }
77
78 /**
79 * New State History constructor. This version provides default values for
80 * blockSize and maxChildren.
81 *
82 * @param newStateFile
83 * The name of the history file that will be created. Should end
84 * in ".ht"
85 * @param startTime
86 * The earliest timestamp stored in the history
87 * @param providerVersion
88 * Version of of the state provider. We will only try to reopen
89 * existing files if this version matches the one in the
90 * framework.
91 * @param queueSize
92 * The size of the interval insertion queue. 2000 - 10000 usually
93 * works well
94 * @throws IOException
95 * If there was a problem opening the history file for writing
96 */
97 public ThreadedHistoryTreeBackend(File newStateFile, long startTime,
98 int providerVersion, int queueSize) throws IOException {
99 super(newStateFile, providerVersion, startTime);
100
101 intervalQueue = new ArrayBlockingQueue<>(queueSize);
102 shtThread = new Thread(this, "History Tree Thread"); //$NON-NLS-1$
103 shtThread.start();
104 }
105
106 /*
107 * The Threaded version does not specify an "existing file" constructor,
108 * since the history is already built (and we only use the other thread
109 * during building). Just use a plain HistoryTreeProvider in this case.
110 *
111 * TODO but what about streaming??
112 */
113
114 @Override
115 public void insertPastState(long stateStartTime, long stateEndTime,
116 int quark, ITmfStateValue value) throws TimeRangeException {
117 /*
118 * Here, instead of directly inserting the elements in the History Tree
119 * underneath, we'll put them in the Queue. They will then be taken and
120 * processed by the other thread executing the run() method.
121 */
122 HTInterval interval = new HTInterval(stateStartTime, stateEndTime,
123 quark, (TmfStateValue) value);
124 try {
125 intervalQueue.put(interval);
126 } catch (InterruptedException e) {
127 Activator.getDefault().logError("State system interrupted", e); //$NON-NLS-1$
128 }
129 }
130
131 @Override
132 public void finishedBuilding(long endTime) {
133 /*
134 * We need to commit everything in the History Tree and stop the
135 * standalone thread before returning to the StateHistorySystem. (SHS
136 * will then write the Attribute Tree to the file, that must not happen
137 * at the same time we are writing the last nodes!)
138 */
139
140 stopRunningThread(endTime);
141 isFinishedBuilding = true;
142 return;
143 }
144
145 @Override
146 public void dispose() {
147 if (!isFinishedBuilding) {
148 stopRunningThread(Long.MAX_VALUE);
149 }
150 /*
151 * isFinishedBuilding remains false, so the superclass will ask the
152 * back-end to delete the file.
153 */
154 super.dispose();
155 }
156
157 private void stopRunningThread(long endTime) {
158 if (!shtThread.isAlive()) {
159 return;
160 }
161
162 /*
163 * Send a "poison pill" in the queue, then wait for the HT to finish
164 * its closeTree()
165 */
166 try {
167 HTInterval pill = new HTInterval(-1, endTime, -1, TmfStateValue.nullValue());
168 intervalQueue.put(pill);
169 shtThread.join();
170 } catch (TimeRangeException e) {
171 Activator.getDefault().logError("Error closing state system", e); //$NON-NLS-1$
172 } catch (InterruptedException e) {
173 Activator.getDefault().logError("State system interrupted", e); //$NON-NLS-1$
174 }
175 }
176
177 @Override
178 public void run() {
179 HTInterval currentInterval;
180 try {
181 currentInterval = intervalQueue.take();
182 while (currentInterval.getStartTime() != -1) {
183 /* Send the interval to the History Tree */
184 getSHT().insertInterval(currentInterval);
185 currentInterval = intervalQueue.take();
186 }
187 if (currentInterval.getAttribute() != -1) {
188 /* Make sure this is the "poison pill" we are waiting for */
189 throw new IllegalStateException();
190 }
191 /*
192 * We've been told we're done, let's write down everything and quit.
193 * The end time of this "signal interval" is actually correct.
194 */
195 getSHT().closeTree(currentInterval.getEndTime());
196 return;
197 } catch (InterruptedException e) {
198 /* We've been interrupted abnormally */
199 Activator.getDefault().logError("State History Tree interrupted!", e); //$NON-NLS-1$
200 } catch (TimeRangeException e) {
201 /* This also should not happen */
202 Activator.getDefault().logError("Error starting the state system", e); //$NON-NLS-1$
203 }
204 }
205
206 // ------------------------------------------------------------------------
207 // Query methods
208 // ------------------------------------------------------------------------
209
210 @Override
211 public void doQuery(List<ITmfStateInterval> currentStateInfo, long t)
212 throws TimeRangeException, StateSystemDisposedException {
213 super.doQuery(currentStateInfo, t);
214
215 if (isFinishedBuilding) {
216 /*
217 * The history tree is the only place to look for intervals once
218 * construction is finished.
219 */
220 return;
221 }
222
223 /*
224 * It is possible we may have missed some intervals due to them being in
225 * the queue while the query was ongoing. Go over the results to see if
226 * we missed any.
227 */
228 for (int i = 0; i < currentStateInfo.size(); i++) {
229 if (currentStateInfo.get(i) == null) {
230 /* Query the missing interval via "unicast" */
231 ITmfStateInterval interval = doSingularQuery(t, i);
232 currentStateInfo.set(i, interval);
233 }
234 }
235 }
236
237 @Override
238 public ITmfStateInterval doSingularQuery(long t, int attributeQuark)
239 throws TimeRangeException, StateSystemDisposedException {
240 ITmfStateInterval ret = super.doSingularQuery(t, attributeQuark);
241 if (ret != null) {
242 return ret;
243 }
244
245 /*
246 * We couldn't find the interval in the history tree. It's possible that
247 * it is currently in the intervalQueue. Look for it there. Note that
248 * ArrayBlockingQueue's iterator() is thread-safe (no need to lock the
249 * queue).
250 */
251 for (ITmfStateInterval interval : intervalQueue) {
252 if (interval.getAttribute() == attributeQuark && interval.intersects(t)) {
253 return interval;
254 }
255 }
256
257 /*
258 * If we missed it again, it's because it got inserted in the tree
259 * *while we were iterating* on the queue. One last pass in the tree
260 * should find it.
261 *
262 * This case is really rare, which is why we do a second pass at the
263 * end if needed, instead of systematically checking in the queue first
264 * (which is slow).
265 */
266 return super.doSingularQuery(t, attributeQuark);
267 }
268
269 }
This page took 0.036399 seconds and 5 git commands to generate.