[Bug309042] Improved test code coverage and other mundane issues.
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
15import java.lang.reflect.Array;
16import java.util.Vector;
17import java.util.concurrent.BlockingQueue;
18import java.util.concurrent.LinkedBlockingQueue;
951d134a 19import java.util.concurrent.SynchronousQueue;
8c8bf09f 20
8c8bf09f 21import org.eclipse.linuxtools.tmf.event.TmfData;
951d134a
FC
22import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
23import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
8c8bf09f
ASL
24import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
25import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
26import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
27import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
28import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
29import org.eclipse.linuxtools.tmf.trace.ITmfContext;
30
31/**
32 * <b><u>TmfDataProvider</u></b>
33 * <p>
34 * The TmfDataProvider<T> is a provider for data of type <T>.
35 * <p>
36 * This abstract class implements the housekeeping methods to register/
37 * deregister the event provider and to handle generically the event requests.
38 * <p>
39 * The concrete class can either re-implement queueRequest() entirely or
40 * just implement the hooks (armRequest() and getNext()).
41 * <p>
42 * TODO: Add support for providing multiple data types.
43 */
951d134a 44public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f
ASL
45
46 final protected Class<T> fType;
47
48 public static final int DEFAULT_QUEUE_SIZE = 1000;
49 protected final int fQueueSize;
50 protected final BlockingQueue<T> fDataQueue;
51 protected final TmfRequestExecutor fExecutor;
52
951d134a
FC
53 private Integer fSynchDepth;
54
8c8bf09f 55 // ------------------------------------------------------------------------
951d134a 56 // Constructors
8c8bf09f
ASL
57 // ------------------------------------------------------------------------
58
59 public TmfDataProvider(String name, Class<T> type) {
60 this(name, type, DEFAULT_QUEUE_SIZE);
61 }
62
63 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
64 super(name);
8c8bf09f 65 fQueueSize = queueSize;
fc6ccf6f 66 fType = type;
951d134a
FC
67 fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
68
fc6ccf6f 69 fExecutor = new TmfRequestExecutor();
951d134a 70 fSynchDepth = 0;
54d55ced
FC
71
72 TmfProviderManager.register(fType, this);
fc6ccf6f 73 }
8c8bf09f 74
fc6ccf6f
FC
75 @Override
76 public void register() {
77 super.register();
8c8bf09f 78 TmfProviderManager.register(fType, this);
8c8bf09f 79 }
fc6ccf6f 80
8c8bf09f 81 @Override
fc6ccf6f 82 public void deregister() {
8c8bf09f 83 TmfProviderManager.deregister(fType, this);
54d55ced 84 fExecutor.stop();
fc6ccf6f 85 super.deregister();
8c8bf09f
ASL
86 }
87
88 public int getQueueSize() {
89 return fQueueSize;
90 }
91
8c8bf09f
ASL
92 // ------------------------------------------------------------------------
93 // ITmfRequestHandler
94 // ------------------------------------------------------------------------
95
951d134a
FC
96 public void sendRequest(final TmfDataRequest<T> request) {
97
98 if (fSynchDepth > 0) {
99 // We are in coalescing mode: client should NEVER wait
100 // (otherwise we will have deadlock...)
101 coalesceDataRequest(request);
102 } else {
103 // Process the request immediately
104 queueRequest(request);
105 }
106 }
107
108 /**
109 * This method queues the coalesced requests.
110 *
111 * @param thread
112 */
113 private synchronized void fireRequests() {
114 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
115 queueRequest(request);
116 }
117 fPendingCoalescedRequests.clear();
118 }
119
120 // ------------------------------------------------------------------------
121 // Coalescing (primitive test...)
122 // ------------------------------------------------------------------------
8c8bf09f 123
951d134a 124 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
8c8bf09f 125
951d134a
FC
126 protected synchronized void newCoalescedDataRequest(TmfDataRequest<T> request) {
127 TmfCoalescedDataRequest<T> coalescedRequest =
128 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
129 coalescedRequest.addRequest(request);
130 fPendingCoalescedRequests.add(coalescedRequest);
131 }
8c8bf09f 132
951d134a
FC
133 protected synchronized void coalesceDataRequest(TmfDataRequest<T> request) {
134 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
135 if (req.isCompatible(request)) {
136 req.addRequest(request);
137 return;
138 }
8c8bf09f 139 }
951d134a 140 newCoalescedDataRequest(request);
8c8bf09f
ASL
141 }
142
951d134a
FC
143 // ------------------------------------------------------------------------
144 // Request processing
145 // ------------------------------------------------------------------------
146
147 protected void queueRequest(final TmfDataRequest<T> request) {
9aae0442 148
8c8bf09f
ASL
149 // Process the request
150 Thread thread = new Thread() {
151
152 @Override
153 public void run() {
154
155 // Extract the generic information
156 int blockSize = request.getBlockize();
157 int nbRequested = request.getNbRequested();
158
159 // Create the result buffer
160 Vector<T> result = new Vector<T>();
161 int nbRead = 0;
162
163 // Initialize the execution
164 ITmfContext context = armRequest(request);
165 if (context == null) {
fc6ccf6f 166 request.fail();
8c8bf09f
ASL
167 return;
168 }
169
fc6ccf6f
FC
170 // Get the ordered events
171 T data = getNext(context);
172 while (data != null && !isCompleted(request, data, nbRead))
173 {
174 result.add(data);
175 if (++nbRead % blockSize == 0) {
176 pushData(request, result);
8c8bf09f 177 }
fc6ccf6f 178 // To avoid an unnecessary read passed the last data requested
7f407ead 179 if (nbRead < nbRequested) {
fc6ccf6f 180 data = getNext(context);
7f407ead 181 }
8c8bf09f 182 }
fc6ccf6f
FC
183 pushData(request, result);
184 request.done();
8c8bf09f
ASL
185 }
186 };
fc6ccf6f 187 fExecutor.queueRequest(thread);
8c8bf09f
ASL
188 }
189
190 /**
191 * Format the result data and forwards it to the requester.
192 * Note: after handling, the data is *removed*.
193 *
194 * @param request
195 * @param data
196 */
197 @SuppressWarnings("unchecked")
951d134a 198 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
8c8bf09f
ASL
199 synchronized(request) {
200 if (!request.isCompleted()) {
201 T[] result = (T[]) Array.newInstance(fType, data.size());
202 data.toArray(result);
203 request.setData(result);
204 request.handleData();
205 data.removeAllElements();
206 }
207 }
208 }
209
951d134a
FC
210 /**
211 * Initialize the provider based on the request. The context is
212 * provider specific and will be updated by getNext().
213 *
214 * @param request
215 * @return an application specific context; null if request can't be serviced
216 */
217 public abstract ITmfContext armRequest(TmfDataRequest<T> request);
218
8c8bf09f
ASL
219 /**
220 * Return the next piece of data based on the context supplied. The context
221 * would typically be updated for the subsequent read.
222 *
223 * @param context
224 * @return
225 */
fc6ccf6f
FC
226 public T getNext(ITmfContext context) {
227 try {
228 T event = fDataQueue.take();
229 return event;
230 } catch (InterruptedException e) {
231 e.printStackTrace();
8c8bf09f 232 }
fc6ccf6f 233 return null;
8c8bf09f
ASL
234 }
235
951d134a
FC
236 /**
237 * Makes the generated result data available for getNext()
238 *
239 * @param data
240 */
fc6ccf6f
FC
241 public void queueResult(T data) {
242 try {
243 fDataQueue.put(data);
244 } catch (InterruptedException e1) {
245 e1.printStackTrace();
8c8bf09f
ASL
246 }
247 }
248
249 /**
250 * Checks if the data meets the request completion criteria.
251 *
252 * @param request
253 * @param data
254 * @return
255 */
fc6ccf6f
FC
256 public boolean isCompleted(TmfDataRequest<T> request, T data, int nbRead) {
257 return request.isCompleted() || nbRead >= request.getNbRequested();
8c8bf09f
ASL
258 }
259
951d134a
FC
260 // ------------------------------------------------------------------------
261 // Signal handlers
262 // ------------------------------------------------------------------------
263
8c8bf09f
ASL
264 @TmfSignalHandler
265 public void startSynch(TmfStartSynchSignal signal) {
cbd4ad82 266 synchronized(this) {
951d134a
FC
267 fSynchDepth++;
268 }
8c8bf09f
ASL
269 }
270
271 @TmfSignalHandler
272 public void endSynch(TmfEndSynchSignal signal) {
cbd4ad82 273 synchronized(this) {
951d134a
FC
274 fSynchDepth--;
275 if (fSynchDepth == 0) {
276 fireRequests();
277 }
278 }
8c8bf09f
ASL
279 }
280
281}
This page took 0.038282 seconds and 5 git commands to generate.