Commit | Line | Data |
---|---|---|
8c8bf09f ASL |
1 | /******************************************************************************* |
2 | * Copyright (c) 2009, 2010 Ericsson | |
3 | * | |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Francois Chouinard - Initial API and implementation | |
11 | *******************************************************************************/ | |
12 | ||
13 | package org.eclipse.linuxtools.tmf.component; | |
14 | ||
15 | import java.lang.reflect.Array; | |
16 | import java.util.Vector; | |
17 | import java.util.concurrent.BlockingQueue; | |
18 | import java.util.concurrent.LinkedBlockingQueue; | |
951d134a | 19 | import java.util.concurrent.SynchronousQueue; |
8c8bf09f | 20 | |
8c8bf09f | 21 | import org.eclipse.linuxtools.tmf.event.TmfData; |
951d134a FC |
22 | import org.eclipse.linuxtools.tmf.request.ITmfDataRequest; |
23 | import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest; | |
8c8bf09f ASL |
24 | import org.eclipse.linuxtools.tmf.request.TmfDataRequest; |
25 | import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor; | |
26 | import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal; | |
27 | import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler; | |
28 | import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal; | |
29 | import org.eclipse.linuxtools.tmf.trace.ITmfContext; | |
30 | ||
31 | /** | |
32 | * <b><u>TmfProvider</u></b> | |
33 | * <p> | |
34 | * The TmfProvider<T> is a provider for a data of type <T>. | |
35 | * <p> | |
36 | * This abstract class implements the housekeeking methods to register/ | |
37 | * deregister the event provider and to handle generically the event requests. | |
38 | * <p> | |
39 | * The concrete class can either re-implement processRequest() entirely or | |
40 | * just implement the hooks (initializeContext() and getNext()). | |
41 | * <p> | |
42 | * TODO: Add support for providing multiple data types. | |
43 | */ | |
951d134a | 44 | public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> { |
8c8bf09f ASL |
45 | |
46 | final protected Class<T> fType; | |
47 | ||
48 | public static final int DEFAULT_QUEUE_SIZE = 1000; | |
49 | protected final int fQueueSize; | |
50 | protected final BlockingQueue<T> fDataQueue; | |
51 | protected final TmfRequestExecutor fExecutor; | |
52 | ||
951d134a FC |
53 | private Integer fSynchDepth; |
54 | ||
8c8bf09f | 55 | // ------------------------------------------------------------------------ |
951d134a | 56 | // Constructors |
8c8bf09f ASL |
57 | // ------------------------------------------------------------------------ |
58 | ||
59 | public TmfDataProvider(String name, Class<T> type) { | |
60 | this(name, type, DEFAULT_QUEUE_SIZE); | |
61 | } | |
62 | ||
63 | protected TmfDataProvider(String name, Class<T> type, int queueSize) { | |
64 | super(name); | |
8c8bf09f | 65 | fQueueSize = queueSize; |
fc6ccf6f | 66 | fType = type; |
951d134a FC |
67 | fDataQueue = (queueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>(); |
68 | ||
fc6ccf6f | 69 | fExecutor = new TmfRequestExecutor(); |
951d134a | 70 | fSynchDepth = 0; |
fc6ccf6f FC |
71 | register(); |
72 | } | |
8c8bf09f | 73 | |
fc6ccf6f FC |
74 | @Override |
75 | public void register() { | |
76 | super.register(); | |
8c8bf09f | 77 | TmfProviderManager.register(fType, this); |
8c8bf09f | 78 | } |
fc6ccf6f | 79 | |
8c8bf09f | 80 | @Override |
fc6ccf6f | 81 | public void deregister() { |
8c8bf09f | 82 | TmfProviderManager.deregister(fType, this); |
fc6ccf6f | 83 | super.deregister(); |
8c8bf09f ASL |
84 | } |
85 | ||
86 | public int getQueueSize() { | |
87 | return fQueueSize; | |
88 | } | |
89 | ||
8c8bf09f ASL |
90 | // ------------------------------------------------------------------------ |
91 | // ITmfRequestHandler | |
92 | // ------------------------------------------------------------------------ | |
93 | ||
951d134a FC |
94 | public void sendRequest(final TmfDataRequest<T> request) { |
95 | ||
96 | if (fSynchDepth > 0) { | |
97 | // We are in coalescing mode: client should NEVER wait | |
98 | // (otherwise we will have deadlock...) | |
99 | coalesceDataRequest(request); | |
100 | } else { | |
101 | // Process the request immediately | |
102 | queueRequest(request); | |
103 | } | |
104 | } | |
105 | ||
106 | /** | |
107 | * This method queues the coalesced requests. | |
108 | * | |
109 | * @param thread | |
110 | */ | |
111 | private synchronized void fireRequests() { | |
112 | for (TmfDataRequest<T> request : fPendingCoalescedRequests) { | |
113 | queueRequest(request); | |
114 | } | |
115 | fPendingCoalescedRequests.clear(); | |
116 | } | |
117 | ||
118 | // ------------------------------------------------------------------------ | |
119 | // Coalescing (primitive test...) | |
120 | // ------------------------------------------------------------------------ | |
8c8bf09f | 121 | |
951d134a | 122 | protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>(); |
8c8bf09f | 123 | |
951d134a FC |
124 | protected synchronized void newCoalescedDataRequest(TmfDataRequest<T> request) { |
125 | TmfCoalescedDataRequest<T> coalescedRequest = | |
126 | new TmfCoalescedDataRequest<T>(fType, request.getIndex(), request.getNbRequested(), request.getBlockize()); | |
127 | coalescedRequest.addRequest(request); | |
128 | fPendingCoalescedRequests.add(coalescedRequest); | |
129 | } | |
8c8bf09f | 130 | |
951d134a FC |
131 | protected synchronized void coalesceDataRequest(TmfDataRequest<T> request) { |
132 | for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) { | |
133 | if (req.isCompatible(request)) { | |
134 | req.addRequest(request); | |
135 | return; | |
136 | } | |
8c8bf09f | 137 | } |
951d134a | 138 | newCoalescedDataRequest(request); |
8c8bf09f ASL |
139 | } |
140 | ||
951d134a FC |
141 | // ------------------------------------------------------------------------ |
142 | // Request processing | |
143 | // ------------------------------------------------------------------------ | |
144 | ||
145 | protected void queueRequest(final TmfDataRequest<T> request) { | |
9aae0442 | 146 | |
8c8bf09f ASL |
147 | // Process the request |
148 | Thread thread = new Thread() { | |
149 | ||
150 | @Override | |
151 | public void run() { | |
152 | ||
153 | // Extract the generic information | |
154 | int blockSize = request.getBlockize(); | |
155 | int nbRequested = request.getNbRequested(); | |
156 | ||
157 | // Create the result buffer | |
158 | Vector<T> result = new Vector<T>(); | |
159 | int nbRead = 0; | |
160 | ||
161 | // Initialize the execution | |
162 | ITmfContext context = armRequest(request); | |
163 | if (context == null) { | |
fc6ccf6f | 164 | request.fail(); |
8c8bf09f ASL |
165 | return; |
166 | } | |
167 | ||
fc6ccf6f FC |
168 | // Get the ordered events |
169 | T data = getNext(context); | |
170 | while (data != null && !isCompleted(request, data, nbRead)) | |
171 | { | |
172 | result.add(data); | |
173 | if (++nbRead % blockSize == 0) { | |
174 | pushData(request, result); | |
8c8bf09f | 175 | } |
fc6ccf6f FC |
176 | // To avoid an unnecessary read passed the last data requested |
177 | if (nbRead < nbRequested) | |
178 | data = getNext(context); | |
8c8bf09f | 179 | } |
fc6ccf6f FC |
180 | pushData(request, result); |
181 | request.done(); | |
8c8bf09f ASL |
182 | } |
183 | }; | |
fc6ccf6f | 184 | fExecutor.queueRequest(thread); |
8c8bf09f ASL |
185 | } |
186 | ||
187 | /** | |
188 | * Format the result data and forwards it to the requester. | |
189 | * Note: after handling, the data is *removed*. | |
190 | * | |
191 | * @param request | |
192 | * @param data | |
193 | */ | |
194 | @SuppressWarnings("unchecked") | |
951d134a | 195 | protected void pushData(ITmfDataRequest<T> request, Vector<T> data) { |
8c8bf09f ASL |
196 | synchronized(request) { |
197 | if (!request.isCompleted()) { | |
198 | T[] result = (T[]) Array.newInstance(fType, data.size()); | |
199 | data.toArray(result); | |
200 | request.setData(result); | |
201 | request.handleData(); | |
202 | data.removeAllElements(); | |
203 | } | |
204 | } | |
205 | } | |
206 | ||
951d134a FC |
207 | /** |
208 | * Initialize the provider based on the request. The context is | |
209 | * provider specific and will be updated by getNext(). | |
210 | * | |
211 | * @param request | |
212 | * @return an application specific context; null if request can't be serviced | |
213 | */ | |
214 | public abstract ITmfContext armRequest(TmfDataRequest<T> request); | |
215 | ||
8c8bf09f ASL |
216 | /** |
217 | * Return the next piece of data based on the context supplied. The context | |
218 | * would typically be updated for the subsequent read. | |
219 | * | |
220 | * @param context | |
221 | * @return | |
222 | */ | |
fc6ccf6f FC |
223 | public T getNext(ITmfContext context) { |
224 | try { | |
225 | T event = fDataQueue.take(); | |
226 | return event; | |
227 | } catch (InterruptedException e) { | |
228 | e.printStackTrace(); | |
8c8bf09f | 229 | } |
fc6ccf6f | 230 | return null; |
8c8bf09f ASL |
231 | } |
232 | ||
951d134a FC |
233 | /** |
234 | * Makes the generated result data available for getNext() | |
235 | * | |
236 | * @param data | |
237 | */ | |
fc6ccf6f FC |
238 | public void queueResult(T data) { |
239 | try { | |
240 | fDataQueue.put(data); | |
241 | } catch (InterruptedException e1) { | |
242 | e1.printStackTrace(); | |
8c8bf09f ASL |
243 | } |
244 | } | |
245 | ||
246 | /** | |
247 | * Checks if the data meets the request completion criteria. | |
248 | * | |
249 | * @param request | |
250 | * @param data | |
251 | * @return | |
252 | */ | |
fc6ccf6f FC |
253 | public boolean isCompleted(TmfDataRequest<T> request, T data, int nbRead) { |
254 | return request.isCompleted() || nbRead >= request.getNbRequested(); | |
8c8bf09f ASL |
255 | } |
256 | ||
951d134a FC |
257 | // ------------------------------------------------------------------------ |
258 | // Signal handlers | |
259 | // ------------------------------------------------------------------------ | |
260 | ||
8c8bf09f ASL |
261 | @TmfSignalHandler |
262 | public void startSynch(TmfStartSynchSignal signal) { | |
951d134a FC |
263 | synchronized(fSynchDepth) { |
264 | fSynchDepth++; | |
265 | } | |
8c8bf09f ASL |
266 | } |
267 | ||
268 | @TmfSignalHandler | |
269 | public void endSynch(TmfEndSynchSignal signal) { | |
951d134a FC |
270 | synchronized(fSynchDepth) { |
271 | fSynchDepth--; | |
272 | if (fSynchDepth == 0) { | |
273 | fireRequests(); | |
274 | } | |
275 | } | |
8c8bf09f ASL |
276 | } |
277 | ||
278 | } |