tmf: Fix possible concurrency issue with event request IDs
[deliverable/tracecompass.git] / org.eclipse.tracecompass.tmf.core / src / org / eclipse / tracecompass / tmf / core / component / TmfEventProvider.java
CommitLineData
8c8bf09f 1/*******************************************************************************
8967c8c0 2 * Copyright (c) 2009, 2014 Ericsson
0283f7ff 3 *
8c8bf09f
ASL
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
0283f7ff 8 *
8c8bf09f 9 * Contributors:
fd3f1eff
AM
10 * Francois Chouinard - Initial API and implementation, replace background
11 * requests by preemptable requests
12 * Alexandre Montplaisir - Merge with TmfDataProvider
8967c8c0 13 * Bernd Hufmann - Add timer based coalescing for background requests
8c8bf09f
ASL
14 *******************************************************************************/
15
2bdf0193 16package org.eclipse.tracecompass.tmf.core.component;
8c8bf09f 17
8967c8c0 18import java.util.Iterator;
f45257df 19import java.util.LinkedList;
fd3f1eff 20import java.util.List;
8967c8c0
BH
21import java.util.Timer;
22import java.util.TimerTask;
fd3f1eff 23
2bdf0193
AM
24import org.eclipse.tracecompass.internal.tmf.core.TmfCoreTracer;
25import org.eclipse.tracecompass.internal.tmf.core.component.TmfEventThread;
26import org.eclipse.tracecompass.internal.tmf.core.component.TmfProviderManager;
27import org.eclipse.tracecompass.internal.tmf.core.request.TmfCoalescedEventRequest;
28import org.eclipse.tracecompass.internal.tmf.core.request.TmfRequestExecutor;
29import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
30import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest;
31import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest.ExecutionType;
32import org.eclipse.tracecompass.tmf.core.signal.TmfEndSynchSignal;
33import org.eclipse.tracecompass.tmf.core.signal.TmfSignalHandler;
34import org.eclipse.tracecompass.tmf.core.signal.TmfStartSynchSignal;
35import org.eclipse.tracecompass.tmf.core.timestamp.ITmfTimestamp;
36import org.eclipse.tracecompass.tmf.core.trace.ITmfContext;
8c8bf09f
ASL
37
38/**
fd3f1eff
AM
39 * An abstract base class that implements ITmfEventProvider.
40 * <p>
41 * This abstract class implements the housekeeping methods to register/
42 * de-register the event provider and to handle generically the event requests.
43 * </p>
0283f7ff 44 *
8fd82db5 45 * @author Francois Chouinard
c4767854 46 * @since 3.0
8c8bf09f 47 */
fd3f1eff
AM
48public abstract class TmfEventProvider extends TmfComponent implements ITmfEventProvider {
49
50 // ------------------------------------------------------------------------
51 // Constants
52 // ------------------------------------------------------------------------
53
c4767854
AM
54 /** Default amount of events per request "chunk"
55 * @since 3.0 */
fd3f1eff
AM
56 public static final int DEFAULT_BLOCK_SIZE = 50000;
57
8967c8c0
BH
58 /** Delay for coalescing background requests (in milli-seconds) */
59 private static final long DELAY = 1000;
60
fd3f1eff
AM
61 // ------------------------------------------------------------------------
62 // Attributes
63 // ------------------------------------------------------------------------
64
f45257df
AM
65 /** List of coalesced requests */
66 private final List<TmfCoalescedEventRequest> fPendingCoalescedRequests = new LinkedList<>();
fd3f1eff 67
f45257df
AM
68 /** The type of event handled by this provider */
69 private Class<? extends ITmfEvent> fType;
fd3f1eff
AM
70
71 private final TmfRequestExecutor fExecutor;
72
73 private final Object fLock = new Object();
74
75 private int fSignalDepth = 0;
76
77 private int fRequestPendingCounter = 0;
8c8bf09f 78
506219d0 79 private Timer fTimer;
8967c8c0
BH
80
81 private boolean fIsTimeout = false;
82
00641a97
FC
83 // ------------------------------------------------------------------------
84 // Constructors
85 // ------------------------------------------------------------------------
86
/**
 * Default constructor. Only creates the request executor; the provider
 * still has to be set up through {@link #init(String, Class)}.
 */
public TmfEventProvider() {
    // The superclass no-argument constructor is invoked implicitly.
    fExecutor = new TmfRequestExecutor();
}
8c8bf09f 94
/**
 * Standard constructor: builds the provider and initializes it in one
 * step by chaining to the default constructor and then calling
 * {@link #init(String, Class)}.
 *
 * @param name
 *            Name of the provider
 * @param type
 *            The type of events that will be handled
 */
public TmfEventProvider(String name, Class<? extends ITmfEvent> type) {
    this();
    init(name, type);
}
107
063f0d27 108 /**
f45257df 109 * Initialize this data provider
fd3f1eff
AM
110 *
111 * @param name
112 * Name of the provider
113 * @param type
114 * The type of events that will be handled
115 */
f45257df
AM
116 public void init(String name, Class<? extends ITmfEvent> type) {
117 super.init(name);
118 fType = type;
119 fExecutor.init();
120
121 fSignalDepth = 0;
506219d0
BH
122
123 synchronized (fLock) {
124 fTimer = new Timer();
125 }
126
f45257df 127 TmfProviderManager.register(fType, this);
fd3f1eff
AM
128 }
129
130 @Override
131 public void dispose() {
132 TmfProviderManager.deregister(fType, this);
133 fExecutor.stop();
506219d0
BH
134 synchronized (fLock) {
135 if (fTimer != null) {
136 fTimer.cancel();
137 }
138 fTimer = null;
139 }
fd3f1eff 140 super.dispose();
12c155f5
FC
141 }
142
00641a97 143 // ------------------------------------------------------------------------
fd3f1eff
AM
144 // Accessors
145 // ------------------------------------------------------------------------
146
fd3f1eff
AM
147 /**
148 * Get the event type this provider handles
149 *
150 * @return The type of ITmfEvent
151 */
0f89d4ba 152 public Class<? extends ITmfEvent> getType() {
fd3f1eff
AM
153 return fType;
154 }
155
156 // ------------------------------------------------------------------------
157 // ITmfRequestHandler
00641a97
FC
158 // ------------------------------------------------------------------------
159
c4767854
AM
160 /**
161 * @since 3.0
162 */
5419a136 163 @Override
fd3f1eff
AM
164 public void sendRequest(final ITmfEventRequest request) {
165 synchronized (fLock) {
5a597798
GB
166
167 if (TmfCoreTracer.isRequestTraced()) {
168 TmfCoreTracer.traceRequest(request.getRequestId(), "SENT to provider " + getName()); //$NON-NLS-1$
169 }
170
8967c8c0
BH
171 if (request.getExecType() == ExecutionType.FOREGROUND) {
172 if ((fSignalDepth > 0) || (fRequestPendingCounter > 0)) {
173 coalesceEventRequest(request);
174 } else {
f45257df 175 queueRequest(request);
8967c8c0
BH
176 }
177 return;
178 }
179
506219d0
BH
180 /*
181 * Dispatch request in case timer is not running.
182 */
183 if (fTimer == null) {
184 queueRequest(request);
185 return;
186 }
187
8967c8c0
BH
188 /*
189 * For the first background request in the request pending queue
190 * a timer will be started to allow other background requests to
191 * coalesce.
192 */
193 boolean startTimer = (getNbPendingBackgroundRequests() == 0);
194 coalesceEventRequest(request);
195 if (startTimer) {
196 TimerTask task = new TimerTask() {
197 @Override
198 public void run() {
199 synchronized (fLock) {
200 fIsTimeout = true;
201 fireRequest();
202 }
203 }
204 };
205 fTimer.schedule(task, DELAY);
fd3f1eff
AM
206 }
207 }
208 }
209
f45257df 210 private void fireRequest() {
fd3f1eff
AM
211 synchronized (fLock) {
212 if (fRequestPendingCounter > 0) {
213 return;
214 }
8967c8c0 215
fd3f1eff 216 if (fPendingCoalescedRequests.size() > 0) {
8967c8c0
BH
217 Iterator<TmfCoalescedEventRequest> iter = fPendingCoalescedRequests.iterator();
218 while (iter.hasNext()) {
219 ExecutionType type = (fIsTimeout ? ExecutionType.BACKGROUND : ExecutionType.FOREGROUND);
220 ITmfEventRequest request = iter.next();
221 if (type == request.getExecType()) {
f45257df 222 queueRequest(request);
8967c8c0
BH
223 iter.remove();
224 }
fd3f1eff 225 }
fd3f1eff 226 }
5419a136 227 }
5419a136
AM
228 }
229
fd3f1eff
AM
230 /**
231 * Increments/decrements the pending requests counters and fires the request
232 * if necessary (counter == 0). Used for coalescing requests across multiple
233 * TmfDataProvider's.
234 *
235 * @param isIncrement
236 * Should we increment (true) or decrement (false) the pending
237 * counter
238 */
5419a136 239 @Override
fd3f1eff
AM
240 public void notifyPendingRequest(boolean isIncrement) {
241 synchronized (fLock) {
242 if (isIncrement) {
8967c8c0 243 fRequestPendingCounter++;
fd3f1eff
AM
244 } else {
245 if (fRequestPendingCounter > 0) {
246 fRequestPendingCounter--;
247 }
248
249 // fire request if all pending requests are received
250 if (fRequestPendingCounter == 0) {
251 fireRequest();
252 }
253 }
254 }
255 }
256
257 // ------------------------------------------------------------------------
258 // Coalescing
259 // ------------------------------------------------------------------------
260
261 /**
262 * Create a new request from an existing one, and add it to the coalesced
263 * requests
264 *
265 * @param request
266 * The request to copy
c4767854 267 * @since 3.0
fd3f1eff 268 */
8967c8c0
BH
269 protected void newCoalescedEventRequest(ITmfEventRequest request) {
270 synchronized (fLock) {
672a642a 271 TmfCoalescedEventRequest coalescedRequest = new TmfCoalescedEventRequest(
fd3f1eff
AM
272 request.getDataType(),
273 request.getRange(),
274 request.getIndex(),
275 request.getNbRequested(),
276 request.getExecType());
277 coalescedRequest.addRequest(request);
5419a136 278 if (TmfCoreTracer.isRequestTraced()) {
8b56808c
GB
279 TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
280 TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
5419a136
AM
281 }
282 fPendingCoalescedRequests.add(coalescedRequest);
8967c8c0 283 }
fd3f1eff
AM
284 }
285
286 /**
287 * Add an existing requests to the list of coalesced ones
288 *
289 * @param request
290 * The request to add to the list
c4767854 291 * @since 3.0
fd3f1eff
AM
292 */
293 protected void coalesceEventRequest(ITmfEventRequest request) {
294 synchronized (fLock) {
295 for (TmfCoalescedEventRequest coalescedRequest : fPendingCoalescedRequests) {
296 if (coalescedRequest.isCompatible(request)) {
297 coalescedRequest.addRequest(request);
298 if (TmfCoreTracer.isRequestTraced()) {
8b56808c
GB
299 TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
300 TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
fd3f1eff
AM
301 }
302 return;
303 }
304 }
305 newCoalescedEventRequest(request);
306 }
307 }
308
8967c8c0
BH
309 /**
310 * Gets the number of background requests in pending queue.
311 *
312 * @return the number of background requests in pending queue
313 */
314 private int getNbPendingBackgroundRequests() {
315 int nbBackgroundRequests = 0;
316 synchronized (fLock) {
317 for (ITmfEventRequest request : fPendingCoalescedRequests) {
318 if (request.getExecType() == ExecutionType.BACKGROUND) {
319 nbBackgroundRequests++;
320 }
321 }
322 }
323 return nbBackgroundRequests;
324 }
325
fd3f1eff
AM
326 // ------------------------------------------------------------------------
327 // Request processing
328 // ------------------------------------------------------------------------
329
fd3f1eff
AM
330 /**
331 * Queue a request.
332 *
333 * @param request
334 * The data request
c4767854 335 * @since 3.0
fd3f1eff
AM
336 */
337 protected void queueRequest(final ITmfEventRequest request) {
338
339 if (fExecutor.isShutdown()) {
340 request.cancel();
341 return;
342 }
343
344 TmfEventThread thread = new TmfEventThread(this, request);
345
346 if (TmfCoreTracer.isRequestTraced()) {
8b56808c 347 TmfCoreTracer.traceRequest(request.getRequestId(), "QUEUED"); //$NON-NLS-1$
fd3f1eff
AM
348 }
349
350 fExecutor.execute(thread);
351 }
352
fd3f1eff
AM
353 /**
354 * Initialize the provider based on the request. The context is provider
355 * specific and will be updated by getNext().
356 *
357 * @param request
358 * The request
359 * @return An application specific context; null if request can't be
360 * serviced
c4767854 361 * @since 3.0
fd3f1eff
AM
362 */
363 public abstract ITmfContext armRequest(ITmfEventRequest request);
364
365 /**
366 * Checks if the data meets the request completion criteria.
367 *
368 * @param request
369 * The request
370 * @param event
371 * The data to verify
372 * @param nbRead
373 * The number of events read so far
374 * @return true if completion criteria is met
c4767854 375 * @since 3.0
fd3f1eff
AM
376 */
377 public boolean isCompleted(ITmfEventRequest request, ITmfEvent event, int nbRead) {
378 boolean requestCompleted = isCompleted2(request, nbRead);
379 if (!requestCompleted) {
380 ITmfTimestamp endTime = request.getRange().getEndTime();
065cc19b 381 return event.getTimestamp().compareTo(endTime) > 0;
fd3f1eff
AM
382 }
383 return requestCompleted;
384 }
385
386 private static boolean isCompleted2(ITmfEventRequest request,int nbRead) {
387 return request.isCompleted() || nbRead >= request.getNbRequested();
388 }
389
390 // ------------------------------------------------------------------------
391 // Pass-through's to the request executor
392 // ------------------------------------------------------------------------
393
394 /**
395 * @return the shutdown state (i.e. if it is accepting new requests)
396 * @since 2.0
397 */
398 protected boolean executorIsShutdown() {
399 return fExecutor.isShutdown();
400 }
401
402 /**
403 * @return the termination state
404 * @since 2.0
405 */
406 protected boolean executorIsTerminated() {
407 return fExecutor.isTerminated();
408 }
409
410 // ------------------------------------------------------------------------
411 // Signal handlers
412 // ------------------------------------------------------------------------
413
414 /**
415 * Handler for the start synch signal
416 *
417 * @param signal
418 * Incoming signal
419 */
420 @TmfSignalHandler
421 public void startSynch(TmfStartSynchSignal signal) {
422 synchronized (fLock) {
423 fSignalDepth++;
424 }
425 }
426
427 /**
428 * Handler for the end synch signal
429 *
430 * @param signal
431 * Incoming signal
432 */
433 @TmfSignalHandler
434 public void endSynch(TmfEndSynchSignal signal) {
435 synchronized (fLock) {
436 fSignalDepth--;
437 if (fSignalDepth == 0) {
8967c8c0 438 fIsTimeout = false;
fd3f1eff
AM
439 fireRequest();
440 }
5419a136
AM
441 }
442 }
951d134a 443
8c8bf09f 444}
This page took 0.091777 seconds and 5 git commands to generate.