ss: Also search in the ThreadedHistoryBackend's interval queue
[deliverable/tracecompass.git] / org.eclipse.tracecompass.statesystem.core / src / org / eclipse / tracecompass / statesystem / core / backend / historytree / ThreadedHistoryTreeBackend.java
CommitLineData
a52fde77 1/*******************************************************************************
61759503 2 * Copyright (c) 2012, 2013 Ericsson
a52fde77
AM
3 * Copyright (c) 2010, 2011 École Polytechnique de Montréal
4 * Copyright (c) 2010, 2011 Alexandre Montplaisir <alexandre.montplaisir@gmail.com>
1a4205d9 5 *
a52fde77
AM
6 * All rights reserved. This program and the accompanying materials are
7 * made available under the terms of the Eclipse Public License v1.0 which
8 * accompanies this distribution, and is available at
9 * http://www.eclipse.org/legal/epl-v10.html
1a4205d9 10 *
a52fde77
AM
11 *******************************************************************************/
12
e894a508 13package org.eclipse.tracecompass.statesystem.core.backend.historytree;
a52fde77
AM
14
15import java.io.File;
16import java.io.IOException;
e62a23a9 17import java.util.List;
a52fde77
AM
18import java.util.concurrent.ArrayBlockingQueue;
19import java.util.concurrent.BlockingQueue;
20
e62a23a9 21import org.eclipse.jdt.annotation.NonNull;
e894a508
AM
22import org.eclipse.tracecompass.internal.statesystem.core.Activator;
23import org.eclipse.tracecompass.internal.statesystem.core.backend.historytree.HTInterval;
e62a23a9 24import org.eclipse.tracecompass.statesystem.core.exceptions.StateSystemDisposedException;
e894a508 25import org.eclipse.tracecompass.statesystem.core.exceptions.TimeRangeException;
e62a23a9 26import org.eclipse.tracecompass.statesystem.core.interval.ITmfStateInterval;
e894a508
AM
27import org.eclipse.tracecompass.statesystem.core.statevalue.ITmfStateValue;
28import org.eclipse.tracecompass.statesystem.core.statevalue.TmfStateValue;
a52fde77
AM
29
30/**
31 * Variant of the HistoryTreeBackend which runs all the interval-insertion logic
32 * in a separate thread.
1a4205d9 33 *
bcec0116
AM
34 * @author Alexandre Montplaisir
35 * @since 3.0
a52fde77 36 */
ab604305
AM
37public final class ThreadedHistoryTreeBackend extends HistoryTreeBackend
38 implements Runnable {
a52fde77 39
e62a23a9
AM
40 private final @NonNull BlockingQueue<HTInterval> intervalQueue;
41 private final @NonNull Thread shtThread;
a52fde77
AM
42
43 /**
44 * New state history constructor
1a4205d9 45 *
a52fde77
AM
46 * Note that it usually doesn't make sense to use a Threaded HT if you're
47 * opening an existing state-file, but you know what you're doing...
1a4205d9 48 *
a52fde77
AM
49 * @param newStateFile
50 * The name of the history file that will be created. Should end
51 * in ".ht"
52 * @param blockSize
53 * The size of the blocks in the file
54 * @param maxChildren
55 * The maximum number of children allowed for each core node
56 * @param startTime
57 * The earliest timestamp stored in the history
a96cc6be
AM
58 * @param providerVersion
59 * Version of of the state provider. We will only try to reopen
60 * existing files if this version matches the one in the
61 * framework.
a52fde77
AM
62 * @param queueSize
63 * The size of the interval insertion queue. 2000 - 10000 usually
64 * works well
65 * @throws IOException
66 * If there was a problem opening the history file for writing
67 */
68 public ThreadedHistoryTreeBackend(File newStateFile, int blockSize,
a96cc6be
AM
69 int maxChildren, long startTime, int providerVersion, int queueSize)
70 throws IOException {
71 super(newStateFile, blockSize, maxChildren, providerVersion, startTime);
a52fde77 72
a4524c1b 73 intervalQueue = new ArrayBlockingQueue<>(queueSize);
a52fde77
AM
74 shtThread = new Thread(this, "History Tree Thread"); //$NON-NLS-1$
75 shtThread.start();
76 }
77
78 /**
79 * New State History constructor. This version provides default values for
80 * blockSize and maxChildren.
1a4205d9 81 *
a52fde77
AM
82 * @param newStateFile
83 * The name of the history file that will be created. Should end
84 * in ".ht"
85 * @param startTime
86 * The earliest timestamp stored in the history
a96cc6be
AM
87 * @param providerVersion
88 * Version of of the state provider. We will only try to reopen
89 * existing files if this version matches the one in the
90 * framework.
a52fde77
AM
91 * @param queueSize
92 * The size of the interval insertion queue. 2000 - 10000 usually
93 * works well
94 * @throws IOException
95 * If there was a problem opening the history file for writing
96 */
97 public ThreadedHistoryTreeBackend(File newStateFile, long startTime,
a96cc6be
AM
98 int providerVersion, int queueSize) throws IOException {
99 super(newStateFile, providerVersion, startTime);
a52fde77 100
a4524c1b 101 intervalQueue = new ArrayBlockingQueue<>(queueSize);
a52fde77
AM
102 shtThread = new Thread(this, "History Tree Thread"); //$NON-NLS-1$
103 shtThread.start();
104 }
105
106 /*
107 * The Threaded version does not specify an "existing file" constructor,
108 * since the history is already built (and we only use the other thread
109 * during building). Just use a plain HistoryTreeProvider in this case.
1a4205d9 110 *
a52fde77
AM
111 * TODO but what about streaming??
112 */
113
114 @Override
115 public void insertPastState(long stateStartTime, long stateEndTime,
116 int quark, ITmfStateValue value) throws TimeRangeException {
117 /*
118 * Here, instead of directly inserting the elements in the History Tree
119 * underneath, we'll put them in the Queue. They will then be taken and
120 * processed by the other thread executing the run() method.
121 */
122 HTInterval interval = new HTInterval(stateStartTime, stateEndTime,
123 quark, (TmfStateValue) value);
124 try {
125 intervalQueue.put(interval);
126 } catch (InterruptedException e) {
bcec0116 127 Activator.getDefault().logError("State system interrupted", e); //$NON-NLS-1$
a52fde77
AM
128 }
129 }
130
131 @Override
b33c7369 132 public void finishedBuilding(long endTime) {
a52fde77
AM
133 /*
134 * We need to commit everything in the History Tree and stop the
135 * standalone thread before returning to the StateHistorySystem. (SHS
136 * will then write the Attribute Tree to the file, that must not happen
137 * at the same time we are writing the last nodes!)
138 */
139
1a4205d9
AM
140 stopRunningThread(endTime);
141 isFinishedBuilding = true;
142 return;
143 }
144
145 @Override
146 public void dispose() {
147 if (!isFinishedBuilding) {
bcec0116 148 stopRunningThread(Long.MAX_VALUE);
1a4205d9 149 }
a52fde77 150 /*
1a4205d9
AM
151 * isFinishedBuilding remains false, so the superclass will ask the
152 * back-end to delete the file.
153 */
154 super.dispose();
155 }
156
157 private void stopRunningThread(long endTime) {
158 if (!shtThread.isAlive()) {
159 return;
160 }
161
162 /*
163 * Send a "poison pill" in the queue, then wait for the HT to finish
a52fde77
AM
164 * its closeTree()
165 */
166 try {
1a4205d9 167 HTInterval pill = new HTInterval(-1, endTime, -1, TmfStateValue.nullValue());
b33c7369 168 intervalQueue.put(pill);
a52fde77 169 shtThread.join();
b33c7369 170 } catch (TimeRangeException e) {
bcec0116 171 Activator.getDefault().logError("Error closing state system", e); //$NON-NLS-1$
a52fde77 172 } catch (InterruptedException e) {
bcec0116 173 Activator.getDefault().logError("State system interrupted", e); //$NON-NLS-1$
a52fde77 174 }
a52fde77
AM
175 }
176
177 @Override
178 public void run() {
a52fde77
AM
179 HTInterval currentInterval;
180 try {
181 currentInterval = intervalQueue.take();
182 while (currentInterval.getStartTime() != -1) {
183 /* Send the interval to the History Tree */
e62a23a9 184 getSHT().insertInterval(currentInterval);
a52fde77
AM
185 currentInterval = intervalQueue.take();
186 }
e62a23a9
AM
187 if (currentInterval.getAttribute() != -1) {
188 /* Make sure this is the "poison pill" we are waiting for */
189 throw new IllegalStateException();
190 }
a52fde77 191 /*
6a1074ce
AM
192 * We've been told we're done, let's write down everything and quit.
193 * The end time of this "signal interval" is actually correct.
a52fde77 194 */
e62a23a9 195 getSHT().closeTree(currentInterval.getEndTime());
a52fde77
AM
196 return;
197 } catch (InterruptedException e) {
198 /* We've been interrupted abnormally */
bcec0116 199 Activator.getDefault().logError("State History Tree interrupted!", e); //$NON-NLS-1$
a52fde77
AM
200 } catch (TimeRangeException e) {
201 /* This also should not happen */
bcec0116 202 Activator.getDefault().logError("Error starting the state system", e); //$NON-NLS-1$
a52fde77
AM
203 }
204 }
205
e62a23a9
AM
206 // ------------------------------------------------------------------------
207 // Query methods
208 // ------------------------------------------------------------------------
209
210 @Override
211 public void doQuery(List<ITmfStateInterval> currentStateInfo, long t)
212 throws TimeRangeException, StateSystemDisposedException {
213 super.doQuery(currentStateInfo, t);
214
215 if (isFinishedBuilding) {
216 /*
217 * The history tree is the only place to look for intervals once
218 * construction is finished.
219 */
220 return;
221 }
222
223 /*
224 * It is possible we may have missed some intervals due to them being in
225 * the queue while the query was ongoing. Go over the results to see if
226 * we missed any.
227 */
228 for (int i = 0; i < currentStateInfo.size(); i++) {
229 if (currentStateInfo.get(i) == null) {
230 /* Query the missing interval via "unicast" */
231 ITmfStateInterval interval = doSingularQuery(t, i);
232 currentStateInfo.set(i, interval);
233 }
234 }
235 }
236
237 @Override
238 public ITmfStateInterval doSingularQuery(long t, int attributeQuark)
239 throws TimeRangeException, StateSystemDisposedException {
240 ITmfStateInterval ret = super.doSingularQuery(t, attributeQuark);
241 if (ret != null) {
242 return ret;
243 }
244
245 /*
246 * We couldn't find the interval in the history tree. It's possible that
247 * it is currently in the intervalQueue. Look for it there. Note that
248 * ArrayBlockingQueue's iterator() is thread-safe (no need to lock the
249 * queue).
250 */
251 for (ITmfStateInterval interval : intervalQueue) {
252 if (interval.getAttribute() == attributeQuark && interval.intersects(t)) {
253 return interval;
254 }
255 }
256
257 /*
258 * If we missed it again, it's because it got inserted in the tree
259 * *while we were iterating* on the queue. One last pass in the tree
260 * should find it.
261 *
262 * This case is really rare, which is why we do a second pass at the
263 * end if needed, instead of systematically checking in the queue first
264 * (which is slow).
265 */
266 return super.doSingularQuery(t, attributeQuark);
267 }
268
a52fde77 269}
This page took 0.063698 seconds and 5 git commands to generate.