Fixes the last (?) warning in LTTng stuff
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
15import java.lang.reflect.Array;
16import java.util.Vector;
17import java.util.concurrent.BlockingQueue;
18import java.util.concurrent.LinkedBlockingQueue;
951d134a 19import java.util.concurrent.SynchronousQueue;
8c8bf09f 20
8c8bf09f 21import org.eclipse.linuxtools.tmf.event.TmfData;
951d134a
FC
22import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
23import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
8c8bf09f
ASL
24import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
25import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
26import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
27import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
28import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
29import org.eclipse.linuxtools.tmf.trace.ITmfContext;
30
31/**
32 * <b><u>TmfProvider</u></b>
33 * <p>
34 * The TmfProvider<T> is a provider for a data of type <T>.
35 * <p>
36 * This abstract class implements the housekeeking methods to register/
37 * deregister the event provider and to handle generically the event requests.
38 * <p>
39 * The concrete class can either re-implement processRequest() entirely or
40 * just implement the hooks (initializeContext() and getNext()).
41 * <p>
42 * TODO: Add support for providing multiple data types.
43 */
951d134a 44public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f
ASL
45
46 final protected Class<T> fType;
47
48 public static final int DEFAULT_QUEUE_SIZE = 1000;
49 protected final int fQueueSize;
50 protected final BlockingQueue<T> fDataQueue;
51 protected final TmfRequestExecutor fExecutor;
52
36548af3 53 private int fCoalescingLevel = 0;
951d134a 54
8c8bf09f 55 // ------------------------------------------------------------------------
951d134a 56 // Constructors
8c8bf09f
ASL
57 // ------------------------------------------------------------------------
58
59 public TmfDataProvider(String name, Class<T> type) {
60 this(name, type, DEFAULT_QUEUE_SIZE);
61 }
62
63 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
64 super(name);
8c8bf09f 65 fQueueSize = queueSize;
fc6ccf6f 66 fType = type;
951d134a
FC
67 fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
68
fc6ccf6f 69 fExecutor = new TmfRequestExecutor();
36548af3 70 fCoalescingLevel = 0;
54d55ced
FC
71
72 TmfProviderManager.register(fType, this);
fc6ccf6f 73 }
377f1ad8
WB
74
75 public TmfDataProvider(TmfDataProvider<T> oldDataProvider) {
76 super(oldDataProvider);
77
78 this.fType = oldDataProvider.fType;
79 this.fQueueSize = oldDataProvider.fQueueSize;
80
81 this.fExecutor = new TmfRequestExecutor();
82 this.fDataQueue = (oldDataProvider.fQueueSize > 1) ? new LinkedBlockingQueue<T>(oldDataProvider.fQueueSize) : new SynchronousQueue<T>();
8c8bf09f 83
36548af3 84 this.fCoalescingLevel = 0;
377f1ad8
WB
85 }
86
fc6ccf6f 87 @Override
2fb2eb37 88 public void dispose() {
8c8bf09f 89 TmfProviderManager.deregister(fType, this);
54d55ced 90 fExecutor.stop();
2fb2eb37 91 super.dispose();
8c8bf09f
ASL
92 }
93
94 public int getQueueSize() {
95 return fQueueSize;
96 }
97
ff4ed569
FC
98 public Class<?> getType() {
99 return fType;
100 }
101
8c8bf09f
ASL
102 // ------------------------------------------------------------------------
103 // ITmfRequestHandler
104 // ------------------------------------------------------------------------
105
2fb2eb37 106 public synchronized void sendRequest(final ITmfDataRequest<T> request) {
951d134a 107
36548af3 108 if (fCoalescingLevel > 0) {
951d134a
FC
109 // We are in coalescing mode: client should NEVER wait
110 // (otherwise we will have deadlock...)
111 coalesceDataRequest(request);
112 } else {
113 // Process the request immediately
114 queueRequest(request);
115 }
116 }
117
118 /**
119 * This method queues the coalesced requests.
120 *
121 * @param thread
122 */
123 private synchronized void fireRequests() {
124 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
125 queueRequest(request);
126 }
127 fPendingCoalescedRequests.clear();
128 }
129
130 // ------------------------------------------------------------------------
131 // Coalescing (primitive test...)
132 // ------------------------------------------------------------------------
8c8bf09f 133
951d134a 134 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
8c8bf09f 135
2fb2eb37 136 protected synchronized void newCoalescedDataRequest(ITmfDataRequest<T> request) {
951d134a
FC
137 TmfCoalescedDataRequest<T> coalescedRequest =
138 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
139 coalescedRequest.addRequest(request);
140 fPendingCoalescedRequests.add(coalescedRequest);
141 }
8c8bf09f 142
2fb2eb37 143 protected synchronized void coalesceDataRequest(ITmfDataRequest<T> request) {
951d134a
FC
144 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
145 if (req.isCompatible(request)) {
146 req.addRequest(request);
147 return;
148 }
8c8bf09f 149 }
951d134a 150 newCoalescedDataRequest(request);
8c8bf09f
ASL
151 }
152
951d134a
FC
153 // ------------------------------------------------------------------------
154 // Request processing
155 // ------------------------------------------------------------------------
156
2fb2eb37 157 protected void queueRequest(final ITmfDataRequest<T> request) {
9aae0442 158
8c8bf09f
ASL
159 // Process the request
160 Thread thread = new Thread() {
161
162 @Override
163 public void run() {
164
165 // Extract the generic information
166 int blockSize = request.getBlockize();
167 int nbRequested = request.getNbRequested();
168
169 // Create the result buffer
170 Vector<T> result = new Vector<T>();
171 int nbRead = 0;
172
173 // Initialize the execution
174 ITmfContext context = armRequest(request);
175 if (context == null) {
fc6ccf6f 176 request.fail();
8c8bf09f
ASL
177 return;
178 }
179
fc6ccf6f 180 // Get the ordered events
36548af3 181// Tracer.trace("Thread #" + Thread.currentThread().getId() + ", request #" + request.getRequestId() + " starting");
fc6ccf6f
FC
182 T data = getNext(context);
183 while (data != null && !isCompleted(request, data, nbRead))
184 {
185 result.add(data);
186 if (++nbRead % blockSize == 0) {
187 pushData(request, result);
8c8bf09f 188 }
fc6ccf6f 189 // To avoid an unnecessary read passed the last data requested
7f407ead 190 if (nbRead < nbRequested) {
fc6ccf6f 191 data = getNext(context);
36548af3
FC
192// if (data != null && data.isNull()) {
193// Tracer.trace("Thread #" + Thread.currentThread().getId() + ", request #" + request.getRequestId() + " end of data");
194// }
7f407ead 195 }
8c8bf09f 196 }
fc6ccf6f
FC
197 pushData(request, result);
198 request.done();
8c8bf09f
ASL
199 }
200 };
5c00c0b7 201 fExecutor.execute(thread);
8c8bf09f
ASL
202 }
203
204 /**
205 * Format the result data and forwards it to the requester.
206 * Note: after handling, the data is *removed*.
207 *
208 * @param request
209 * @param data
210 */
211 @SuppressWarnings("unchecked")
951d134a 212 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
8c8bf09f
ASL
213 synchronized(request) {
214 if (!request.isCompleted()) {
215 T[] result = (T[]) Array.newInstance(fType, data.size());
216 data.toArray(result);
217 request.setData(result);
218 request.handleData();
219 data.removeAllElements();
220 }
221 }
222 }
223
951d134a
FC
224 /**
225 * Initialize the provider based on the request. The context is
226 * provider specific and will be updated by getNext().
227 *
228 * @param request
229 * @return an application specific context; null if request can't be serviced
230 */
2fb2eb37 231 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
951d134a 232
8c8bf09f
ASL
233 /**
234 * Return the next piece of data based on the context supplied. The context
235 * would typically be updated for the subsequent read.
236 *
237 * @param context
238 * @return
239 */
fc6ccf6f
FC
240 public T getNext(ITmfContext context) {
241 try {
242 T event = fDataQueue.take();
243 return event;
244 } catch (InterruptedException e) {
245 e.printStackTrace();
8c8bf09f 246 }
fc6ccf6f 247 return null;
8c8bf09f
ASL
248 }
249
951d134a
FC
250 /**
251 * Makes the generated result data available for getNext()
252 *
253 * @param data
254 */
fc6ccf6f
FC
255 public void queueResult(T data) {
256 try {
257 fDataQueue.put(data);
258 } catch (InterruptedException e1) {
259 e1.printStackTrace();
8c8bf09f
ASL
260 }
261 }
262
263 /**
264 * Checks if the data meets the request completion criteria.
265 *
266 * @param request
267 * @param data
268 * @return
269 */
2fb2eb37 270 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
36548af3 271 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
8c8bf09f
ASL
272 }
273
951d134a
FC
274 // ------------------------------------------------------------------------
275 // Signal handlers
276 // ------------------------------------------------------------------------
277
8c8bf09f
ASL
278 @TmfSignalHandler
279 public void startSynch(TmfStartSynchSignal signal) {
cbd4ad82 280 synchronized(this) {
36548af3 281 fCoalescingLevel++;
951d134a 282 }
8c8bf09f
ASL
283 }
284
285 @TmfSignalHandler
286 public void endSynch(TmfEndSynchSignal signal) {
cbd4ad82 287 synchronized(this) {
36548af3
FC
288 fCoalescingLevel--;
289 if (fCoalescingLevel == 0) {
951d134a
FC
290 fireRequests();
291 }
292 }
8c8bf09f
ASL
293 }
294
295}
This page took 0.0396 seconds and 5 git commands to generate.