2010-10-26 Francois Chouinard <fchouinard@gmail.com> Contribution for Bug309042
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.component;
14
15 import java.util.Vector;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.SynchronousQueue;
19
20 import org.eclipse.linuxtools.tmf.Tracer;
21 import org.eclipse.linuxtools.tmf.event.TmfData;
22 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
23 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType;
24 import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
25 import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
26 import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
27 import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
28 import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
29 import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
30 import org.eclipse.linuxtools.tmf.trace.ITmfContext;
31
32 /**
33 * <b><u>TmfProvider</u></b>
34 * <p>
35 * The TmfProvider<T> is a provider for a data of type <T>.
36 * <p>
37 * This abstract class implements the housekeeking methods to register/
38 * deregister the event provider and to handle generically the event requests.
39 * <p>
40 * The concrete class can either re-implement processRequest() entirely or
41 * just implement the hooks (initializeContext() and getNext()).
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 */
45 public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
46
47 // ------------------------------------------------------------------------
48 // Constants
49 // ------------------------------------------------------------------------
50
51 // private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
52 // private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
53
54 // ------------------------------------------------------------------------
55 //
56 // ------------------------------------------------------------------------
57
58 final protected Class<T> fType;
59 final protected boolean fLogData;
60 final protected boolean fLogError;
61
62 public static final int DEFAULT_BLOCK_SIZE = 5000;
63 public static final int DEFAULT_QUEUE_SIZE = 1000;
64
65 protected final int fQueueSize;
66 protected final BlockingQueue<T> fDataQueue;
67 protected final TmfRequestExecutor fExecutor;
68
69 private int fSignalDepth = 0;
70 private final Object fLock = new Object();
71
72 // ------------------------------------------------------------------------
73 // Constructors
74 // ------------------------------------------------------------------------
75
76 public TmfDataProvider(String name, Class<T> type) {
77 this(name, type, DEFAULT_QUEUE_SIZE);
78 }
79
80 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
81 super(name);
82 fType = type;
83 fQueueSize = queueSize;
84 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
85
86 fExecutor = new TmfRequestExecutor();
87 fSignalDepth = 0;
88
89 fLogData = Tracer.isEventTraced();
90 fLogError = Tracer.isErrorTraced();
91
92 TmfProviderManager.register(fType, this);
93 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
94 }
95
96 public TmfDataProvider(TmfDataProvider<T> other) {
97 super(other);
98 fType = other.fType;
99 fQueueSize = other.fQueueSize;
100 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
101
102 fExecutor = new TmfRequestExecutor();
103 fSignalDepth = 0;
104
105 fLogData = Tracer.isEventTraced();
106 fLogError = Tracer.isErrorTraced();
107 }
108
109 @Override
110 public void dispose() {
111 TmfProviderManager.deregister(fType, this);
112 fExecutor.stop();
113 super.dispose();
114 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
115 }
116
117 public int getQueueSize() {
118 return fQueueSize;
119 }
120
121 public Class<?> getType() {
122 return fType;
123 }
124
125 // ------------------------------------------------------------------------
126 // ITmfRequestHandler
127 // ------------------------------------------------------------------------
128
129 @Override
130 public void sendRequest(final ITmfDataRequest<T> request) {
131 synchronized(fLock) {
132 if (fSignalDepth > 0) {
133 coalesceDataRequest(request);
134 } else {
135 dispatchRequest(request);
136 }
137 }
138 }
139
140 /**
141 * This method queues the coalesced requests.
142 *
143 * @param thread
144 */
145 @Override
146 public void fireRequests() {
147 synchronized(fLock) {
148 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
149 dispatchRequest(request);
150 }
151 fPendingCoalescedRequests.clear();
152 }
153 }
154
155 // ------------------------------------------------------------------------
156 // Coalescing (primitive test...)
157 // ------------------------------------------------------------------------
158
159 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
160
161 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
162 synchronized(fLock) {
163 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(
164 fType, request.getIndex(), request.getNbRequested(),request.getExecType());
165 coalescedRequest.addRequest(request);
166 if (Tracer.isRequestTraced()) {
167 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId());
168 Tracer.traceRequest(coalescedRequest, "added " + request.getRequestId());
169 }
170 fPendingCoalescedRequests.add(coalescedRequest);
171 }
172 }
173
174 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
175 synchronized(fLock) {
176 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
177 if (req.isCompatible(request)) {
178 req.addRequest(request);
179 if (Tracer.isRequestTraced()) {
180 Tracer.traceRequest(request, "coalesced with " + req.getRequestId());
181 Tracer.traceRequest(req, "added " + request.getRequestId());
182 }
183 return;
184 }
185 }
186 newCoalescedDataRequest(request);
187 }
188 }
189
190 // ------------------------------------------------------------------------
191 // Request processing
192 // ------------------------------------------------------------------------
193
194 private void dispatchRequest(final ITmfDataRequest<T> request) {
195 if (request.getExecType() == ExecutionType.FOREGROUND)
196 queueRequest(request);
197 else
198 queueBackgroundRequest(request, DEFAULT_BLOCK_SIZE, true);
199 }
200
201 protected void queueRequest(final ITmfDataRequest<T> request) {
202
203 if (fExecutor.isShutdown()) {
204 request.cancel();
205 return;
206 }
207
208 final TmfDataProvider<T> provider = this;
209
210 // Process the request
211 TmfThread thread = new TmfThread(request.getExecType()) {
212
213 @Override
214 public void run() {
215
216 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "started");
217
218 // Extract the generic information
219 request.start();
220 int nbRequested = request.getNbRequested();
221 int nbRead = 0;
222
223 // Initialize the execution
224 ITmfContext context = armRequest(request);
225 if (context == null) {
226 request.cancel();
227 return;
228 }
229
230 try {
231 // Get the ordered events
232 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName());
233 T data = getNext(context);
234 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
235 while (data != null && !isCompleted(request, data, nbRead))
236 {
237 if (fLogData) Tracer.traceEvent(provider, request, data);
238 request.handleData(data);
239
240 // To avoid an unnecessary read passed the last data requested
241 if (++nbRead < nbRequested) {
242 data = getNext(context);
243 if (Tracer.isRequestTraced() && (data == null || data.isNullRef())) {
244 Tracer.trace("Request #" + request.getRequestId() + " end of data");
245 }
246 }
247 }
248 request.done();
249
250 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "completed");
251 }
252 catch (Exception e) {
253 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "exception (failed)");
254 request.fail();
255 }
256 }
257 };
258
259 fExecutor.execute(thread);
260
261 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
262 }
263
264 // By default, same behavior as a foreground request
265 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, boolean indexing) {
266 queueRequest(request);
267 }
268
269 /**
270 * Initialize the provider based on the request. The context is
271 * provider specific and will be updated by getNext().
272 *
273 * @param request
274 * @return an application specific context; null if request can't be serviced
275 */
276 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
277 public abstract T getNext(ITmfContext context);
278
279 /**
280 * Checks if the data meets the request completion criteria.
281 *
282 * @param request
283 * @param data
284 * @return
285 */
286 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
287 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
288 }
289
290 // ------------------------------------------------------------------------
291 // Signal handlers
292 // ------------------------------------------------------------------------
293
294 @TmfSignalHandler
295 public void startSynch(TmfStartSynchSignal signal) {
296 synchronized (fLock) {
297 fSignalDepth++;
298 }
299 }
300
301 @TmfSignalHandler
302 public void endSynch(TmfEndSynchSignal signal) {
303 synchronized (fLock) {
304 fSignalDepth--;
305 if (fSignalDepth == 0) {
306 fireRequests();
307 }
308 }
309 }
310
311 }
This page took 0.036734 seconds and 5 git commands to generate.