2010-06-05 fchouinard@gmail.com Contributions for bugs 292965, 292963, 293102,...
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.component;
14
15 import java.lang.reflect.Array;
16 import java.util.Vector;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.TimeUnit;
20
21 import org.eclipse.linuxtools.tmf.Tracer;
22 import org.eclipse.linuxtools.tmf.event.TmfData;
23 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
24 import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
25 import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
26 import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
27 import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
28 import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
29 import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
30 import org.eclipse.linuxtools.tmf.trace.ITmfContext;
31
/**
 * <b><u>TmfDataProvider</u></b>
 * <p>
 * The TmfDataProvider&lt;T&gt; is a provider for data of type &lt;T&gt;.
 * <p>
 * This abstract class implements the housekeeping methods to register/
 * deregister the data provider and to handle the data requests generically.
 * <p>
 * The concrete class can either re-implement queueRequest() entirely or
 * just implement the hooks (armRequest() and getNext()).
 * <p>
 * TODO: Add support for providing multiple data types.
 */
45 public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
46
47 // ------------------------------------------------------------------------
48 // Constants
49 // ------------------------------------------------------------------------
50
51 private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
52 // private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
53
54 // ------------------------------------------------------------------------
55 //
56 // ------------------------------------------------------------------------
57
58 final protected Class<T> fType;
59 final protected boolean fLogData;
60 final protected boolean fLogError;
61
62 public static final int DEFAULT_QUEUE_SIZE = 1000;
63 protected final int fQueueSize;
64 protected final BlockingQueue<T> fDataQueue;
65 protected final TmfRequestExecutor fExecutor;
66
67 private int fSignalDepth = 0;
68
69 // ------------------------------------------------------------------------
70 // Constructors
71 // ------------------------------------------------------------------------
72
/**
 * Creates a provider whose data queue has the default capacity
 * (DEFAULT_QUEUE_SIZE).
 *
 * @param name the name handed to the parent component
 * @param type the class of the data served by this provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
76
/**
 * Creates a provider with an explicit data queue capacity and registers
 * it with the TmfProviderManager for the given data type.
 *
 * @param name the name handed to the parent component
 * @param type the class of the data served by this provider
 * @param queueSize the capacity of the internal data queue
 */
protected TmfDataProvider(String name, Class<T> type, int queueSize) {
    super(name);

    fType = type;
    fQueueSize = queueSize;
    // fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
    fDataQueue = new LinkedBlockingQueue<T>(queueSize);

    fExecutor = new TmfRequestExecutor();
    fSignalDepth = 0;

    // Sample the trace flags once, at construction time
    fLogData = Tracer.isEventTraced();
    fLogError = Tracer.isErrorTraced();

    TmfProviderManager.register(type, this);
    if (Tracer.isComponentTraced()) {
        Tracer.traceComponent(this, "started");
    }
}
93
94 public TmfDataProvider(TmfDataProvider<T> other) {
95 super(other);
96 fType = other.fType;
97 fQueueSize = other.fQueueSize;
98 // fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
99 fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
100
101 fExecutor = new TmfRequestExecutor();
102 fSignalDepth = 0;
103
104 fLogData = Tracer.isEventTraced();
105 fLogError = Tracer.isErrorTraced();
106 }
107
108 @Override
109 public void dispose() {
110 TmfProviderManager.deregister(fType, this);
111 fExecutor.stop();
112
113 if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
114
115 if (fClone != null) fClone.dispose();
116 super.dispose();
117 }
118
119 public int getQueueSize() {
120 return fQueueSize;
121 }
122
123 public Class<?> getType() {
124 return fType;
125 }
126
127 // ------------------------------------------------------------------------
128 // ITmfRequestHandler
129 // ------------------------------------------------------------------------
130
131 protected TmfDataProvider<T> fClone;
132 public void sendRequest(final ITmfDataRequest<T> request) {
133 synchronized(this) {
134 if (fClone == null || request.getExecType() == SHORT) {
135 if (fSignalDepth > 0) {
136 coalesceDataRequest(request);
137 } else {
138 queueRequest(request);
139 }
140 }
141 else {
142 fClone.sendRequest(request);
143 }
144 }
145 }
146
147 /**
148 * This method queues the coalesced requests.
149 *
150 * @param thread
151 */
152 public void fireRequests() {
153 synchronized(this) {
154 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
155 queueRequest(request);
156 }
157 fPendingCoalescedRequests.clear();
158
159 if (fClone != null)
160 fClone.fireRequests();
161 }
162 }
163
164 // ------------------------------------------------------------------------
165 // Coalescing (primitive test...)
166 // ------------------------------------------------------------------------
167
168 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
169
170 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
171 synchronized(this) {
172 TmfCoalescedDataRequest<T> coalescedRequest =
173 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize(), request.getExecType());
174 coalescedRequest.addRequest(request);
175 fPendingCoalescedRequests.add(coalescedRequest);
176 }
177 }
178
179 protected synchronized void coalesceDataRequest(ITmfDataRequest<T> request) {
180 synchronized(this) {
181 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
182 if (req.isCompatible(request)) {
183 req.addRequest(request);
184 return;
185 }
186 }
187 newCoalescedDataRequest(request);
188 }
189 }
190
191 // ------------------------------------------------------------------------
192 // Request processing
193 // ------------------------------------------------------------------------
194
195 protected void queueRequest(final ITmfDataRequest<T> request) {
196
197 final ITmfDataProvider<T> provider = this;
198 final ITmfComponent component = this;
199
200 // Process the request
201 Thread thread = new Thread() {
202
203 @Override
204 public void run() {
205
206 // Extract the generic information
207 request.start();
208 int blockSize = request.getBlockize();
209 int nbRequested = request.getNbRequested();
210
211 // Create the result buffer
212 Vector<T> result = new Vector<T>();
213 int nbRead = 0;
214
215 // Initialize the execution
216 ITmfContext context = armRequest(request);
217 if (context == null) {
218 request.cancel();
219 return;
220 }
221
222 try {
223 // Get the ordered events
224 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + component.getName());
225 T data = getNext(context);
226 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
227 while (data != null && !isCompleted(request, data, nbRead))
228 {
229 if (fLogData) Tracer.traceEvent(provider, request, data);
230 result.add(data);
231 if (++nbRead % blockSize == 0) {
232 pushData(request, result);
233 }
234 // To avoid an unnecessary read passed the last data requested
235 if (nbRead < nbRequested) {
236 data = getNext(context);
237 if (data == null || data.isNullRef()) {
238 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " end of data");
239 }
240 }
241 }
242 if (result.size() > 0) {
243 pushData(request, result);
244 }
245 request.done();
246 }
247 catch (Exception e) {
248 request.fail();
249 // e.printStackTrace();
250 }
251 }
252 };
253 fExecutor.execute(thread);
254 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
255 }
256
257 /**
258 * Format the result data and forwards it to the requester.
259 * Note: after handling, the data is *removed*.
260 *
261 * @param request
262 * @param data
263 */
264 @SuppressWarnings("unchecked")
265 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
266 synchronized(request) {
267 if (!request.isCompleted()) {
268 T[] result = (T[]) Array.newInstance(fType, data.size());
269 data.toArray(result);
270 request.setData(result);
271 request.handleData();
272 data.removeAllElements();
273 }
274 }
275 }
276
277 /**
278 * Initialize the provider based on the request. The context is
279 * provider specific and will be updated by getNext().
280 *
281 * @param request
282 * @return an application specific context; null if request can't be serviced
283 */
284 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
285
286 /**
287 * Return the next piece of data based on the context supplied. The context
288 * would typically be updated for the subsequent read.
289 *
290 * @param context
291 * @return
292 */
293 private static final int TIMEOUT = 1000;
294 // public abstract T getNext(ITmfContext context) throws InterruptedException;
295 // private int getLevel = 0;
296 public T getNext(ITmfContext context) throws InterruptedException {
297 // String name = Thread.currentThread().getName(); getLevel++;
298 // System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - entering");
299 T data = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
300 if (data == null) {
301 if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on read");
302 throw new InterruptedException();
303 }
304 // System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - leaving");
305 // getLevel--;
306 return data;
307 }
308
309 /**
310 * Makes the generated result data available for getNext()
311 *
312 * @param data
313 */
314 // public abstract void queueResult(T data) throws InterruptedException;
315 // private int putLevel = 0;
316 public void queueResult(T data) throws InterruptedException {
317 // String name = Thread.currentThread().getName(); putLevel++;
318 // System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - entering");
319 boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
320 if (!ok) {
321 if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on write");
322 throw new InterruptedException();
323 }
324 // System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - leaving");
325 // putLevel--;
326 }
327
328 /**
329 * Checks if the data meets the request completion criteria.
330 *
331 * @param request
332 * @param data
333 * @return
334 */
335 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
336 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
337 }
338
339 // ------------------------------------------------------------------------
340 // Signal handlers
341 // ------------------------------------------------------------------------
342
343 @TmfSignalHandler
344 public synchronized void startSynch(TmfStartSynchSignal signal) {
345 fSignalDepth++;
346 }
347
348 @TmfSignalHandler
349 public synchronized void endSynch(TmfEndSynchSignal signal) {
350 fSignalDepth--;
351 if (fSignalDepth == 0) {
352 fireRequests();
353 }
354 }
355
356 }
This page took 0.038766 seconds and 5 git commands to generate.