May 31
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
15import java.lang.reflect.Array;
16import java.util.Vector;
17import java.util.concurrent.BlockingQueue;
18import java.util.concurrent.LinkedBlockingQueue;
19import java.util.concurrent.SynchronousQueue;
9aae0442 20import java.util.concurrent.TimeUnit;
8c8bf09f
ASL
21
22import org.eclipse.linuxtools.tmf.Tracer;
23import org.eclipse.linuxtools.tmf.event.TmfData;
24import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
25import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
26import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
27import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
28import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
29import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
30import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
31import org.eclipse.linuxtools.tmf.trace.ITmfContext;
32
33/**
34 * <b><u>TmfDataProvider</u></b>
35 * <p>
36 * {@code TmfDataProvider<T>} is a provider of data of type {@code T}.
37 * <p>
38 * This abstract class implements the housekeeping methods to register/
39 * deregister the data provider and to handle the data requests generically.
40 * <p>
41 * The concrete class can either re-implement queueRequest() entirely or
42 * just implement the hooks (armRequest() and getNext()).
43 * <p>
44 * TODO: Add support for providing multiple data types.
45 */
46public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
47
48 final protected Class<T> fType;
9aae0442
ASL
49 final protected boolean fLogData;
50 final protected boolean fLogException;
8c8bf09f
ASL
51
52 public static final int DEFAULT_QUEUE_SIZE = 1000;
53 protected final int fQueueSize;
54 protected final BlockingQueue<T> fDataQueue;
55 protected final TmfRequestExecutor fExecutor;
56
9aae0442 57 private int fSignalDepth = 0;
8c8bf09f
ASL
58
59 // ------------------------------------------------------------------------
60 // Constructors
61 // ------------------------------------------------------------------------
62
/**
 * Creates a provider whose data queue has the default size
 * ({@link #DEFAULT_QUEUE_SIZE}).
 *
 * @param name the component name, passed on to the superclass
 * @param type the class of the data handled by this provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
66
/**
 * Creates a provider with an explicit queue size and registers it with
 * the {@code TmfProviderManager}.
 *
 * @param name the component name, passed on to the superclass
 * @param type the class of the data handled by this provider
 * @param queueSize the data queue capacity; a value of 1 or less selects
 *            a {@code SynchronousQueue} instead of a bounded
 *            {@code LinkedBlockingQueue}
 */
protected TmfDataProvider(String name, Class<T> type, int queueSize) {
    super(name);

    fType = type;
    fQueueSize = queueSize;
    if (queueSize > 1) {
        fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
    } else {
        fDataQueue = new SynchronousQueue<T>();
    }

    fExecutor = new TmfRequestExecutor();
    fSignalDepth = 0;

    // Both flags are driven by the same tracing switch
    fLogData = Tracer.isEventTraced();
    fLogException = Tracer.isEventTraced();

    // Registration comes last, once every field has been assigned
    TmfProviderManager.register(fType, this);
}
84
/**
 * Copy constructor. The new provider takes the data type and queue size
 * of {@code other} but gets its own data queue and its own executor.
 * Unlike the other constructors, this one does not register the new
 * instance with the {@code TmfProviderManager}.
 *
 * @param other the provider to copy the configuration from
 */
public TmfDataProvider(TmfDataProvider<T> other) {
    super(other);

    fType = other.fType;
    fQueueSize = other.fQueueSize;
    if (fQueueSize > 1) {
        fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
    } else {
        fDataQueue = new SynchronousQueue<T>();
    }

    fExecutor = new TmfRequestExecutor();
    fSignalDepth = 0;

    // Both flags are driven by the same tracing switch
    fLogData = Tracer.isEventTraced();
    fLogException = Tracer.isEventTraced();
}
97
98 @Override
99 public void dispose() {
100 TmfProviderManager.deregister(fType, this);
101 fExecutor.stop();
9aae0442
ASL
102// Tracer.traceComponent(getName() + " stopped");
103 if (fClone != null) fClone.dispose();
8c8bf09f
ASL
104 super.dispose();
105 }
106
107 public int getQueueSize() {
108 return fQueueSize;
109 }
110
111 public Class<?> getType() {
112 return fType;
113 }
114
115 // ------------------------------------------------------------------------
116 // ITmfRequestHandler
117 // ------------------------------------------------------------------------
118
9aae0442
ASL
119// public synchronized void sendRequest(final ITmfDataRequest<T> request, ExecutionType execType) {
120// sendRequest(request);
121// }
122
8c8bf09f 123 public synchronized void sendRequest(final ITmfDataRequest<T> request) {
9aae0442
ASL
124 sendRequest(request, ExecutionType.SHORT);
125 }
8c8bf09f 126
9aae0442
ASL
127 protected TmfDataProvider<T> fClone;
128 public synchronized void sendRequest(final ITmfDataRequest<T> request, ExecutionType execType) {
129 if (fClone == null || execType == ExecutionType.SHORT) {
130 if (fSignalDepth > 0) {
131 coalesceDataRequest(request);
132 } else {
133 queueRequest(request);
134 }
135 }
136 else {
137 fClone.sendRequest(request);
8c8bf09f
ASL
138 }
139 }
140
141 /**
142 * This method queues the coalesced requests.
143 *
144 * @param thread
145 */
9aae0442 146 public synchronized void fireRequests() {
8c8bf09f
ASL
147 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
148 queueRequest(request);
149 }
150 fPendingCoalescedRequests.clear();
9aae0442
ASL
151
152 if (fClone != null)
153 fClone.fireRequests();
8c8bf09f
ASL
154 }
155
156 // ------------------------------------------------------------------------
157 // Coalescing (primitive test...)
158 // ------------------------------------------------------------------------
159
160 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
161
162 protected synchronized void newCoalescedDataRequest(ITmfDataRequest<T> request) {
163 TmfCoalescedDataRequest<T> coalescedRequest =
164 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
165 coalescedRequest.addRequest(request);
166 fPendingCoalescedRequests.add(coalescedRequest);
167 }
168
169 protected synchronized void coalesceDataRequest(ITmfDataRequest<T> request) {
170 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
171 if (req.isCompatible(request)) {
172 req.addRequest(request);
173 return;
174 }
175 }
176 newCoalescedDataRequest(request);
177 }
178
179 // ------------------------------------------------------------------------
180 // Request processing
181 // ------------------------------------------------------------------------
182
183 protected void queueRequest(final ITmfDataRequest<T> request) {
184
9aae0442
ASL
185// final String provider = getName();
186 final ITmfDataProvider<T> provider = this;
187
8c8bf09f
ASL
188 // Process the request
189 Thread thread = new Thread() {
190
191 @Override
192 public void run() {
193
194 // Extract the generic information
9aae0442 195 request.start();
8c8bf09f
ASL
196 int blockSize = request.getBlockize();
197 int nbRequested = request.getNbRequested();
198
199 // Create the result buffer
200 Vector<T> result = new Vector<T>();
201 int nbRead = 0;
202
203 // Initialize the execution
204 ITmfContext context = armRequest(request);
205 if (context == null) {
9aae0442 206 request.cancel();
8c8bf09f
ASL
207 return;
208 }
209
9aae0442
ASL
210 try {
211 // Get the ordered events
212// Tracer.traceLog("Request #" + request.getRequestId() + " is serviced by " + provider);
213 T data = getNext(context);
214// Tracer.traceLog("Request #" + request.getRequestId() + " read first event");
215 while (data != null && !isCompleted(request, data, nbRead))
216 {
217 if (fLogData) Tracer.traceEvent(provider, request, data);
218 result.add(data);
219 if (++nbRead % blockSize == 0) {
220 pushData(request, result);
221 }
222 // To avoid an unnecessary read passed the last data requested
223 if (nbRead < nbRequested) {
224 data = getNext(context);
225 if (data == null || data.isNullRef()) {
226// Tracer.traceLog("Request #" + request.getRequestId() + " end of data");
227 }
8c8bf09f
ASL
228 }
229 }
9aae0442
ASL
230 pushData(request, result);
231 request.done();
232 }
233 catch (Exception e) {
234 e.printStackTrace();
235 if (fLogException) Tracer.traceException(e);
236 request.fail();
8c8bf09f 237 }
8c8bf09f
ASL
238 }
239 };
240 fExecutor.execute(thread);
9aae0442 241 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
8c8bf09f
ASL
242 }
243
244 /**
245 * Format the result data and forwards it to the requester.
246 * Note: after handling, the data is *removed*.
247 *
248 * @param request
249 * @param data
250 */
251 @SuppressWarnings("unchecked")
252 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
253 synchronized(request) {
254 if (!request.isCompleted()) {
255 T[] result = (T[]) Array.newInstance(fType, data.size());
256 data.toArray(result);
257 request.setData(result);
258 request.handleData();
259 data.removeAllElements();
260 }
261 }
262 }
263
264 /**
265 * Initialize the provider based on the request. The context is
266 * provider specific and will be updated by getNext().
267 *
268 * @param request
269 * @return an application specific context; null if request can't be serviced
270 */
271 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
272
273 /**
274 * Return the next piece of data based on the context supplied. The context
275 * would typically be updated for the subsequent read.
276 *
277 * @param context
278 * @return
279 */
9aae0442
ASL
280 private final int TIMEOUT = 5000;
281 public T getNext(ITmfContext context) throws InterruptedException {
282 T event = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
283 if (event == null) {
284 if (Tracer.isErrorTraced()) Tracer.traceError("Request timeout on read");
285 throw new InterruptedException();
8c8bf09f 286 }
9aae0442 287 return event;
8c8bf09f
ASL
288 }
289
290 /**
291 * Makes the generated result data available for getNext()
292 *
293 * @param data
294 */
9aae0442
ASL
295 public void queueResult(T data) throws InterruptedException {
296 boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
297 if (!ok) {
298 if (Tracer.isErrorTraced()) Tracer.traceError("Request timeout on write");
299 throw new InterruptedException();
8c8bf09f
ASL
300 }
301 }
302
303 /**
304 * Checks if the data meets the request completion criteria.
305 *
306 * @param request
307 * @param data
308 * @return
309 */
310 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
311 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
312 }
313
314 // ------------------------------------------------------------------------
315 // Signal handlers
316 // ------------------------------------------------------------------------
317
318 @TmfSignalHandler
319 public void startSynch(TmfStartSynchSignal signal) {
320 synchronized(this) {
9aae0442 321 fSignalDepth++;
8c8bf09f
ASL
322 }
323 }
324
325 @TmfSignalHandler
326 public void endSynch(TmfEndSynchSignal signal) {
327 synchronized(this) {
9aae0442
ASL
328 fSignalDepth--;
329 if (fSignalDepth == 0) {
8c8bf09f
ASL
330 fireRequests();
331 }
332 }
333 }
334
335}
This page took 0.038255 seconds and 5 git commands to generate.