2010-09-17 Francois Chouinard <fchouinard@gmail.com> Contribution for Bug325662
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.component;
14
15 import java.util.Vector;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.SynchronousQueue;
19
20 import org.eclipse.linuxtools.tmf.Tracer;
21 import org.eclipse.linuxtools.tmf.event.TmfData;
22 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
23 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType;
24 import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
25 import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
26 import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
27 import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
28 import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
29 import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
30 import org.eclipse.linuxtools.tmf.trace.ITmfContext;
31
32 /**
33 * <b><u>TmfProvider</u></b>
34 * <p>
35 * The TmfProvider<T> is a provider for a data of type <T>.
36 * <p>
37 * This abstract class implements the housekeeking methods to register/
38 * deregister the event provider and to handle generically the event requests.
39 * <p>
40 * The concrete class can either re-implement processRequest() entirely or
41 * just implement the hooks (initializeContext() and getNext()).
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 */
45 public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
46
47 // ------------------------------------------------------------------------
48 // Constants
49 // ------------------------------------------------------------------------
50
51 // private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
52 // private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
53
54 // ------------------------------------------------------------------------
55 //
56 // ------------------------------------------------------------------------
57
58 final protected Class<T> fType;
59 final protected boolean fLogData;
60 final protected boolean fLogError;
61
62 public static final int DEFAULT_BLOCK_SIZE = 5000;
63 public static final int DEFAULT_QUEUE_SIZE = 1000;
64
65 protected final int fQueueSize;
66 protected final BlockingQueue<T> fDataQueue;
67 protected final TmfRequestExecutor fExecutor;
68
69 private int fSignalDepth = 0;
70 private final Object fLock = new Object();
71
72 // ------------------------------------------------------------------------
73 // Constructors
74 // ------------------------------------------------------------------------
75
76 public TmfDataProvider(String name, Class<T> type) {
77 this(name, type, DEFAULT_QUEUE_SIZE);
78 }
79
80 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
81 super(name);
82 fType = type;
83 fQueueSize = queueSize;
84 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
85
86 fExecutor = new TmfRequestExecutor();
87 fSignalDepth = 0;
88
89 fLogData = Tracer.isEventTraced();
90 fLogError = Tracer.isErrorTraced();
91
92 TmfProviderManager.register(fType, this);
93 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
94 }
95
96 public TmfDataProvider(TmfDataProvider<T> other) {
97 super(other);
98 fType = other.fType;
99 fQueueSize = other.fQueueSize;
100 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
101
102 fExecutor = new TmfRequestExecutor();
103 fSignalDepth = 0;
104
105 fLogData = Tracer.isEventTraced();
106 fLogError = Tracer.isErrorTraced();
107 }
108
109 @Override
110 public void dispose() {
111 TmfProviderManager.deregister(fType, this);
112 fExecutor.stop();
113 super.dispose();
114 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
115 }
116
117 public int getQueueSize() {
118 return fQueueSize;
119 }
120
121 public Class<?> getType() {
122 return fType;
123 }
124
125 // ------------------------------------------------------------------------
126 // ITmfRequestHandler
127 // ------------------------------------------------------------------------
128
129 public void sendRequest(final ITmfDataRequest<T> request) {
130 synchronized(fLock) {
131 if (fSignalDepth > 0) {
132 coalesceDataRequest(request);
133 } else {
134 dispatchRequest(request);
135 }
136 }
137 }
138
139 /**
140 * This method queues the coalesced requests.
141 *
142 * @param thread
143 */
144 public void fireRequests() {
145 synchronized(fLock) {
146 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
147 dispatchRequest(request);
148 }
149 fPendingCoalescedRequests.clear();
150 }
151 }
152
153 // ------------------------------------------------------------------------
154 // Coalescing (primitive test...)
155 // ------------------------------------------------------------------------
156
157 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
158
159 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
160 synchronized(fLock) {
161 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(
162 fType, request.getIndex(), request.getNbRequested(),request.getExecType());
163 coalescedRequest.addRequest(request);
164 if (Tracer.isRequestTraced()) {
165 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId());
166 Tracer.traceRequest(coalescedRequest, "added " + request.getRequestId());
167 }
168 fPendingCoalescedRequests.add(coalescedRequest);
169 }
170 }
171
172 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
173 synchronized(fLock) {
174 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
175 if (req.isCompatible(request)) {
176 req.addRequest(request);
177 if (Tracer.isRequestTraced()) {
178 Tracer.traceRequest(request, "coalesced with " + req.getRequestId());
179 Tracer.traceRequest(req, "added " + request.getRequestId());
180 }
181 return;
182 }
183 }
184 newCoalescedDataRequest(request);
185 }
186 }
187
188 // ------------------------------------------------------------------------
189 // Request processing
190 // ------------------------------------------------------------------------
191
192 private void dispatchRequest(final ITmfDataRequest<T> request) {
193 if (request.getExecType() == ExecutionType.FOREGROUND)
194 queueRequest(request);
195 else
196 queueBackgroundRequest(request, DEFAULT_BLOCK_SIZE, true);
197 }
198
199 protected void queueRequest(final ITmfDataRequest<T> request) {
200
201 if (fExecutor.isShutdown()) {
202 request.cancel();
203 return;
204 }
205
206 final TmfDataProvider<T> provider = this;
207
208 // Process the request
209 TmfThread thread = new TmfThread(request.getExecType()) {
210
211 @Override
212 public void run() {
213
214 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "started");
215
216 // Extract the generic information
217 request.start();
218 int nbRequested = request.getNbRequested();
219 int nbRead = 0;
220
221 // Initialize the execution
222 ITmfContext context = armRequest(request);
223 if (context == null) {
224 request.cancel();
225 return;
226 }
227
228 try {
229 // Get the ordered events
230 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName());
231 T data = getNext(context);
232 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
233 while (data != null && !isCompleted(request, data, nbRead))
234 {
235 if (fLogData) Tracer.traceEvent(provider, request, data);
236 request.handleData(data);
237
238 // To avoid an unnecessary read passed the last data requested
239 if (++nbRead < nbRequested) {
240 data = getNext(context);
241 if (Tracer.isRequestTraced() && (data == null || data.isNullRef())) {
242 Tracer.trace("Request #" + request.getRequestId() + " end of data");
243 }
244 }
245 }
246 request.done();
247
248 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "completed");
249 }
250 catch (Exception e) {
251 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "exception (failed)");
252 request.fail();
253 }
254 }
255 };
256
257 fExecutor.execute(thread);
258
259 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
260 }
261
262 // By default, same behavior as a foreground request
263 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, boolean indexing) {
264 queueRequest(request);
265 }
266
267 /**
268 * Initialize the provider based on the request. The context is
269 * provider specific and will be updated by getNext().
270 *
271 * @param request
272 * @return an application specific context; null if request can't be serviced
273 */
274 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
275 public abstract T getNext(ITmfContext context);
276
277 /**
278 * Checks if the data meets the request completion criteria.
279 *
280 * @param request
281 * @param data
282 * @return
283 */
284 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
285 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
286 }
287
288 // ------------------------------------------------------------------------
289 // Signal handlers
290 // ------------------------------------------------------------------------
291
292 @TmfSignalHandler
293 public void startSynch(TmfStartSynchSignal signal) {
294 synchronized (fLock) {
295 fSignalDepth++;
296 }
297 }
298
299 @TmfSignalHandler
300 public void endSynch(TmfEndSynchSignal signal) {
301 synchronized (fLock) {
302 fSignalDepth--;
303 if (fSignalDepth == 0) {
304 fireRequests();
305 }
306 }
307 }
308
309 }
This page took 0.037589 seconds and 5 git commands to generate.