June 1st
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
15import java.lang.reflect.Array;
16import java.util.Vector;
17import java.util.concurrent.BlockingQueue;
18import java.util.concurrent.LinkedBlockingQueue;
951d134a 19import java.util.concurrent.SynchronousQueue;
8c8bf09f 20
ce785d7d 21import org.eclipse.linuxtools.tmf.Tracer;
8c8bf09f 22import org.eclipse.linuxtools.tmf.event.TmfData;
951d134a
FC
23import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
24import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
8c8bf09f
ASL
25import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
26import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
27import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
28import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
29import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
30import org.eclipse.linuxtools.tmf.trace.ITmfContext;
31
32/**
33 * <b><u>TmfProvider</u></b>
34 * <p>
35 * The TmfProvider<T> is a provider for a data of type <T>.
36 * <p>
37 * This abstract class implements the housekeeking methods to register/
38 * deregister the event provider and to handle generically the event requests.
39 * <p>
40 * The concrete class can either re-implement processRequest() entirely or
41 * just implement the hooks (initializeContext() and getNext()).
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 */
951d134a 45public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f
ASL
46
47 final protected Class<T> fType;
48
49 public static final int DEFAULT_QUEUE_SIZE = 1000;
50 protected final int fQueueSize;
51 protected final BlockingQueue<T> fDataQueue;
52 protected final TmfRequestExecutor fExecutor;
53
36548af3 54 private int fCoalescingLevel = 0;
951d134a 55
8c8bf09f 56 // ------------------------------------------------------------------------
951d134a 57 // Constructors
8c8bf09f
ASL
58 // ------------------------------------------------------------------------
59
60 public TmfDataProvider(String name, Class<T> type) {
61 this(name, type, DEFAULT_QUEUE_SIZE);
62 }
63
64 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
65 super(name);
fc6ccf6f 66 fType = type;
ce785d7d 67 fQueueSize = queueSize;
951d134a
FC
68 fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
69
ce785d7d
FC
70 Tracer.trace(getName() + " created");
71
72 fExecutor = new TmfRequestExecutor();
36548af3 73 fCoalescingLevel = 0;
54d55ced
FC
74
75 TmfProviderManager.register(fType, this);
ce785d7d
FC
76 Tracer.trace(getName() + " started");
77}
377f1ad8 78
ce785d7d
FC
79 public TmfDataProvider(TmfDataProvider<T> other) {
80 super(other);
81 fType = other.fType;
82 fQueueSize = other.fQueueSize;
83 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
84
85 fExecutor = new TmfRequestExecutor();
86 fCoalescingLevel = 0;
377f1ad8
WB
87 }
88
fc6ccf6f 89 @Override
2fb2eb37 90 public void dispose() {
8c8bf09f 91 TmfProviderManager.deregister(fType, this);
54d55ced 92 fExecutor.stop();
ce785d7d 93 Tracer.trace(getName() + " stopped");
2fb2eb37 94 super.dispose();
8c8bf09f
ASL
95 }
96
97 public int getQueueSize() {
98 return fQueueSize;
99 }
100
ff4ed569
FC
101 public Class<?> getType() {
102 return fType;
103 }
104
8c8bf09f
ASL
105 // ------------------------------------------------------------------------
106 // ITmfRequestHandler
107 // ------------------------------------------------------------------------
108
2fb2eb37 109 public synchronized void sendRequest(final ITmfDataRequest<T> request) {
951d134a 110
36548af3 111 if (fCoalescingLevel > 0) {
951d134a
FC
112 // We are in coalescing mode: client should NEVER wait
113 // (otherwise we will have deadlock...)
114 coalesceDataRequest(request);
115 } else {
116 // Process the request immediately
117 queueRequest(request);
118 }
119 }
120
121 /**
122 * This method queues the coalesced requests.
123 *
124 * @param thread
125 */
126 private synchronized void fireRequests() {
127 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
128 queueRequest(request);
129 }
130 fPendingCoalescedRequests.clear();
131 }
132
133 // ------------------------------------------------------------------------
134 // Coalescing (primitive test...)
135 // ------------------------------------------------------------------------
8c8bf09f 136
951d134a 137 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
8c8bf09f 138
2fb2eb37 139 protected synchronized void newCoalescedDataRequest(ITmfDataRequest<T> request) {
951d134a
FC
140 TmfCoalescedDataRequest<T> coalescedRequest =
141 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
142 coalescedRequest.addRequest(request);
143 fPendingCoalescedRequests.add(coalescedRequest);
144 }
8c8bf09f 145
2fb2eb37 146 protected synchronized void coalesceDataRequest(ITmfDataRequest<T> request) {
951d134a
FC
147 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
148 if (req.isCompatible(request)) {
149 req.addRequest(request);
150 return;
151 }
8c8bf09f 152 }
951d134a 153 newCoalescedDataRequest(request);
8c8bf09f
ASL
154 }
155
951d134a
FC
156 // ------------------------------------------------------------------------
157 // Request processing
158 // ------------------------------------------------------------------------
159
2fb2eb37 160 protected void queueRequest(final ITmfDataRequest<T> request) {
9aae0442 161
ce785d7d
FC
162 final String provider = getName();
163
8c8bf09f
ASL
164 // Process the request
165 Thread thread = new Thread() {
166
167 @Override
168 public void run() {
169
170 // Extract the generic information
171 int blockSize = request.getBlockize();
172 int nbRequested = request.getNbRequested();
173
174 // Create the result buffer
175 Vector<T> result = new Vector<T>();
176 int nbRead = 0;
177
178 // Initialize the execution
179 ITmfContext context = armRequest(request);
180 if (context == null) {
fc6ccf6f 181 request.fail();
8c8bf09f
ASL
182 return;
183 }
184
fc6ccf6f 185 // Get the ordered events
ce785d7d 186 Tracer.trace("Request #" + request.getRequestId() + " is serviced by " + provider);
fc6ccf6f 187 T data = getNext(context);
ce785d7d 188 Tracer.trace("Request #" + request.getRequestId() + " read first event");
fc6ccf6f
FC
189 while (data != null && !isCompleted(request, data, nbRead))
190 {
191 result.add(data);
192 if (++nbRead % blockSize == 0) {
193 pushData(request, result);
8c8bf09f 194 }
fc6ccf6f 195 // To avoid an unnecessary read passed the last data requested
7f407ead 196 if (nbRead < nbRequested) {
fc6ccf6f 197 data = getNext(context);
ce785d7d
FC
198 if (data == null || data.isNullRef()) {
199 Tracer.trace("Request #" + request.getRequestId() + " end of data");
200 }
7f407ead 201 }
8c8bf09f 202 }
fc6ccf6f
FC
203 pushData(request, result);
204 request.done();
8c8bf09f
ASL
205 }
206 };
5c00c0b7 207 fExecutor.execute(thread);
8c8bf09f
ASL
208 }
209
210 /**
211 * Format the result data and forwards it to the requester.
212 * Note: after handling, the data is *removed*.
213 *
214 * @param request
215 * @param data
216 */
217 @SuppressWarnings("unchecked")
951d134a 218 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
8c8bf09f
ASL
219 synchronized(request) {
220 if (!request.isCompleted()) {
221 T[] result = (T[]) Array.newInstance(fType, data.size());
222 data.toArray(result);
223 request.setData(result);
224 request.handleData();
225 data.removeAllElements();
226 }
227 }
228 }
229
951d134a
FC
230 /**
231 * Initialize the provider based on the request. The context is
232 * provider specific and will be updated by getNext().
233 *
234 * @param request
235 * @return an application specific context; null if request can't be serviced
236 */
2fb2eb37 237 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
951d134a 238
8c8bf09f
ASL
239 /**
240 * Return the next piece of data based on the context supplied. The context
241 * would typically be updated for the subsequent read.
242 *
243 * @param context
244 * @return
245 */
fc6ccf6f
FC
246 public T getNext(ITmfContext context) {
247 try {
248 T event = fDataQueue.take();
249 return event;
250 } catch (InterruptedException e) {
251 e.printStackTrace();
8c8bf09f 252 }
fc6ccf6f 253 return null;
8c8bf09f
ASL
254 }
255
951d134a
FC
256 /**
257 * Makes the generated result data available for getNext()
258 *
259 * @param data
260 */
fc6ccf6f
FC
261 public void queueResult(T data) {
262 try {
263 fDataQueue.put(data);
264 } catch (InterruptedException e1) {
265 e1.printStackTrace();
8c8bf09f
ASL
266 }
267 }
268
269 /**
270 * Checks if the data meets the request completion criteria.
271 *
272 * @param request
273 * @param data
274 * @return
275 */
2fb2eb37 276 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
36548af3 277 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
8c8bf09f
ASL
278 }
279
951d134a
FC
280 // ------------------------------------------------------------------------
281 // Signal handlers
282 // ------------------------------------------------------------------------
283
8c8bf09f
ASL
284 @TmfSignalHandler
285 public void startSynch(TmfStartSynchSignal signal) {
cbd4ad82 286 synchronized(this) {
36548af3 287 fCoalescingLevel++;
951d134a 288 }
8c8bf09f
ASL
289 }
290
291 @TmfSignalHandler
292 public void endSynch(TmfEndSynchSignal signal) {
cbd4ad82 293 synchronized(this) {
36548af3
FC
294 fCoalescingLevel--;
295 if (fCoalescingLevel == 0) {
951d134a
FC
296 fireRequests();
297 }
298 }
8c8bf09f
ASL
299 }
300
301}
This page took 0.040016 seconds and 5 git commands to generate.