[Bug292967] Second part of request coalescing + unit tests + minor fixes.
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
15import java.lang.reflect.Array;
16import java.util.Vector;
17import java.util.concurrent.BlockingQueue;
18import java.util.concurrent.LinkedBlockingQueue;
951d134a 19import java.util.concurrent.SynchronousQueue;
8c8bf09f 20
8c8bf09f 21import org.eclipse.linuxtools.tmf.event.TmfData;
951d134a
FC
22import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
23import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
8c8bf09f
ASL
24import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
25import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
26import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
27import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
28import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
29import org.eclipse.linuxtools.tmf.trace.ITmfContext;
30
31/**
32 * <b><u>TmfDataProvider</u></b>
33 * <p>
34 * The {@code TmfDataProvider<T>} is a provider for data of type {@code T}.
35 * <p>
36 * This abstract class implements the housekeeping methods to register/
37 * deregister the event provider and to handle generically the event requests.
38 * <p>
39 * The concrete class can either re-implement processRequest() entirely or
40 * just implement the hooks (initializeContext() and getNext()).
41 * <p>
42 * TODO: Add support for providing multiple data types.
43 */
951d134a 44public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f
ASL
45
46 final protected Class<T> fType;
47
48 public static final int DEFAULT_QUEUE_SIZE = 1000;
49 protected final int fQueueSize;
50 protected final BlockingQueue<T> fDataQueue;
51 protected final TmfRequestExecutor fExecutor;
52
951d134a
FC
53 private Integer fSynchDepth;
54
8c8bf09f 55 // ------------------------------------------------------------------------
951d134a 56 // Constructors
8c8bf09f
ASL
57 // ------------------------------------------------------------------------
58
/**
 * Creates a provider that uses {@link #DEFAULT_QUEUE_SIZE} as its queue size.
 *
 * @param name the component name, passed on to the three-argument constructor
 * @param type the class of the data items handled by this provider
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
62
/**
 * Creates a provider with an explicit queue size and registers it.
 *
 * @param name      the component name, passed on to the parent constructor
 * @param type      the class of the data items handled by this provider
 * @param queueSize the requested data queue capacity; values of 1 or less
 *                  select an unbuffered hand-off queue instead
 */
protected TmfDataProvider(String name, Class<T> type, int queueSize) {
    super(name);
    fType = type;
    fQueueSize = queueSize;

    // Bounded buffer when more than one slot is requested, direct hand-off otherwise
    if (queueSize > 1) {
        fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
    } else {
        fDataQueue = new SynchronousQueue<T>();
    }

    fExecutor = new TmfRequestExecutor();
    fSynchDepth = 0;

    // Registration happens last, once every field has been assigned
    register();
}
8c8bf09f 73
fc6ccf6f
FC
74 @Override
75 public void register() {
76 super.register();
8c8bf09f 77 TmfProviderManager.register(fType, this);
8c8bf09f 78 }
fc6ccf6f 79
8c8bf09f 80 @Override
fc6ccf6f 81 public void deregister() {
8c8bf09f 82 TmfProviderManager.deregister(fType, this);
fc6ccf6f 83 super.deregister();
8c8bf09f
ASL
84 }
85
86 public int getQueueSize() {
87 return fQueueSize;
88 }
89
8c8bf09f
ASL
90 // ------------------------------------------------------------------------
91 // ITmfRequestHandler
92 // ------------------------------------------------------------------------
93
951d134a
FC
94 public void sendRequest(final TmfDataRequest<T> request) {
95
96 if (fSynchDepth > 0) {
97 // We are in coalescing mode: client should NEVER wait
98 // (otherwise we will have deadlock...)
99 coalesceDataRequest(request);
100 } else {
101 // Process the request immediately
102 queueRequest(request);
103 }
104 }
105
106 /**
107 * This method queues the coalesced requests.
108 *
109 * @param thread
110 */
111 private synchronized void fireRequests() {
112 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
113 queueRequest(request);
114 }
115 fPendingCoalescedRequests.clear();
116 }
117
118 // ------------------------------------------------------------------------
119 // Coalescing (primitive test...)
120 // ------------------------------------------------------------------------
8c8bf09f 121
951d134a 122 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
8c8bf09f 123
951d134a
FC
124 protected synchronized void newCoalescedDataRequest(TmfDataRequest<T> request) {
125 TmfCoalescedDataRequest<T> coalescedRequest =
126 new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize());
127 coalescedRequest.addRequest(request);
128 fPendingCoalescedRequests.add(coalescedRequest);
129 }
8c8bf09f 130
951d134a
FC
131 protected synchronized void coalesceDataRequest(TmfDataRequest<T> request) {
132 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
133 if (req.isCompatible(request)) {
134 req.addRequest(request);
135 return;
136 }
8c8bf09f 137 }
951d134a 138 newCoalescedDataRequest(request);
8c8bf09f
ASL
139 }
140
951d134a
FC
141 // ------------------------------------------------------------------------
142 // Request processing
143 // ------------------------------------------------------------------------
144
145 protected void queueRequest(final TmfDataRequest<T> request) {
9aae0442 146
8c8bf09f
ASL
147 // Process the request
148 Thread thread = new Thread() {
149
150 @Override
151 public void run() {
152
153 // Extract the generic information
154 int blockSize = request.getBlockize();
155 int nbRequested = request.getNbRequested();
156
157 // Create the result buffer
158 Vector<T> result = new Vector<T>();
159 int nbRead = 0;
160
161 // Initialize the execution
162 ITmfContext context = armRequest(request);
163 if (context == null) {
fc6ccf6f 164 request.fail();
8c8bf09f
ASL
165 return;
166 }
167
fc6ccf6f
FC
168 // Get the ordered events
169 T data = getNext(context);
170 while (data != null && !isCompleted(request, data, nbRead))
171 {
172 result.add(data);
173 if (++nbRead % blockSize == 0) {
174 pushData(request, result);
8c8bf09f 175 }
fc6ccf6f
FC
176 // To avoid an unnecessary read passed the last data requested
177 if (nbRead < nbRequested)
178 data = getNext(context);
8c8bf09f 179 }
fc6ccf6f
FC
180 pushData(request, result);
181 request.done();
8c8bf09f
ASL
182 }
183 };
fc6ccf6f 184 fExecutor.queueRequest(thread);
8c8bf09f
ASL
185 }
186
187 /**
188 * Format the result data and forwards it to the requester.
189 * Note: after handling, the data is *removed*.
190 *
191 * @param request
192 * @param data
193 */
194 @SuppressWarnings("unchecked")
951d134a 195 protected void pushData(ITmfDataRequest<T> request, Vector<T> data) {
8c8bf09f
ASL
196 synchronized(request) {
197 if (!request.isCompleted()) {
198 T[] result = (T[]) Array.newInstance(fType, data.size());
199 data.toArray(result);
200 request.setData(result);
201 request.handleData();
202 data.removeAllElements();
203 }
204 }
205 }
206
951d134a
FC
/**
 * Initialize the provider based on the request. The context is
 * provider specific and will be updated by getNext().
 * <p>
 * queueRequest() calls this once per request before reading any data and
 * fails the request when the returned context is null.
 *
 * @param request the request about to be serviced
 * @return an application specific context; null if request can't be serviced
 */
public abstract ITmfContext armRequest(TmfDataRequest<T> request);
215
8c8bf09f
ASL
216 /**
217 * Return the next piece of data based on the context supplied. The context
218 * would typically be updated for the subsequent read.
219 *
220 * @param context
221 * @return
222 */
fc6ccf6f
FC
223 public T getNext(ITmfContext context) {
224 try {
225 T event = fDataQueue.take();
226 return event;
227 } catch (InterruptedException e) {
228 e.printStackTrace();
8c8bf09f 229 }
fc6ccf6f 230 return null;
8c8bf09f
ASL
231 }
232
951d134a
FC
233 /**
234 * Makes the generated result data available for getNext()
235 *
236 * @param data
237 */
fc6ccf6f
FC
238 public void queueResult(T data) {
239 try {
240 fDataQueue.put(data);
241 } catch (InterruptedException e1) {
242 e1.printStackTrace();
8c8bf09f
ASL
243 }
244 }
245
246 /**
247 * Checks if the data meets the request completion criteria.
248 *
249 * @param request
250 * @param data
251 * @return
252 */
fc6ccf6f
FC
253 public boolean isCompleted(TmfDataRequest<T> request, T data, int nbRead) {
254 return request.isCompleted() || nbRead >= request.getNbRequested();
8c8bf09f
ASL
255 }
256
951d134a
FC
257 // ------------------------------------------------------------------------
258 // Signal handlers
259 // ------------------------------------------------------------------------
260
8c8bf09f
ASL
261 @TmfSignalHandler
262 public void startSynch(TmfStartSynchSignal signal) {
951d134a
FC
263 synchronized(fSynchDepth) {
264 fSynchDepth++;
265 }
8c8bf09f
ASL
266 }
267
268 @TmfSignalHandler
269 public void endSynch(TmfEndSynchSignal signal) {
951d134a
FC
270 synchronized(fSynchDepth) {
271 fSynchDepth--;
272 if (fSynchDepth == 0) {
273 fireRequests();
274 }
275 }
8c8bf09f
ASL
276 }
277
278}
This page took 0.039233 seconds and 5 git commands to generate.