Monster merge from the integration branch. Still some problems left and JUnits failing.
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
15import java.lang.reflect.Array;
16import java.util.Vector;
17import java.util.concurrent.BlockingQueue;
18import java.util.concurrent.LinkedBlockingQueue;
951d134a 19import java.util.concurrent.SynchronousQueue;
550d787e 20import java.util.concurrent.TimeUnit;
8c8bf09f 21
ce785d7d 22import org.eclipse.linuxtools.tmf.Tracer;
8c8bf09f 23import org.eclipse.linuxtools.tmf.event.TmfData;
951d134a
FC
24import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
25import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
8c8bf09f
ASL
26import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
27import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
28import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
29import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
30import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
31import org.eclipse.linuxtools.tmf.trace.ITmfContext;
32
33/**
34 * <b><u>TmfDataProvider</u></b>
35 * <p>
36 * The TmfDataProvider<T> is a provider for data of type <T>.
37 * <p>
38 * This abstract class implements the housekeeping methods to register/
39 * deregister the data provider and to generically handle the data requests.
40 * <p>
41 * The concrete class can either re-implement queueRequest() entirely or
42 * just implement the hooks (armRequest() and getNext()).
43 * <p>
44 * TODO: Add support for providing multiple data types.
45 */
951d134a 46public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f 47
550d787e
FC
48 // ------------------------------------------------------------------------
49 // Constants
50 // ------------------------------------------------------------------------
51
52 private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
53// private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
54
55 // ------------------------------------------------------------------------
56 //
57 // ------------------------------------------------------------------------
58
8c8bf09f 59 final protected Class<T> fType;
550d787e
FC
60 final protected boolean fLogData;
61 final protected boolean fLogException;
8c8bf09f
ASL
62
63 public static final int DEFAULT_QUEUE_SIZE = 1000;
64 protected final int fQueueSize;
65 protected final BlockingQueue<T> fDataQueue;
66 protected final TmfRequestExecutor fExecutor;
67
550d787e 68 private int fSignalDepth = 0;
951d134a 69
8c8bf09f 70 // ------------------------------------------------------------------------
951d134a 71 // Constructors
8c8bf09f
ASL
72 // ------------------------------------------------------------------------
73
/**
 * Creates a provider for the given data type using DEFAULT_QUEUE_SIZE as
 * the queue size.
 *
 * @param name the name handed to the three-argument constructor
 * @param type the class of the data handled by this provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
77
78 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
79 super(name);
fc6ccf6f 80 fType = type;
ce785d7d 81 fQueueSize = queueSize;
951d134a
FC
82 fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
83
550d787e 84 if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "created");
ce785d7d
FC
85
86 fExecutor = new TmfRequestExecutor();
550d787e
FC
87 fSignalDepth = 0;
88
89 fLogData = Tracer.isEventTraced();
90 fLogException = Tracer.isEventTraced();
54d55ced
FC
91
92 TmfProviderManager.register(fType, this);
550d787e 93 if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
ce785d7d 94}
377f1ad8 95
ce785d7d
FC
/**
 * Copy constructor. The new provider takes over the data type and queue
 * size of the original but gets its own data queue and request executor.
 * Unlike the (name, type, queueSize) constructor, it does not register
 * itself with TmfProviderManager.
 *
 * @param other the provider to copy
 */
public TmfDataProvider(TmfDataProvider<T> other) {
    super(other);

    fType = other.fType;
    fQueueSize = other.fQueueSize;
    if (fQueueSize > 1) {
        fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
    } else {
        fDataQueue = new SynchronousQueue<T>();
    }

    fExecutor = new TmfRequestExecutor();
    fSignalDepth = 0;

    fLogData = Tracer.isEventTraced();
    fLogException = Tracer.isEventTraced();
}
108
fc6ccf6f 109 @Override
2fb2eb37 110 public void dispose() {
8c8bf09f 111 TmfProviderManager.deregister(fType, this);
54d55ced 112 fExecutor.stop();
550d787e
FC
113
114 if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
115
116 if (fClone != null) fClone.dispose();
2fb2eb37 117 super.dispose();
8c8bf09f
ASL
118 }
119
120 public int getQueueSize() {
121 return fQueueSize;
122 }
123
ff4ed569
FC
124 public Class<?> getType() {
125 return fType;
126 }
127
8c8bf09f
ASL
128 // ------------------------------------------------------------------------
129 // ITmfRequestHandler
130 // ------------------------------------------------------------------------
131
550d787e
FC
132 protected TmfDataProvider<T> fClone;
133 public void sendRequest(final ITmfDataRequest<T> request) {
134 synchronized(this) {
135 if (fClone == null || request.getExecType() == SHORT) {
136 if (fSignalDepth > 0) {
137 coalesceDataRequest(request);
138 } else {
139 queueRequest(request);
140 }
141 }
142 else {
143 fClone.sendRequest(request);
144 }
951d134a
FC
145 }
146 }
147
148 /**
149 * This method queues the coalesced requests.
150 *
151 * @param thread
152 */
550d787e
FC
153 public void fireRequests() {
154 synchronized(this) {
155 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
156 queueRequest(request);
157 }
158 fPendingCoalescedRequests.clear();
159
160 if (fClone != null)
161 fClone.fireRequests();
951d134a 162 }
951d134a
FC
163 }
164
165 // ------------------------------------------------------------------------
166 // Coalescing (primitive test...)
167 // ------------------------------------------------------------------------
8c8bf09f 168
951d134a 169 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
8c8bf09f 170
550d787e
FC
171 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
172 synchronized(this) {
173 TmfCoalescedDataRequest<T> coalescedRequest =
174 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
175 coalescedRequest.addRequest(request);
176 fPendingCoalescedRequests.add(coalescedRequest);
177 }
951d134a 178 }
8c8bf09f 179
2fb2eb37 180 protected synchronized void coalesceDataRequest(ITmfDataRequest<T> request) {
550d787e
FC
181 synchronized(this) {
182 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
183 if (req.isCompatible(request)) {
184 req.addRequest(request);
185 return;
186 }
951d134a 187 }
550d787e 188 newCoalescedDataRequest(request);
8c8bf09f 189 }
8c8bf09f
ASL
190 }
191
951d134a
FC
192 // ------------------------------------------------------------------------
193 // Request processing
194 // ------------------------------------------------------------------------
195
2fb2eb37 196 protected void queueRequest(final ITmfDataRequest<T> request) {
9aae0442 197
550d787e
FC
198 final ITmfDataProvider<T> provider = this;
199 final ITmfComponent component = this;
200
8c8bf09f
ASL
201 // Process the request
202 Thread thread = new Thread() {
203
204 @Override
205 public void run() {
206
207 // Extract the generic information
550d787e 208 request.start();
8c8bf09f
ASL
209 int blockSize = request.getBlockize();
210 int nbRequested = request.getNbRequested();
211
212 // Create the result buffer
213 Vector<T> result = new Vector<T>();
214 int nbRead = 0;
215
216 // Initialize the execution
217 ITmfContext context = armRequest(request);
218 if (context == null) {
550d787e 219 request.cancel();
8c8bf09f
ASL
220 return;
221 }
222
550d787e
FC
223 try {
224 // Get the ordered events
225 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + component.getName());
226 T data = getNext(context);
227 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
228 while (data != null && !isCompleted(request, data, nbRead))
229 {
230 if (fLogData) Tracer.traceEvent(provider, request, data);
231 result.add(data);
232 if (++nbRead % blockSize == 0) {
233 pushData(request, result);
ce785d7d 234 }
550d787e
FC
235 // To avoid an unnecessary read passed the last data requested
236 if (nbRead < nbRequested) {
237 data = getNext(context);
238 if (data == null || data.isNullRef()) {
239 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " end of data");
240 }
241 }
242 }
243 if (result.size() > 0) {
244 pushData(request, result);
7f407ead 245 }
550d787e
FC
246 request.done();
247 }
248 catch (Exception e) {
249 e.printStackTrace();
250 if (fLogException) Tracer.traceException(e);
251 request.fail();
8c8bf09f 252 }
8c8bf09f
ASL
253 }
254 };
5c00c0b7 255 fExecutor.execute(thread);
550d787e 256 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
8c8bf09f
ASL
257 }
258
259 /**
260 * Format the result data and forwards it to the requester.
261 * Note: after handling, the data is *removed*.
262 *
263 * @param request
264 * @param data
265 */
266 @SuppressWarnings("unchecked")
951d134a 267 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
8c8bf09f
ASL
268 synchronized(request) {
269 if (!request.isCompleted()) {
270 T[] result = (T[]) Array.newInstance(fType, data.size());
271 data.toArray(result);
272 request.setData(result);
273 request.handleData();
274 data.removeAllElements();
275 }
276 }
277 }
278
951d134a
FC
279 /**
280 * Initialize the provider based on the request. The context is
281 * provider specific and will be updated by getNext().
282 *
283 * @param request
284 * @return an application specific context; null if request can't be serviced
285 */
2fb2eb37 286 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
951d134a 287
8c8bf09f
ASL
288 /**
289 * Return the next piece of data based on the context supplied. The context
290 * would typically be updated for the subsequent read.
291 *
292 * @param context
293 * @return
294 */
550d787e
FC
295 private static final int TIMEOUT = 1000;
296 public T getNext(ITmfContext context) throws InterruptedException {
297 T event = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
298 if (event == null) {
299 if (Tracer.isErrorTraced()) Tracer.traceError("Request timeout on read");
300 System.out.println(getName() + ": Request timeout on read");
301 throw new InterruptedException();
8c8bf09f 302 }
550d787e 303 return event;
8c8bf09f
ASL
304 }
305
951d134a
FC
306 /**
307 * Makes the generated result data available for getNext()
308 *
309 * @param data
310 */
550d787e
FC
311 public void queueResult(T data) throws InterruptedException {
312 boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
313 if (!ok) {
314 if (Tracer.isErrorTraced()) Tracer.traceError("Request timeout on write");
315 System.out.println(getName() + ": Request timeout on write");
316 throw new InterruptedException();
8c8bf09f
ASL
317 }
318 }
319
320 /**
321 * Checks if the data meets the request completion criteria.
322 *
323 * @param request
324 * @param data
325 * @return
326 */
2fb2eb37 327 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
36548af3 328 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
8c8bf09f
ASL
329 }
330
951d134a
FC
331 // ------------------------------------------------------------------------
332 // Signal handlers
333 // ------------------------------------------------------------------------
334
8c8bf09f 335 @TmfSignalHandler
550d787e
FC
336 public synchronized void startSynch(TmfStartSynchSignal signal) {
337 fSignalDepth++;
8c8bf09f
ASL
338 }
339
340 @TmfSignalHandler
550d787e
FC
341 public synchronized void endSynch(TmfEndSynchSignal signal) {
342 fSignalDepth--;
343 if (fSignalDepth == 0) {
344 fireRequests();
951d134a 345 }
8c8bf09f
ASL
346 }
347
348}
This page took 0.044401 seconds and 5 git commands to generate.