tmf: fix sonar warnings about redundant modifier in interfaces
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfDataProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2013 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 * Francois Chouinard - Replace background requests by pre-emptable requests
12 *******************************************************************************/
13
14 package org.eclipse.linuxtools.tmf.core.component;
15
16 import java.util.Vector;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.SynchronousQueue;
20
21 import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer;
22 import org.eclipse.linuxtools.internal.tmf.core.component.TmfEventThread;
23 import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager;
24 import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedDataRequest;
25 import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
26 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
27 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
28 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
29 import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
30 import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
31 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
32 import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
33 import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
34
35 /**
36 * An abstract base class that implements ITmfDataProvider.
37 * <p>
38 * This abstract class implements the housekeeping methods to register/
39 * de-register the event provider and to handle generically the event requests.
40 * <p>
41 * The concrete class can either re-implement processRequest() entirely or just
42 * implement the hooks (initializeContext() and getNext()).
43 * <p>
44 *
45 * @author Francois Chouinard
46 * @version 1.1
47 */
48 public abstract class TmfDataProvider extends TmfComponent implements ITmfDataProvider {
49
50 // ------------------------------------------------------------------------
51 // Constants
52 // ------------------------------------------------------------------------
53
54 /** Default amount of events per request "chunk" */
55 public static final int DEFAULT_BLOCK_SIZE = 50000;
56
57 /** Default size of the queue */
58 public static final int DEFAULT_QUEUE_SIZE = 1000;
59
60 // ------------------------------------------------------------------------
61 // Attributes
62 // ------------------------------------------------------------------------
63
64 /** The type of event handled by this provider */
65 protected Class<? extends ITmfEvent> fType;
66
67 /** Is there some data being logged? */
68 protected boolean fLogData;
69
70 /** Are errors being logged? */
71 protected boolean fLogError;
72
73 /** Queue of events */
74 protected BlockingQueue<ITmfEvent> fDataQueue;
75
76 /** Size of the fDataQueue */
77 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
78
79 private TmfRequestExecutor fExecutor;
80
81 private int fSignalDepth = 0;
82 private final Object fLock = new Object();
83
84 private int fRequestPendingCounter = 0;
85
86 // ------------------------------------------------------------------------
87 // Constructors
88 // ------------------------------------------------------------------------
89
90 /**
91 * Default constructor
92 */
93 public TmfDataProvider() {
94 super();
95 fQueueSize = DEFAULT_QUEUE_SIZE;
96 fDataQueue = new LinkedBlockingQueue<ITmfEvent>(fQueueSize);
97 fExecutor = new TmfRequestExecutor();
98 }
99
100 /**
101 * Initialize this data provider
102 *
103 * @param name
104 * Name of the provider
105 * @param type
106 * The type of events that will be handled
107 */
108 public void init(String name, Class<? extends ITmfEvent> type) {
109 super.init(name);
110 fType = type;
111 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<ITmfEvent>(fQueueSize) : new SynchronousQueue<ITmfEvent>();
112
113 fExecutor = new TmfRequestExecutor();
114 fSignalDepth = 0;
115
116 fLogData = TmfCoreTracer.isEventTraced();
117 // fLogError = TmfCoreTracer.isErrorTraced();
118
119 TmfProviderManager.register(fType, this);
120 }
121
122 /**
123 * Constructor specifying the event type and the queue size.
124 *
125 * @param name
126 * Name of the provider
127 * @param type
128 * Type of event that will be handled
129 * @param queueSize
130 * Size of the event queue
131 */
132 protected TmfDataProvider(String name, Class<? extends ITmfEvent> type, int queueSize) {
133 this();
134 fQueueSize = queueSize;
135 init(name, type);
136 }
137
138 /**
139 * Copy constructor
140 *
141 * @param other
142 * The other object to copy
143 */
144 public TmfDataProvider(TmfDataProvider other) {
145 this();
146 init(other.getName(), other.fType);
147 }
148
149 /**
150 * Standard constructor. Instantiate and initialize at the same time.
151 *
152 * @param name
153 * Name of the provider
154 * @param type
155 * The type of events that will be handled
156 */
157 public TmfDataProvider(String name, Class<? extends ITmfEvent> type) {
158 this(name, type, DEFAULT_QUEUE_SIZE);
159 }
160
161 @Override
162 public void dispose() {
163 TmfProviderManager.deregister(fType, this);
164 fExecutor.stop();
165 super.dispose();
166 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
167 }
168
169 // ------------------------------------------------------------------------
170 // Accessors
171 // ------------------------------------------------------------------------
172
173 /**
174 * Get the queue size of this provider
175 *
176 * @return The size of the queue
177 */
178 public int getQueueSize() {
179 return fQueueSize;
180 }
181
182 /**
183 * Get the event type this provider handles
184 *
185 * @return The type of ITmfEvent
186 */
187 public Class<?> getType() {
188 return fType;
189 }
190
191 // ------------------------------------------------------------------------
192 // ITmfRequestHandler
193 // ------------------------------------------------------------------------
194
195 @Override
196 public void sendRequest(final ITmfDataRequest request) {
197 synchronized (fLock) {
198 if (fSignalDepth > 0) {
199 coalesceDataRequest(request);
200 } else {
201 dispatchRequest(request);
202 }
203 }
204 }
205
206 @Override
207 public void fireRequest() {
208 synchronized (fLock) {
209 if (fRequestPendingCounter > 0) {
210 return;
211 }
212 if (fPendingCoalescedRequests.size() > 0) {
213 for (TmfDataRequest request : fPendingCoalescedRequests) {
214 dispatchRequest(request);
215 }
216 fPendingCoalescedRequests.clear();
217 }
218 }
219 }
220
221 /**
222 * Increments/decrements the pending requests counters and fires the request
223 * if necessary (counter == 0). Used for coalescing requests across multiple
224 * TmfDataProvider's.
225 *
226 * @param isIncrement
227 * Should we increment (true) or decrement (false) the pending
228 * counter
229 */
230 @Override
231 public void notifyPendingRequest(boolean isIncrement) {
232 synchronized (fLock) {
233 if (isIncrement) {
234 if (fSignalDepth > 0) {
235 fRequestPendingCounter++;
236 }
237 } else {
238 if (fRequestPendingCounter > 0) {
239 fRequestPendingCounter--;
240 }
241
242 // fire request if all pending requests are received
243 if (fRequestPendingCounter == 0) {
244 fireRequest();
245 }
246 }
247 }
248 }
249
250 // ------------------------------------------------------------------------
251 // Coalescing (primitive test...)
252 // ------------------------------------------------------------------------
253
254 /** List of coalesced requests */
255 protected Vector<TmfCoalescedDataRequest> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest>();
256
257 /**
258 * Create a new request from an existing one, and add it to the coalesced
259 * requests
260 *
261 * @param request
262 * The request to copy
263 */
264 protected void newCoalescedDataRequest(ITmfDataRequest request) {
265 synchronized (fLock) {
266 TmfCoalescedDataRequest coalescedRequest = new TmfCoalescedDataRequest(request.getDataType(), request.getIndex(),
267 request.getNbRequested(), request.getBlockSize(), request.getExecType());
268 coalescedRequest.addRequest(request);
269 if (TmfCoreTracer.isRequestTraced()) {
270 TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
271 TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
272 }
273 fPendingCoalescedRequests.add(coalescedRequest);
274 }
275 }
276
277 /**
278 * Add an existing requests to the list of coalesced ones
279 *
280 * @param request
281 * The request to add to the list
282 */
283 protected void coalesceDataRequest(ITmfDataRequest request) {
284 synchronized (fLock) {
285 for (TmfCoalescedDataRequest coalescedRequest : fPendingCoalescedRequests) {
286 if (coalescedRequest.isCompatible(request)) {
287 coalescedRequest.addRequest(request);
288 if (TmfCoreTracer.isRequestTraced()) {
289 TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
290 TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
291 }
292 return;
293 }
294 }
295 newCoalescedDataRequest(request);
296 }
297 }
298
299 // ------------------------------------------------------------------------
300 // Request processing
301 // ------------------------------------------------------------------------
302
303 private void dispatchRequest(final ITmfDataRequest request) {
304 if (request.getExecType() == ExecutionType.FOREGROUND) {
305 queueRequest(request);
306 } else {
307 queueBackgroundRequest(request, request.getBlockSize(), true);
308 }
309 }
310
311 /**
312 * Queue a request.
313 *
314 * @param request
315 * The data request
316 */
317 protected void queueRequest(final ITmfDataRequest request) {
318
319 if (fExecutor.isShutdown()) {
320 request.cancel();
321 return;
322 }
323
324 TmfEventThread thread = new TmfEventThread(this, request);
325
326 if (TmfCoreTracer.isRequestTraced()) {
327 TmfCoreTracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
328 }
329
330 fExecutor.execute(thread);
331 }
332
333 /**
334 * Queue a background request
335 *
336 * @param request
337 * The request
338 * @param blockSize
339 * The request should be split in chunks of this size
340 * @param indexing
341 * Should we index the chunks
342 */
343 protected void queueBackgroundRequest(final ITmfDataRequest request, final int blockSize, final boolean indexing) {
344 queueRequest(request);
345 }
346
347 /**
348 * Initialize the provider based on the request. The context is provider
349 * specific and will be updated by getNext().
350 *
351 * @param request
352 * The request
353 * @return An application specific context; null if request can't be
354 * serviced
355 * @since 2.0
356 */
357 public abstract ITmfContext armRequest(ITmfDataRequest request);
358
359 // /**
360 // * Return the next event based on the context supplied. The context
361 // * will be updated for the subsequent read.
362 // *
363 // * @param context the trace read context (updated)
364 // * @return the event referred to by context
365 // */
366 // public abstract T getNext(ITmfContext context);
367
368 /**
369 * Checks if the data meets the request completion criteria.
370 *
371 * @param request the request
372 * @param data the data to verify
373 * @param nbRead the number of events read so far
374 * @return true if completion criteria is met
375 */
376 public boolean isCompleted(ITmfDataRequest request, ITmfEvent data, int nbRead) {
377 return request.isCompleted() || nbRead >= request.getNbRequested();
378 }
379
380 // ------------------------------------------------------------------------
381 // Pass-through's to the request executor
382 // ------------------------------------------------------------------------
383
384 /**
385 * @return the shutdown state (i.e. if it is accepting new requests)
386 * @since 2.0
387 */
388 protected boolean executorIsShutdown() {
389 return fExecutor.isShutdown();
390 }
391
392 /**
393 * @return the termination state
394 * @since 2.0
395 */
396 protected boolean executorIsTerminated() {
397 return fExecutor.isTerminated();
398 }
399
400 // ------------------------------------------------------------------------
401 // Signal handlers
402 // ------------------------------------------------------------------------
403
404 /**
405 * Handler for the start synch signal
406 *
407 * @param signal
408 * Incoming signal
409 */
410 @TmfSignalHandler
411 public void startSynch(TmfStartSynchSignal signal) {
412 synchronized (fLock) {
413 fSignalDepth++;
414 }
415 }
416
417 /**
418 * Handler for the end synch signal
419 *
420 * @param signal
421 * Incoming signal
422 */
423 @TmfSignalHandler
424 public void endSynch(TmfEndSynchSignal signal) {
425 synchronized (fLock) {
426 fSignalDepth--;
427 if (fSignalDepth == 0) {
428 fireRequest();
429 }
430 }
431 }
432
433 }
This page took 0.040257 seconds and 5 git commands to generate.