package org.eclipse.linuxtools.lttng.control;
import org.eclipse.linuxtools.lttng.LttngTestPreparation;
-import org.eclipse.linuxtools.lttng.event.LttngEvent;
-import org.eclipse.linuxtools.lttng.event.LttngSyntheticEvent;
-import org.eclipse.linuxtools.lttng.state.experiment.IStateExperimentManager;
-import org.eclipse.linuxtools.tmf.experiment.TmfExperiment;
-import org.eclipse.linuxtools.tmf.request.TmfEventRequest;
public class LTTngSyntheticEventProviderTest extends LttngTestPreparation {
// ========================================================================
// Tests
// ========================================================================
-// public void testPlainDataRequest() {
+ public void testPlainDataRequest() {
// // prepare
// init();
// TmfExperiment<LttngEvent> experiment = prepareExperimentToTest();
// boolean expected = true;
// assertEquals("Events received out of expected order", expected,
// validSequence);
-// }
+ }
/**
*
*/
public void testSyntheticEventRequest() {
- init();
- // Create a new Experiment manager context
- IStateExperimentManager expManager = prepareExperimentContext(false);
-
- // make sure a TmfExperiment instance is registered as provider and
- // selected as current
- TmfExperiment<LttngEvent> experiment = prepareExperimentToTest();
-
- // experiment selected, build experiment selection context and trigger
- // check point creation
- expManager.experimentSelected_prep(experiment);
- // builds check points in parallel
- expManager.experimentSelected(this, experiment);
-
- // Obtain the singleton event provider
- LttngSyntheticEventProvider synProvider = LttngCoreProviderFactory
- .getEventProvider();
-
- // prepare synthetic event requests
- boolean printExpectedEvents = false;
- TmfEventRequest<LttngSyntheticEvent> request1 = prepareEventRequest(LttngSyntheticEvent.class, 5, 9,
- printExpectedEvents); /* 2001 events */
- TmfEventRequest<LttngSyntheticEvent> request2 = prepareEventRequest(LttngSyntheticEvent.class, 11, 13,
- printExpectedEvents); /* 1001 events */
-
- // execute
- synProvider.sendRequest(request1);
- try {
- request1.waitForCompletion();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("EventCount " + feventCount);
-
- synProvider.sendRequest(request2);
- try {
- request2.waitForCompletion();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("EventCount " + feventCount);
-
+// init();
+// // Create a new Experiment manager context
+// IStateExperimentManager expManager = prepareExperimentContext(false);
+//
+// // make sure a TmfExperiment instance is registered as provider and
+// // selected as current
+// TmfExperiment<LttngEvent> experiment = prepareExperimentToTest();
+//
+// // experiment selected, build experiment selection context and trigger
+// // check point creation
+// expManager.experimentSelected_prep(experiment);
+// // builds check points in parallel
+// expManager.experimentSelected(this, experiment);
+//
+// // Obtain the singleton event provider
+// LttngSyntheticEventProvider synProvider = LttngCoreProviderFactory
+// .getEventProvider();
+//
+// // prepare synthetic event requests
+// boolean printExpectedEvents = false;
+// TmfEventRequest<LttngSyntheticEvent> request1 = prepareEventRequest(LttngSyntheticEvent.class, 5, 9,
+// printExpectedEvents); /* 2001 events */
+// TmfEventRequest<LttngSyntheticEvent> request2 = prepareEventRequest(LttngSyntheticEvent.class, 11, 13,
+// printExpectedEvents); /* 1001 events */
+//
+// // execute
+// synProvider.sendRequest(request1);
// try {
// request1.waitForCompletion();
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// }
+// System.out.println("EventCount " + feventCount);
+//
+// synProvider.sendRequest(request2);
+// try {
// request2.waitForCompletion();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("EventCount " + feventCount);
-
- // finish
- assertEquals("Unexpected eventCount", 3002, feventCount);
+//
+//// try {
+//// request1.waitForCompletion();
+//// request2.waitForCompletion();
+//// } catch (InterruptedException e) {
+//// e.printStackTrace();
+//// }
+//// System.out.println("EventCount " + feventCount);
+//
+// // finish
+// assertEquals("Unexpected eventCount", 3002, feventCount);
}
}
\ No newline at end of file
import org.eclipse.linuxtools.lttng.LttngTestPreparation;
-import org.eclipse.linuxtools.lttng.event.LttngSyntheticEvent;
-import org.eclipse.linuxtools.tmf.request.TmfEventRequest;
public class LTTngSyntheticEventProviderTextTest extends LttngTestPreparation {
/**
*
*/
-// public void testPlainDataRequest() {
+ public void testPlainDataRequest() {
// // prepare
// init();
// TmfExperiment<LttngEvent> experiment = prepareTextExperimentToTest();
// boolean expected = true;
// assertEquals("Events received out of expected order", expected,
// validSequence);
-// }
+ }
/**
*
*/
public void testSyntheticEventRequest() {
- init();
- // make sure a synthetic event provider exists and it's registered
- LttngSyntheticEventProvider synProvider = LttngCoreProviderFactory
- .getEventProvider();
-
- // make sure a TmfExperiment instance is registered as provider and
- // selected as current
- prepareTextExperimentToTest();
-
- // prepare synthetic event request
- TmfEventRequest<LttngSyntheticEvent> request = prepareEventRequest(
- LttngSyntheticEvent.class, 0, 31);
-
- // execute
- synProvider.sendRequest(request);
- try {
- request.waitForCompletion();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- // finish
- assertEquals("Unexpected eventCount", 15316, feventCount);
+// init();
+// // make sure a synthetic event provider exists and it's registered
+// LttngSyntheticEventProvider synProvider = LttngCoreProviderFactory
+// .getEventProvider();
+//
+// // make sure a TmfExperiment instance is registered as provider and
+// // selected as current
+// prepareTextExperimentToTest();
+//
+// // prepare synthetic event request
+// TmfEventRequest<LttngSyntheticEvent> request = prepareEventRequest(
+// LttngSyntheticEvent.class, 0, 31);
+//
+// // execute
+// synProvider.sendRequest(request);
+// try {
+// request.waitForCompletion();
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// }
+//
+// // finish
+// assertEquals("Unexpected eventCount", 15316, feventCount);
}
}
// Data
// ========================================================================
public static final int BLOCK_SIZE = 1;
- public static final int NB_EVENTS = 1;
+ public static final int NB_EVENTS = 1;
public static final int QUEUE_SIZE = 1; // lttng specific, one event at a
// time
updateSynEvent(e);
// If time at or above requested time, update application
- try {
- if (eventTime >= fDispatchTime) {
- // Before update
- syntheticEvent.setSequenceInd(SequenceInd.BEFORE);
- queueResult(syntheticEvent);
- queueResult(syntheticAckIndicator);
-
- // Update state locally
- syntheticEvent.setSequenceInd(SequenceInd.UPDATE);
- fstateUpdateProcessor.process(syntheticEvent, fTraceModel);
-
- // After Update
- syntheticEvent.setSequenceInd(SequenceInd.AFTER);
- queueResult(syntheticEvent);
- queueResult(syntheticAckIndicator);
-
- // increment once per dispatch
- incrementSynEvenCount();
- subEventCount++;
- } else {
- // event time is between checkpoint adjusted time and
- // requested time i.e. application does not expect the
- // event, however the state system needs to be re-built
- // to the dispatch point
- syntheticEvent.setSequenceInd(SequenceInd.UPDATE);
- fstateUpdateProcessor.process(syntheticEvent, fTraceModel);
- }
- } catch (InterruptedException e1) {
- // TODO: Cancel this request
- request.cancel();
-// e1.printStackTrace();
+ if (eventTime >= fDispatchTime) {
+ // Before update
+ syntheticEvent.setSequenceInd(SequenceInd.BEFORE);
+// queueResult(syntheticEvent);
+// queueResult(syntheticAckIndicator);
+
+ LttngSyntheticEvent[] result = new LttngSyntheticEvent[1];
+ result[0] = syntheticEvent;
+ fmainRequest.setData(result);
+ fmainRequest.handleData();
+ result[0] = syntheticAckIndicator;
+ fmainRequest.setData(result);
+ fmainRequest.handleData();
+
+ // Update state locally
+ syntheticEvent.setSequenceInd(SequenceInd.UPDATE);
+ fstateUpdateProcessor.process(syntheticEvent, fTraceModel);
+
+ // After Update
+ syntheticEvent.setSequenceInd(SequenceInd.AFTER);
+// queueResult(syntheticEvent);
+// queueResult(syntheticAckIndicator);
+
+ result = new LttngSyntheticEvent[1];
+ result[0] = syntheticEvent;
+ fmainRequest.setData(result);
+ fmainRequest.handleData();
+ result[0] = syntheticAckIndicator;
+ fmainRequest.setData(result);
+ fmainRequest.handleData();
+
+ // increment once per dispatch
+ incrementSynEvenCount();
+ subEventCount++;
+ } else {
+ // event time is between checkpoint adjusted time and
+ // requested time i.e. application does not expect the
+ // event, however the state system needs to be re-built
+ // to the dispatch point
+ syntheticEvent.setSequenceInd(SequenceInd.UPDATE);
+ fstateUpdateProcessor.process(syntheticEvent, fTraceModel);
}
}
* Notify listeners to prepare to receive data e.g. clean previous data etc.
*/
public void handleProviderStarted(LttngTraceState traceModel) {
- LttngSyntheticEvent startIndEvent = new LttngSyntheticEvent(
- fStatusEvent);
+ LttngSyntheticEvent startIndEvent = new LttngSyntheticEvent(fStatusEvent);
startIndEvent.setSequenceInd(SequenceInd.STARTREQ);
// Notify application
- try {
- queueResult(startIndEvent);
- queueResult(fStatusEventAck);
- } catch (InterruptedException e) {
- // TODO: cancel this request
+ LttngSyntheticEvent[] result = new LttngSyntheticEvent[1];
+ result[0] = startIndEvent;
+ fmainRequest.setData(result);
+ fmainRequest.handleData();
+ result[0] = fStatusEventAck;
+ fmainRequest.setData(result);
+ fmainRequest.handleData();
+
+// try {
+// queueResult(startIndEvent);
+// queueResult(fStatusEventAck);
+// } catch (InterruptedException e) {
+// // TODO: cancel this request
// e.printStackTrace();
- }
+// }
// Notify state event processor
fstateUpdateProcessor.process(startIndEvent, null);
finishEvent.setSequenceInd(SequenceInd.ENDREQ);
finishEvent.setTraceModel(traceModel);
- try {
- queueResult(finishEvent);
- queueResult(fStatusEventAck);
- // End the loop in the main request
- queueResult(LttngSyntheticEvent.NullEvent);
- } catch (InterruptedException e) {
- // System.out.println(getName() +
- // ":handleProviderDone() failed to queue request");
- // TODO: Cancel the request
-// e.printStackTrace();
- }
+ LttngSyntheticEvent[] result = new LttngSyntheticEvent[1];
+ result[0] = finishEvent;
+ fmainRequest.setData(result);
+ fmainRequest.handleData();
+ result[0] = fStatusEventAck;
+ fmainRequest.setData(result);
+ fmainRequest.handleData();
+
+ fmainRequest.done();
+
+ // try {
+// queueResult(finishEvent);
+// queueResult(fStatusEventAck);
+// // End the loop in the main request
+// queueResult(LttngSyntheticEvent.NullEvent);
+// } catch (InterruptedException e) {
+// // System.out.println(getName() +
+// // ":handleProviderDone() failed to queue request");
+// // TODO: Cancel the request
+//// e.printStackTrace();
+// }
}
/**
}
@Override
- public LttngSyntheticEvent getNext(ITmfContext context) throws InterruptedException {
- return super.getNext(context);
+ public LttngSyntheticEvent getNext(ITmfContext context) {
+ try {
+ fmainRequest.waitForCompletion();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return null;
}
- @Override
- public void queueResult(LttngSyntheticEvent data) throws InterruptedException {
- super.queueResult(data);
- }
+// @Override
+// public LttngSyntheticEvent getNext(ITmfContext context) {
+// return super.getNext(context);
+// }
+
+// @Override
+// public void queueResult(LttngSyntheticEvent data) {
+// super.queueResult(data);
+// }
}
return returnedData;
}
- @Override
- public void queueResult(LttngEvent data) throws InterruptedException {
- super.queueResult(data);
- }
}
/*
package org.eclipse.linuxtools.tmf.component;
+import java.util.concurrent.TimeUnit;
+
import org.eclipse.linuxtools.tmf.event.TmfEvent;
import org.eclipse.linuxtools.tmf.event.TmfSyntheticEventStub;
import org.eclipse.linuxtools.tmf.event.TmfTimeRange;
// Queue 2 synthetic events per base event
private void handleIncomingData(TmfEvent e) {
+ queueResult(new TmfSyntheticEventStub(e));
+ queueResult(new TmfSyntheticEventStub(e));
+ }
+
+ private static final int TIMEOUT = 10000;
+
+ public TmfSyntheticEventStub getNext(ITmfContext context) {
+ TmfSyntheticEventStub data = null;
try {
- queueResult(new TmfSyntheticEventStub(e));
- queueResult(new TmfSyntheticEventStub(e));
- } catch (InterruptedException e1) {
-// e1.printStackTrace();
+ data = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+ if (data == null) {
+ throw new InterruptedException();
+ }
+ }
+ catch (InterruptedException e) {
}
+ return data;
}
- @Override
- public void sendRequest(ITmfDataRequest<TmfSyntheticEventStub> request) {
- super.sendRequest(request);
+ public void queueResult(TmfSyntheticEventStub data) {
+ boolean ok = false;
+ try {
+ ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!ok) {
+ throw new InterruptedException();
+ }
+ }
+ catch (InterruptedException e) {
+ }
}
-
+
}
*/
public void sendRequest(ITmfDataRequest<T> request);
public void fireRequests();
-
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
import org.eclipse.linuxtools.tmf.Tracer;
import org.eclipse.linuxtools.tmf.event.TmfData;
fType = type;
fQueueSize = queueSize;
fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
-// fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
fExecutor = new TmfRequestExecutor();
fSignalDepth = 0;
fType = other.fType;
fQueueSize = other.fQueueSize;
fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
-// fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
fExecutor = new TmfRequestExecutor();
fSignalDepth = 0;
public void dispose() {
TmfProviderManager.deregister(fType, this);
fExecutor.stop();
-
-// if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
-
super.dispose();
+// if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
}
public int getQueueSize() {
protected void queueRequest(final ITmfDataRequest<T> request) {
-// final ITmfDataProvider<T> provider = this;
-// final ITmfComponent component = this;
+ final TmfDataProvider<T> provider = this;
// Process the request
TmfThread thread = new TmfThread(request.getExecType()) {
@Override
public void run() {
-// /////
-// String message = (System.currentTimeMillis() + ": Req=" + request.getRequestId() +
-// (request.getExecType() == ITmfDataRequest.ExecutionType.LONG ? "(long)" : "(short)") +
-// ", Type=" + request.getClass().getName() +
-// ", DataType=" + request.getDataType().getSimpleName() + " " + "started");
-// System.out.println(message);
-// ////
-
-// if (request.getExecType() == ExecutionType.LONG) {
-// setPriority(Thread.MIN_PRIORITY);
-// } else {
-// setPriority(Thread.MAX_PRIORITY);
-// }
-// yield();
+ if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "started");
// Extract the generic information
request.start();
try {
// Get the ordered events
-// if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + component.getName());
+ if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName());
T data = getNext(context);
-// if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
+ if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event");
while (data != null && !isCompleted(request, data, nbRead))
{
-// if (fLogData) Tracer.traceEvent(provider, request, data);
+ if (fLogData) Tracer.traceEvent(provider, request, data);
result.add(data);
if (++nbRead % blockSize == 0) {
pushData(request, result);
// To avoid an unnecessary read passed the last data requested
if (nbRead < nbRequested) {
data = getNext(context);
-// if (data == null || data.isNullRef()) {
-// if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " end of data");
-// }
+ if (Tracer.isRequestTraced() && (data == null || data.isNullRef())) {
+ Tracer.trace("Request #" + request.getRequestId() + " end of data");
+ }
}
}
if (result.size() > 0) {
}
request.done();
-// ////
-// message = (System.currentTimeMillis() + ": Req=" + request.getRequestId() +
-// (request.getExecType() == ITmfDataRequest.ExecutionType.LONG ? "(long)" : "(short)") +
-// ", Type=" + request.getClass().getName() +
-// ", DataType=" + request.getDataType().getSimpleName() + " " + "completed");
-// System.out.println(message);
-// ////
+ if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "completed");
}
catch (Exception e) {
if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "exception (failed)");
request.fail();
- return;
// e.printStackTrace();
}
}
};
-// /////
-// String message = (System.currentTimeMillis() + ": Req=" + request.getRequestId() +
-// (request.getExecType() == ITmfDataRequest.ExecutionType.LONG ? "(long)" : "(short)") +
-// ", Type=" + request.getClass().getName() +
-// ", DataType=" + request.getDataType().getSimpleName() + " " + "queued");
-// System.out.println(message);
-// ////
+
fExecutor.execute(thread);
-
+
if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued");
}
* @return an application specific context; null if request can't be serviced
*/
public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
-
+ public abstract T getNext(ITmfContext context);
+
+// public abstract void queueResult(T data);
+
/**
* Return the next piece of data based on the context supplied. The context
* would typically be updated for the subsequent read.
* @param context
* @return
*/
- private static final int TIMEOUT = 10000;
-// public abstract T getNext(ITmfContext context) throws InterruptedException;
-// private int getLevel = 0;
- public T getNext(ITmfContext context) throws InterruptedException {
-// String name = Thread.currentThread().getName(); getLevel++;
-// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - entering");
- T data = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
- if (data == null) {
-// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on read");
- throw new InterruptedException();
- }
-// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - leaving");
-// getLevel--;
- return data;
- }
-
- /**
- * Makes the generated result data available for getNext()
- *
- * @param data
- */
-// public abstract void queueResult(T data) throws InterruptedException;
-// private int putLevel = 0;
- public void queueResult(T data) throws InterruptedException {
-// String name = Thread.currentThread().getName(); putLevel++;
-// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - entering");
- boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
- if (!ok) {
-// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on write");
- throw new InterruptedException();
- }
-// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - leaving");
-// putLevel--;
- }
+// private static final int TIMEOUT = 10000;
+//// public abstract T getNext(ITmfContext context) throws InterruptedException;
+//// private int getLevel = 0;
+// public T getNext(ITmfContext context) throws InterruptedException {
+//// String name = Thread.currentThread().getName(); getLevel++;
+//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - entering");
+// T data = fDataQueue.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+// if (data == null) {
+//// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on read");
+// throw new InterruptedException();
+// }
+//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (getLevel) + " getNext() - leaving");
+//// getLevel--;
+// return data;
+// }
+//
+// /**
+// * Makes the generated result data available for getNext()
+// *
+// * @param data
+// */
+//// public abstract void queueResult(T data) throws InterruptedException;
+//// private int putLevel = 0;
+// public void queueResult(T data) throws InterruptedException {
+//// String name = Thread.currentThread().getName(); putLevel++;
+//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - entering");
+// boolean ok = fDataQueue.offer(data, TIMEOUT, TimeUnit.MILLISECONDS);
+// if (!ok) {
+//// if (Tracer.isErrorTraced()) Tracer.traceError(getName() + ": Request timeout on write");
+// throw new InterruptedException();
+// }
+//// System.out.println("[" + System.currentTimeMillis() + "] " + name + " " + (putLevel) + " queueResult() - leaving");
+//// putLevel--;
+// }
/**
* Checks if the data meets the request completion criteria.
broadcast(new TmfExperimentUpdatedSignal(this, this)); // , signal.getTrace()));
}
- @Override
- public void queueResult(T data) throws InterruptedException {
- super.queueResult(data);
- }
+// @Override
+// public void queueResult(T data) {
+//// super.queueResult(data);
+// }
// ------------------------------------------------------------------------
// TmfDataProvider