2010-09-17 Francois Chouinard <fchouinard@gmail.com> Contribution for Bug325662
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.linuxtools.tmf.component;
14
8c8bf09f
ASL
15import java.util.Vector;
16import java.util.concurrent.BlockingQueue;
17import java.util.concurrent.LinkedBlockingQueue;
9b635e61 18import java.util.concurrent.SynchronousQueue;
8c8bf09f 19
ce785d7d 20import org.eclipse.linuxtools.tmf.Tracer;
8c8bf09f 21import org.eclipse.linuxtools.tmf.event.TmfData;
951d134a 22import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
9b635e61 23import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType;
951d134a 24import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
8c8bf09f
ASL
25import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
26import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
27import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
28import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
29import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
30import org.eclipse.linuxtools.tmf.trace.ITmfContext;
31
32/**
33 * <b><u>TmfProvider</u></b>
34 * <p>
35 * The TmfProvider<T> is a provider for a data of type <T>.
36 * <p>
37 * This abstract class implements the housekeeking methods to register/
38 * deregister the event provider and to handle generically the event requests.
39 * <p>
40 * The concrete class can either re-implement processRequest() entirely or
41 * just implement the hooks (initializeContext() and getNext()).
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 */
951d134a 45public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f 46
550d787e
FC
47 // ------------------------------------------------------------------------
48 // Constants
49 // ------------------------------------------------------------------------
50
9b635e61 51// private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
550d787e
FC
52// private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
53
54 // ------------------------------------------------------------------------
55 //
56 // ------------------------------------------------------------------------
57
8c8bf09f 58 final protected Class<T> fType;
550d787e 59 final protected boolean fLogData;
cb866e08 60 final protected boolean fLogError;
8c8bf09f 61
f6b14ce2 62 public static final int DEFAULT_BLOCK_SIZE = 5000;
8c8bf09f 63 public static final int DEFAULT_QUEUE_SIZE = 1000;
f6b14ce2 64
8c8bf09f
ASL
65 protected final int fQueueSize;
66 protected final BlockingQueue<T> fDataQueue;
67 protected final TmfRequestExecutor fExecutor;
68
550d787e 69 private int fSignalDepth = 0;
045df77d 70 private final Object fLock = new Object();
951d134a 71
8c8bf09f 72 // ------------------------------------------------------------------------
951d134a 73 // Constructors
8c8bf09f
ASL
74 // ------------------------------------------------------------------------
75
76 public TmfDataProvider(String name, Class<T> type) {
77 this(name, type, DEFAULT_QUEUE_SIZE);
78 }
79
80 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
81 super(name);
fc6ccf6f 82 fType = type;
ce785d7d 83 fQueueSize = queueSize;
9b635e61 84 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
ce785d7d
FC
85
86 fExecutor = new TmfRequestExecutor();
550d787e
FC
87 fSignalDepth = 0;
88
cb866e08
FC
89 fLogData = Tracer.isEventTraced();
90 fLogError = Tracer.isErrorTraced();
54d55ced
FC
91
92 TmfProviderManager.register(fType, this);
9b635e61 93// if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
ce785d7d 94}
377f1ad8 95
ce785d7d
FC
96 public TmfDataProvider(TmfDataProvider<T> other) {
97 super(other);
98 fType = other.fType;
99 fQueueSize = other.fQueueSize;
9b635e61 100 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
ce785d7d
FC
101
102 fExecutor = new TmfRequestExecutor();
550d787e
FC
103 fSignalDepth = 0;
104
cb866e08
FC
105 fLogData = Tracer.isEventTraced();
106 fLogError = Tracer.isErrorTraced();
377f1ad8
WB
107 }
108
fc6ccf6f 109 @Override
2fb2eb37 110 public void dispose() {
8c8bf09f 111 TmfProviderManager.deregister(fType, this);
54d55ced 112 fExecutor.stop();
2fb2eb37 113 super.dispose();
3d62f8b7 114// if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
8c8bf09f
ASL
115 }
116
117 public int getQueueSize() {
118 return fQueueSize;
119 }
120
ff4ed569
FC
121 public Class<?> getType() {
122 return fType;
123 }
124
8c8bf09f
ASL
125 // ------------------------------------------------------------------------
126 // ITmfRequestHandler
127 // ------------------------------------------------------------------------
128
550d787e 129 public void sendRequest(final ITmfDataRequest<T> request) {
045df77d 130 synchronized(fLock) {
9b635e61
FC
131 if (fSignalDepth > 0) {
132 coalesceDataRequest(request);
133 } else {
134 dispatchRequest(request);
550d787e 135 }
951d134a
FC
136 }
137 }
138
139 /**
140 * This method queues the coalesced requests.
141 *
142 * @param thread
143 */
550d787e 144 public void fireRequests() {
045df77d 145 synchronized(fLock) {
550d787e 146 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
9b635e61 147 dispatchRequest(request);
550d787e
FC
148 }
149 fPendingCoalescedRequests.clear();
951d134a 150 }
951d134a
FC
151 }
152
153 // ------------------------------------------------------------------------
154 // Coalescing (primitive test...)
155 // ------------------------------------------------------------------------
8c8bf09f 156
951d134a 157 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
8c8bf09f 158
550d787e 159 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
045df77d 160 synchronized(fLock) {
f9673903
FC
161 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(
162 fType, request.getIndex(), request.getNbRequested(),request.getExecType());
550d787e 163 coalescedRequest.addRequest(request);
9b635e61
FC
164 if (Tracer.isRequestTraced()) {
165 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId());
166 Tracer.traceRequest(coalescedRequest, "added " + request.getRequestId());
167 }
550d787e
FC
168 fPendingCoalescedRequests.add(coalescedRequest);
169 }
951d134a 170 }
8c8bf09f 171
045df77d
FC
172 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
173 synchronized(fLock) {
550d787e
FC
174 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
175 if (req.isCompatible(request)) {
176 req.addRequest(request);
9b635e61
FC
177 if (Tracer.isRequestTraced()) {
178 Tracer.traceRequest(request, "coalesced with " + req.getRequestId());
179 Tracer.traceRequest(req, "added " + request.getRequestId());
180 }
550d787e
FC
181 return;
182 }
951d134a 183 }
550d787e 184 newCoalescedDataRequest(request);
8c8bf09f 185 }
8c8bf09f
ASL
186 }
187
951d134a
FC
188 // ------------------------------------------------------------------------
189 // Request processing
190 // ------------------------------------------------------------------------
191
9b635e61 192 private void dispatchRequest(final ITmfDataRequest<T> request) {
f6b14ce2 193 if (request.getExecType() == ExecutionType.FOREGROUND)
9b635e61
FC
194 queueRequest(request);
195 else
f6b14ce2 196 queueBackgroundRequest(request, DEFAULT_BLOCK_SIZE, true);
9b635e61
FC
197 }
198
2fb2eb37 199 protected void queueRequest(final ITmfDataRequest<T> request) {
9aae0442 200
045df77d
FC
201 if (fExecutor.isShutdown()) {
202 request.cancel();
203 return;
204 }
205
f9673903 206 final TmfDataProvider<T> provider = this;
550d787e 207
8c8bf09f 208 // Process the request
9b635e61 209 TmfThread thread = new TmfThread(request.getExecType()) {
8c8bf09f
ASL
210
211 @Override
212 public void run() {
213
f9673903 214 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "started");
9b635e61 215
8c8bf09f 216 // Extract the generic information
550d787e 217 request.start();
8c8bf09f 218 int nbRequested = request.getNbRequested();
8c8bf09f
ASL
219 int nbRead = 0;
220
221 // Initialize the execution
222 ITmfContext context = armRequest(request);
223 if (context == null) {
550d787e 224 request.cancel();
8c8bf09f
ASL
225 return;
226 }
227
550d787e
FC
228 try {
229 // Get the ordered events
f9673903 230 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName());
550d787e 231 T data = getNext(context);
f9673903 232 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
550d787e
FC
233 while (data != null && !isCompleted(request, data, nbRead))
234 {
f9673903
FC
235 if (fLogData) Tracer.traceEvent(provider, request, data);
236 request.handleData(data);
237
550d787e 238 // To avoid an unnecessary read passed the last data requested
f9673903 239 if (++nbRead < nbRequested) {
550d787e 240 data = getNext(context);
3d62f8b7
FC
241 if (Tracer.isRequestTraced() && (data == null || data.isNullRef())) {
242 Tracer.trace("Request #" + request.getRequestId() + " end of data");
243 }
550d787e
FC
244 }
245 }
550d787e 246 request.done();
9b635e61 247
3d62f8b7 248 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "completed");
550d787e
FC
249 }
250 catch (Exception e) {
9b635e61 251 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "exception (failed)");
550d787e 252 request.fail();
8c8bf09f 253 }
8c8bf09f
ASL
254 }
255 };
3d62f8b7 256
5c00c0b7 257 fExecutor.execute(thread);
3d62f8b7 258
550d787e 259 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
8c8bf09f
ASL
260 }
261
f6b14ce2
FC
262 // By default, same behavior as a foreground request
263 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, boolean indexing) {
9b635e61
FC
264 queueRequest(request);
265 }
266
951d134a
FC
267 /**
268 * Initialize the provider based on the request. The context is
269 * provider specific and will be updated by getNext().
270 *
271 * @param request
272 * @return an application specific context; null if request can't be serviced
273 */
2fb2eb37 274 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
3d62f8b7
FC
275 public abstract T getNext(ITmfContext context);
276
8c8bf09f
ASL
277 /**
278 * Checks if the data meets the request completion criteria.
279 *
280 * @param request
281 * @param data
282 * @return
283 */
2fb2eb37 284 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
36548af3 285 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
8c8bf09f
ASL
286 }
287
951d134a
FC
288 // ------------------------------------------------------------------------
289 // Signal handlers
290 // ------------------------------------------------------------------------
291
8c8bf09f 292 @TmfSignalHandler
045df77d
FC
293 public void startSynch(TmfStartSynchSignal signal) {
294 synchronized (fLock) {
295 fSignalDepth++;
296 }
8c8bf09f
ASL
297 }
298
299 @TmfSignalHandler
045df77d
FC
300 public void endSynch(TmfEndSynchSignal signal) {
301 synchronized (fLock) {
302 fSignalDepth--;
303 if (fSignalDepth == 0) {
304 fireRequests();
305 }
306 }
8c8bf09f
ASL
307 }
308
309}
This page took 0.046356 seconds and 5 git commands to generate.