Commit | Line | Data |
---|---|---|
8c8bf09f ASL |
1 | /******************************************************************************* |
2 | * Copyright (c) 2009, 2010 Ericsson | |
3 | * | |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Francois Chouinard - Initial API and implementation | |
11 | *******************************************************************************/ | |
12 | ||
13 | package org.eclipse.linuxtools.tmf.component; | |
14 | ||
15 | import java.lang.reflect.Array; | |
16 | import java.util.Vector; | |
17 | import java.util.concurrent.BlockingQueue; | |
18 | import java.util.concurrent.LinkedBlockingQueue; | |
951d134a | 19 | import java.util.concurrent.SynchronousQueue; |
8c8bf09f | 20 | |
8c8bf09f | 21 | import org.eclipse.linuxtools.tmf.event.TmfData; |
951d134a FC |
22 | import org.eclipse.linuxtools.tmf.request.ITmfDataRequest; |
23 | import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest; | |
8c8bf09f ASL |
24 | import org.eclipse.linuxtools.tmf.request.TmfDataRequest; |
25 | import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor; | |
26 | import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal; | |
27 | import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler; | |
28 | import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal; | |
29 | import org.eclipse.linuxtools.tmf.trace.ITmfContext; | |
30 | ||
31 | /** | |
32 | * <b><u>TmfProvider</u></b> | |
33 | * <p> | |
34 | * The TmfProvider<T> is a provider for a data of type <T>. | |
35 | * <p> | |
36 | * This abstract class implements the housekeeking methods to register/ | |
37 | * deregister the event provider and to handle generically the event requests. | |
38 | * <p> | |
39 | * The concrete class can either re-implement processRequest() entirely or | |
40 | * just implement the hooks (initializeContext() and getNext()). | |
41 | * <p> | |
42 | * TODO: Add support for providing multiple data types. | |
43 | */ | |
951d134a | 44 | public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> { |
8c8bf09f ASL |
45 | |
46 | final protected Class<T> fType; | |
47 | ||
48 | public static final int DEFAULT_QUEUE_SIZE = 1000; | |
49 | protected final int fQueueSize; | |
50 | protected final BlockingQueue<T> fDataQueue; | |
51 | protected final TmfRequestExecutor fExecutor; | |
52 | ||
951d134a FC |
53 | private Integer fSynchDepth; |
54 | ||
8c8bf09f | 55 | // ------------------------------------------------------------------------ |
951d134a | 56 | // Constructors |
8c8bf09f ASL |
57 | // ------------------------------------------------------------------------ |
58 | ||
59 | public TmfDataProvider(String name, Class<T> type) { | |
60 | this(name, type, DEFAULT_QUEUE_SIZE); | |
61 | } | |
62 | ||
63 | protected TmfDataProvider(String name, Class<T> type, int queueSize) { | |
64 | super(name); | |
8c8bf09f | 65 | fQueueSize = queueSize; |
fc6ccf6f | 66 | fType = type; |
951d134a FC |
67 | fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>(); |
68 | ||
fc6ccf6f | 69 | fExecutor = new TmfRequestExecutor(); |
951d134a | 70 | fSynchDepth = 0; |
54d55ced FC |
71 | |
72 | TmfProviderManager.register(fType, this); | |
fc6ccf6f | 73 | } |
8c8bf09f | 74 | |
fc6ccf6f FC |
75 | @Override |
76 | public void register() { | |
77 | super.register(); | |
8c8bf09f | 78 | TmfProviderManager.register(fType, this); |
8c8bf09f | 79 | } |
fc6ccf6f | 80 | |
8c8bf09f | 81 | @Override |
fc6ccf6f | 82 | public void deregister() { |
8c8bf09f | 83 | TmfProviderManager.deregister(fType, this); |
54d55ced | 84 | fExecutor.stop(); |
fc6ccf6f | 85 | super.deregister(); |
8c8bf09f ASL |
86 | } |
87 | ||
88 | public int getQueueSize() { | |
89 | return fQueueSize; | |
90 | } | |
91 | ||
8c8bf09f ASL |
92 | // ------------------------------------------------------------------------ |
93 | // ITmfRequestHandler | |
94 | // ------------------------------------------------------------------------ | |
95 | ||
951d134a FC |
96 | public void sendRequest(final TmfDataRequest<T> request) { |
97 | ||
98 | if (fSynchDepth > 0) { | |
99 | // We are in coalescing mode: client should NEVER wait | |
100 | // (otherwise we will have deadlock...) | |
101 | coalesceDataRequest(request); | |
102 | } else { | |
103 | // Process the request immediately | |
104 | queueRequest(request); | |
105 | } | |
106 | } | |
107 | ||
108 | /** | |
109 | * This method queues the coalesced requests. | |
110 | * | |
111 | * @param thread | |
112 | */ | |
113 | private synchronized void fireRequests() { | |
114 | for (TmfDataRequest<T> request : fPendingCoalescedRequests) { | |
115 | queueRequest(request); | |
116 | } | |
117 | fPendingCoalescedRequests.clear(); | |
118 | } | |
119 | ||
120 | // ------------------------------------------------------------------------ | |
121 | // Coalescing (primitive test...) | |
122 | // ------------------------------------------------------------------------ | |
8c8bf09f | 123 | |
951d134a | 124 | protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>(); |
8c8bf09f | 125 | |
951d134a FC |
126 | protected synchronized void newCoalescedDataRequest(TmfDataRequest<T> request) { |
127 | TmfCoalescedDataRequest<T> coalescedRequest = | |
128 | new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize()); | |
129 | coalescedRequest.addRequest(request); | |
130 | fPendingCoalescedRequests.add(coalescedRequest); | |
131 | } | |
8c8bf09f | 132 | |
951d134a FC |
133 | protected synchronized void coalesceDataRequest(TmfDataRequest<T> request) { |
134 | for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) { | |
135 | if (req.isCompatible(request)) { | |
136 | req.addRequest(request); | |
137 | return; | |
138 | } | |
8c8bf09f | 139 | } |
951d134a | 140 | newCoalescedDataRequest(request); |
8c8bf09f ASL |
141 | } |
142 | ||
951d134a FC |
143 | // ------------------------------------------------------------------------ |
144 | // Request processing | |
145 | // ------------------------------------------------------------------------ | |
146 | ||
147 | protected void queueRequest(final TmfDataRequest<T> request) { | |
9aae0442 | 148 | |
8c8bf09f ASL |
149 | // Process the request |
150 | Thread thread = new Thread() { | |
151 | ||
152 | @Override | |
153 | public void run() { | |
154 | ||
155 | // Extract the generic information | |
156 | int blockSize = request.getBlockize(); | |
157 | int nbRequested = request.getNbRequested(); | |
158 | ||
159 | // Create the result buffer | |
160 | Vector<T> result = new Vector<T>(); | |
161 | int nbRead = 0; | |
162 | ||
163 | // Initialize the execution | |
164 | ITmfContext context = armRequest(request); | |
165 | if (context == null) { | |
fc6ccf6f | 166 | request.fail(); |
8c8bf09f ASL |
167 | return; |
168 | } | |
169 | ||
fc6ccf6f FC |
170 | // Get the ordered events |
171 | T data = getNext(context); | |
172 | while (data != null && !isCompleted(request, data, nbRead)) | |
173 | { | |
174 | result.add(data); | |
175 | if (++nbRead % blockSize == 0) { | |
176 | pushData(request, result); | |
8c8bf09f | 177 | } |
fc6ccf6f | 178 | // To avoid an unnecessary read passed the last data requested |
7f407ead | 179 | if (nbRead < nbRequested) { |
fc6ccf6f | 180 | data = getNext(context); |
7f407ead | 181 | } |
8c8bf09f | 182 | } |
fc6ccf6f FC |
183 | pushData(request, result); |
184 | request.done(); | |
8c8bf09f ASL |
185 | } |
186 | }; | |
fc6ccf6f | 187 | fExecutor.queueRequest(thread); |
8c8bf09f ASL |
188 | } |
189 | ||
190 | /** | |
191 | * Format the result data and forwards it to the requester. | |
192 | * Note: after handling, the data is *removed*. | |
193 | * | |
194 | * @param request | |
195 | * @param data | |
196 | */ | |
197 | @SuppressWarnings("unchecked") | |
951d134a | 198 | protected void pushData(ITmfDataRequest<T> request, Vector<T> data) { |
8c8bf09f ASL |
199 | synchronized(request) { |
200 | if (!request.isCompleted()) { | |
201 | T[] result = (T[]) Array.newInstance(fType, data.size()); | |
202 | data.toArray(result); | |
203 | request.setData(result); | |
204 | request.handleData(); | |
205 | data.removeAllElements(); | |
206 | } | |
207 | } | |
208 | } | |
209 | ||
951d134a FC |
210 | /** |
211 | * Initialize the provider based on the request. The context is | |
212 | * provider specific and will be updated by getNext(). | |
213 | * | |
214 | * @param request | |
215 | * @return an application specific context; null if request can't be serviced | |
216 | */ | |
217 | public abstract ITmfContext armRequest(TmfDataRequest<T> request); | |
218 | ||
8c8bf09f ASL |
219 | /** |
220 | * Return the next piece of data based on the context supplied. The context | |
221 | * would typically be updated for the subsequent read. | |
222 | * | |
223 | * @param context | |
224 | * @return | |
225 | */ | |
fc6ccf6f FC |
226 | public T getNext(ITmfContext context) { |
227 | try { | |
228 | T event = fDataQueue.take(); | |
229 | return event; | |
230 | } catch (InterruptedException e) { | |
231 | e.printStackTrace(); | |
8c8bf09f | 232 | } |
fc6ccf6f | 233 | return null; |
8c8bf09f ASL |
234 | } |
235 | ||
951d134a FC |
236 | /** |
237 | * Makes the generated result data available for getNext() | |
238 | * | |
239 | * @param data | |
240 | */ | |
fc6ccf6f FC |
241 | public void queueResult(T data) { |
242 | try { | |
243 | fDataQueue.put(data); | |
244 | } catch (InterruptedException e1) { | |
245 | e1.printStackTrace(); | |
8c8bf09f ASL |
246 | } |
247 | } | |
248 | ||
249 | /** | |
250 | * Checks if the data meets the request completion criteria. | |
251 | * | |
252 | * @param request | |
253 | * @param data | |
254 | * @return | |
255 | */ | |
fc6ccf6f FC |
256 | public boolean isCompleted(TmfDataRequest<T> request, T data, int nbRead) { |
257 | return request.isCompleted() || nbRead >= request.getNbRequested(); | |
8c8bf09f ASL |
258 | } |
259 | ||
951d134a FC |
260 | // ------------------------------------------------------------------------ |
261 | // Signal handlers | |
262 | // ------------------------------------------------------------------------ | |
263 | ||
8c8bf09f ASL |
264 | @TmfSignalHandler |
265 | public void startSynch(TmfStartSynchSignal signal) { | |
cbd4ad82 | 266 | synchronized(this) { |
951d134a FC |
267 | fSynchDepth++; |
268 | } | |
8c8bf09f ASL |
269 | } |
270 | ||
271 | @TmfSignalHandler | |
272 | public void endSynch(TmfEndSynchSignal signal) { | |
cbd4ad82 | 273 | synchronized(this) { |
951d134a FC |
274 | fSynchDepth--; |
275 | if (fSynchDepth == 0) { | |
276 | fireRequests(); | |
277 | } | |
278 | } | |
8c8bf09f ASL |
279 | } |
280 | ||
281 | } |