(no commit message)
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
15import java.lang.reflect.Array;
16import java.util.Vector;
17import java.util.concurrent.BlockingQueue;
18import java.util.concurrent.LinkedBlockingQueue;
19import java.util.concurrent.SynchronousQueue;
20
21import org.eclipse.linuxtools.tmf.Tracer;
22import org.eclipse.linuxtools.tmf.event.TmfData;
23import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
24import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
25import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
26import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
27import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
28import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
29import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
30import org.eclipse.linuxtools.tmf.trace.ITmfContext;
31
32/**
33 * <b><u>TmfProvider</u></b>
34 * <p>
35 * The TmfProvider<T> is a provider for a data of type <T>.
36 * <p>
37 * This abstract class implements the housekeeking methods to register/
38 * deregister the event provider and to handle generically the event requests.
39 * <p>
40 * The concrete class can either re-implement processRequest() entirely or
41 * just implement the hooks (initializeContext() and getNext()).
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 */
45public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
46
47 final protected Class<T> fType;
48
49 public static final int DEFAULT_QUEUE_SIZE = 1000;
50 protected final int fQueueSize;
51 protected final BlockingQueue<T> fDataQueue;
52 protected final TmfRequestExecutor fExecutor;
53
54 private int fCoalescingLevel = 0;
55
56 // ------------------------------------------------------------------------
57 // Constructors
58 // ------------------------------------------------------------------------
59
60 public TmfDataProvider(String name, Class<T> type) {
61 this(name, type, DEFAULT_QUEUE_SIZE);
62 }
63
64 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
65 super(name);
66 fType = type;
67 fQueueSize = queueSize;
68 fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
69
70 Tracer.trace(getName() + " created");
71
72 fExecutor = new TmfRequestExecutor();
73 fCoalescingLevel = 0;
74
75 TmfProviderManager.register(fType, this);
76 Tracer.trace(getName() + " started");
77}
78
79 public TmfDataProvider(TmfDataProvider<T> other) {
80 super(other);
81 fType = other.fType;
82 fQueueSize = other.fQueueSize;
83 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
84
85 fExecutor = new TmfRequestExecutor();
86 fCoalescingLevel = 0;
87 }
88
89 @Override
90 public void dispose() {
91 TmfProviderManager.deregister(fType, this);
92 fExecutor.stop();
93 Tracer.trace(getName() + " stopped");
94 super.dispose();
95 }
96
97 public int getQueueSize() {
98 return fQueueSize;
99 }
100
101 public Class<?> getType() {
102 return fType;
103 }
104
105 // ------------------------------------------------------------------------
106 // ITmfRequestHandler
107 // ------------------------------------------------------------------------
108
109 public synchronized void sendRequest(final ITmfDataRequest<T> request) {
110
111 if (fCoalescingLevel > 0) {
112 // We are in coalescing mode: client should NEVER wait
113 // (otherwise we will have deadlock...)
114 coalesceDataRequest(request);
115 } else {
116 // Process the request immediately
117 queueRequest(request);
118 }
119 }
120
121 /**
122 * This method queues the coalesced requests.
123 *
124 * @param thread
125 */
126 private synchronized void fireRequests() {
127 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
128 queueRequest(request);
129 }
130 fPendingCoalescedRequests.clear();
131 }
132
133 // ------------------------------------------------------------------------
134 // Coalescing (primitive test...)
135 // ------------------------------------------------------------------------
136
137 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
138
139 protected synchronized void newCoalescedDataRequest(ITmfDataRequest<T> request) {
140 TmfCoalescedDataRequest<T> coalescedRequest =
141 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
142 coalescedRequest.addRequest(request);
143 fPendingCoalescedRequests.add(coalescedRequest);
144 }
145
146 protected synchronized void coalesceDataRequest(ITmfDataRequest<T> request) {
147 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
148 if (req.isCompatible(request)) {
149 req.addRequest(request);
150 return;
151 }
152 }
153 newCoalescedDataRequest(request);
154 }
155
156 // ------------------------------------------------------------------------
157 // Request processing
158 // ------------------------------------------------------------------------
159
160 protected void queueRequest(final ITmfDataRequest<T> request) {
161
162 final String provider = getName();
163
164 // Process the request
165 Thread thread = new Thread() {
166
167 @Override
168 public void run() {
169
170 // Extract the generic information
171 int blockSize = request.getBlockize();
172 int nbRequested = request.getNbRequested();
173
174 // Create the result buffer
175 Vector<T> result = new Vector<T>();
176 int nbRead = 0;
177
178 // Initialize the execution
179 ITmfContext context = armRequest(request);
180 if (context == null) {
181 request.fail();
182 return;
183 }
184
185 // Get the ordered events
186 Tracer.trace("Request #" + request.getRequestId() + " is serviced by " + provider);
187 T data = getNext(context);
188 Tracer.trace("Request #" + request.getRequestId() + " read first event");
189 while (data != null && !isCompleted(request, data, nbRead))
190 {
191 result.add(data);
192 if (++nbRead % blockSize == 0) {
193 pushData(request, result);
194 }
195 // To avoid an unnecessary read passed the last data requested
196 if (nbRead < nbRequested) {
197 data = getNext(context);
198 if (data == null || data.isNullRef()) {
199 Tracer.trace("Request #" + request.getRequestId() + " end of data");
200 }
201 }
202 }
203 pushData(request, result);
204 request.done();
205 }
206 };
207 fExecutor.execute(thread);
208 }
209
210 /**
211 * Format the result data and forwards it to the requester.
212 * Note: after handling, the data is *removed*.
213 *
214 * @param request
215 * @param data
216 */
217 @SuppressWarnings("unchecked")
218 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
219 synchronized(request) {
220 if (!request.isCompleted()) {
221 T[] result = (T[]) Array.newInstance(fType, data.size());
222 data.toArray(result);
223 request.setData(result);
224 request.handleData();
225 data.removeAllElements();
226 }
227 }
228 }
229
230 /**
231 * Initialize the provider based on the request. The context is
232 * provider specific and will be updated by getNext().
233 *
234 * @param request
235 * @return an application specific context; null if request can't be serviced
236 */
237 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
238
239 /**
240 * Return the next piece of data based on the context supplied. The context
241 * would typically be updated for the subsequent read.
242 *
243 * @param context
244 * @return
245 */
246 public T getNext(ITmfContext context) {
247 try {
248 T event = fDataQueue.take();
249 return event;
250 } catch (InterruptedException e) {
251 e.printStackTrace();
252 }
253 return null;
254 }
255
256 /**
257 * Makes the generated result data available for getNext()
258 *
259 * @param data
260 */
261 public void queueResult(T data) {
262 try {
263 fDataQueue.put(data);
264 } catch (InterruptedException e1) {
265 e1.printStackTrace();
266 }
267 }
268
269 /**
270 * Checks if the data meets the request completion criteria.
271 *
272 * @param request
273 * @param data
274 * @return
275 */
276 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
277 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
278 }
279
280 // ------------------------------------------------------------------------
281 // Signal handlers
282 // ------------------------------------------------------------------------
283
284 @TmfSignalHandler
285 public void startSynch(TmfStartSynchSignal signal) {
286 synchronized(this) {
287 fCoalescingLevel++;
288 }
289 }
290
291 @TmfSignalHandler
292 public void endSynch(TmfEndSynchSignal signal) {
293 synchronized(this) {
294 fCoalescingLevel--;
295 if (fCoalescingLevel == 0) {
296 fireRequests();
297 }
298 }
299 }
300
301}
This page took 0.035292 seconds and 5 git commands to generate.