From 2ce551cccdac4096d3efd910f11e3fabe3c51856 Mon Sep 17 00:00:00 2001 From: Matthew Khouzam Date: Fri, 10 Apr 2015 13:15:46 -0400 Subject: [PATCH] ss: accelerate state system creation by 25% by coalescing intervals The state system threaded back end sends one interval at a time to a blocking queue to be written to disk. This patch makes it send several intervals at a time, meaning we have several times less context switches and several times less blocked threads. Change-Id: I5cf26da2f1593749245bb6cf5c6f3c5ed65f00e0 Signed-off-by: Matthew Khouzam Reviewed-on: https://git.eclipse.org/r/45603 Reviewed-by: Hudson CI Tested-by: Alexandre Montplaisir --- .../ThreadedHistoryTreeBackend.java | 71 +++++++++++-------- .../TmfStateSystemAnalysisModule.java | 2 +- 2 files changed, 43 insertions(+), 30 deletions(-) diff --git a/org.eclipse.tracecompass.statesystem.core/src/org/eclipse/tracecompass/internal/statesystem/core/backend/historytree/ThreadedHistoryTreeBackend.java b/org.eclipse.tracecompass.statesystem.core/src/org/eclipse/tracecompass/internal/statesystem/core/backend/historytree/ThreadedHistoryTreeBackend.java index ee7e19a4c9..e5f9429a9c 100644 --- a/org.eclipse.tracecompass.statesystem.core/src/org/eclipse/tracecompass/internal/statesystem/core/backend/historytree/ThreadedHistoryTreeBackend.java +++ b/org.eclipse.tracecompass.statesystem.core/src/org/eclipse/tracecompass/internal/statesystem/core/backend/historytree/ThreadedHistoryTreeBackend.java @@ -16,6 +16,9 @@ package org.eclipse.tracecompass.internal.statesystem.core.backend.historytree; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -37,9 +40,12 @@ import org.eclipse.tracecompass.statesystem.core.statevalue.TmfStateValue; public final class ThreadedHistoryTreeBackend extends HistoryTreeBackend implements Runnable { - private final @NonNull BlockingQueue intervalQueue; + private static final int INTERVAL_CHUNK_SIZE = 512; + private final @NonNull BlockingQueue> intervalQueue; private final @NonNull Thread shtThread; + private Collection fCurrentChunk = new ArrayList<>(INTERVAL_CHUNK_SIZE); + /** * New state history constructor * @@ -74,7 +80,7 @@ public final class ThreadedHistoryTreeBackend extends HistoryTreeBackend int queueSize, int blockSize, int maxChildren) - throws IOException { + throws IOException { super(ssid, newStateFile, providerVersion, startTime, blockSize, maxChildren); intervalQueue = new ArrayBlockingQueue<>(queueSize); @@ -108,7 +114,7 @@ public final class ThreadedHistoryTreeBackend extends HistoryTreeBackend int providerVersion, long startTime, int queueSize) - throws IOException { + throws IOException { super(ssid, newStateFile, providerVersion, startTime); intervalQueue = new ArrayBlockingQueue<>(queueSize); @@ -132,12 +138,15 @@ public final class ThreadedHistoryTreeBackend extends HistoryTreeBackend * underneath, we'll put them in the Queue. They will then be taken and * processed by the other thread executing the run() method. */ - HTInterval interval = new HTInterval(stateStartTime, stateEndTime, - quark, (TmfStateValue) value); - try { - intervalQueue.put(interval); - } catch (InterruptedException e) { - Activator.getDefault().logError("State system interrupted", e); //$NON-NLS-1$ + fCurrentChunk.add(new HTInterval(stateStartTime, stateEndTime, + quark, (TmfStateValue) value)); + if (fCurrentChunk.size() >= INTERVAL_CHUNK_SIZE) { + try { + intervalQueue.put(fCurrentChunk); + fCurrentChunk = new ArrayList<>(INTERVAL_CHUNK_SIZE); + } catch (InterruptedException e) { + Activator.getDefault().logError("State system interrupted", e); //$NON-NLS-1$ + } } } @@ -177,7 +186,10 @@ public final class ThreadedHistoryTreeBackend extends HistoryTreeBackend * closeTree() */ try { - HTInterval pill = new HTInterval(-1, endTime, -1, TmfStateValue.nullValue()); + Collection pill = Collections.singletonList(new HTInterval(-1, endTime, -1, TmfStateValue.nullValue())); + if (!fCurrentChunk.isEmpty()) { + intervalQueue.put(fCurrentChunk); + } intervalQueue.put(pill); shtThread.join(); } catch (TimeRangeException e) { @@ -189,24 +201,23 @@ public final class ThreadedHistoryTreeBackend extends HistoryTreeBackend @Override public void run() { - HTInterval currentInterval; try { - currentInterval = intervalQueue.take(); - while (currentInterval.getStartTime() != -1) { - /* Send the interval to the History Tree */ - getSHT().insertInterval(currentInterval); - currentInterval = intervalQueue.take(); - } - if (currentInterval.getAttribute() != -1) { - /* Make sure this is the "poison pill" we are waiting for */ - throw new IllegalStateException(); + consumerloop: while (true) { + Iterable currentIntervals = intervalQueue.take(); + for (HTInterval currentInterval : currentIntervals) { + if (currentInterval.getAttribute() == -1) { + /* + * We've been told we're done, let's write down + * everything and quit. The end time of this + * "signal interval" is actually correct. + */ + getSHT().closeTree(currentInterval.getEndTime()); + break consumerloop; + } + /* Send the interval to the History Tree */ + getSHT().insertInterval(currentInterval); + } } - /* - * We've been told we're done, let's write down everything and quit. - * The end time of this "signal interval" is actually correct. - */ - getSHT().closeTree(currentInterval.getEndTime()); - return; } catch (InterruptedException e) { /* We've been interrupted abnormally */ Activator.getDefault().logError("State History Tree interrupted!", e); //$NON-NLS-1$ @@ -261,9 +272,11 @@ public final class ThreadedHistoryTreeBackend extends HistoryTreeBackend * ArrayBlockingQueue's iterator() is thread-safe (no need to lock the * queue). */ - for (ITmfStateInterval interval : intervalQueue) { - if (interval.getAttribute() == attributeQuark && interval.intersects(t)) { - return interval; + for (Iterable intervals : intervalQueue) { + for (HTInterval interval : intervals) { + if (interval.getAttribute() == attributeQuark && interval.intersects(t)) { + return interval; + } } } diff --git a/org.eclipse.tracecompass.tmf.core/src/org/eclipse/tracecompass/tmf/core/statesystem/TmfStateSystemAnalysisModule.java b/org.eclipse.tracecompass.tmf.core/src/org/eclipse/tracecompass/tmf/core/statesystem/TmfStateSystemAnalysisModule.java index 80d918d7d0..50230e6095 100644 --- a/org.eclipse.tracecompass.tmf.core/src/org/eclipse/tracecompass/tmf/core/statesystem/TmfStateSystemAnalysisModule.java +++ b/org.eclipse.tracecompass.tmf.core/src/org/eclipse/tracecompass/tmf/core/statesystem/TmfStateSystemAnalysisModule.java @@ -269,7 +269,7 @@ public abstract class TmfStateSystemAnalysisModule extends TmfAbstractAnalysisMo } /* Size of the blocking queue to use when building a state history */ - final int QUEUE_SIZE = 10000; + final int QUEUE_SIZE = 256; try { IStateHistoryBackend backend = StateHistoryBackendFactory.createHistoryTreeBackendNewFile( -- 2.34.1