import org.eclipse.linuxtools.tmf.experiment.TmfExperiment;
import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
import org.eclipse.linuxtools.tmf.request.ITmfEventRequest;
-import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
import org.eclipse.linuxtools.tmf.request.TmfEventRequest;
import org.eclipse.linuxtools.tmf.trace.ITmfContext;
import org.eclipse.linuxtools.tmf.trace.TmfContext;
// TmfDataProvider<LttngEvent> fExtProvider = null;
private ITmfDataRequest<LttngSyntheticEvent> fmainRequest = null;
- private final Map<IStateTraceManager, LttngBaseEventRequest> feventProviderRequests = new HashMap<IStateTraceManager, LttngBaseEventRequest>();
+ private final Map<IStateTraceManager, LttngBaseEventRequest> fEventProviderRequests = new HashMap<IStateTraceManager, LttngBaseEventRequest>();
private final LttngSyntheticEvent fStatusEvent;
private final LttngSyntheticEvent fStatusEventAck;
- private int fmainReqEventCount = 0;
+ private int fMainReqEventCount = 0;
volatile boolean startIndSent = false;
private LTTngTreeNode fExperiment = null;
private ITransEventProcessor fstateUpdateProcessor = StateEventToHandlerFactory
// ========================================================================
// Methods
// ========================================================================
+
@SuppressWarnings("unchecked")
@Override
public ITmfContext armRequest(
reset(fExperiment);
// At least one base provider shall be available
- if (feventProviderRequests.size() < 1) {
+ if (fEventProviderRequests.size() < 1) {
request.cancel();
TraceDebug.debug("No Base event providers available");
return null;
// define event data handling
ITmfEventRequest<LttngSyntheticEvent> eventRequest = (ITmfEventRequest<LttngSyntheticEvent>) fmainRequest;
TmfTimeRange reqWindow = eventRequest.getRange();
-
+
TraceDebug.debug("Main Synthethic event request started on thread: " + Thread.currentThread().getName());
// loop for every traceManager in current experiment
- for (IStateTraceManager traceManager : feventProviderRequests
- .keySet()) {
+ boolean subRequestQueued = false;
+ for (IStateTraceManager traceManager : fEventProviderRequests.keySet()) {
// restore trace state system to nearest check point
TmfTimestamp checkPoint = traceManager
.restoreCheckPointByTimestamp(reqWindow.getStartTime());
+
+ // adjust start time bound to check point
+
// validate so checkpoint restore is within requested bounds
TmfTimeRange traceRange = traceManager.getTrace().getTimeRange();
- if (!(checkPoint.getValue() >= traceRange.getStartTime().getValue()
- && checkPoint.getValue() <= traceRange.getEndTime()
- .getValue() && checkPoint.getValue() < reqWindow
- .getEndTime().getValue())) {
+ if ((checkPoint != null) && !(
+ checkPoint.getValue() >= traceRange.getStartTime().getValue() &&
+ checkPoint.getValue() <= traceRange.getEndTime().getValue() &&
+ checkPoint.getValue() < reqWindow.getEndTime().getValue())
+ ) {
// checkpoint is out of trace bounds
continue;
}
-
- // adjust start time bound to check point
- TmfTimeRange adjustedRange = new TmfTimeRange(checkPoint, reqWindow
- .getEndTime());
+ TmfTimeRange adjustedRange = reqWindow;
+ if (checkPoint != null) {
+ adjustedRange = new TmfTimeRange(checkPoint, reqWindow.getEndTime());
+ }
LttngTraceState traceModel = traceManager.getStateModel();
// create sub-request for one trace within experiment
final LttngBaseEventRequest subRequest = new LttngBaseEventRequest(
adjustedRange, reqWindow.getStartTime(), 0,
- TmfEventRequest.ALL_DATA, BLOCK_SIZE, traceModel) {
+ TmfEventRequest.ALL_DATA, BLOCK_SIZE, traceModel, ITmfDataRequest.ExecutionType.SHORT) {
private LttngSyntheticEvent syntheticEvent = null;
private LttngSyntheticEvent syntheticAckIndicator = null;
}
} else {
TraceDebug.debug("handle data received with no data");
- // done();
+// handleProviderDone(getTraceModel());
+// done();
}
}
// mark this sub-request as completed
super.done();
handleProviderDone(getTraceModel());
+// super.done();
}
/**
// queue the new event data and an ACK
updateSynEvent(e);
-
-
// If time at or above requested time, update application
- if (eventTime >= fDispatchTime) {
- // Before update
- syntheticEvent.setSequenceInd(SequenceInd.BEFORE);
- queueResult(syntheticEvent);
- queueResult(syntheticAckIndicator);
-
- // Update state locally
- syntheticEvent.setSequenceInd(SequenceInd.UPDATE);
- fstateUpdateProcessor.process(syntheticEvent,
- fTraceModel);
-
- // After Update
- syntheticEvent.setSequenceInd(SequenceInd.AFTER);
- queueResult(syntheticEvent);
- queueResult(syntheticAckIndicator);
-
- // increment once per dispatch
- incrementSynEvenCount();
- subEventCount++;
- } else {
- // event time is between checkpoint adjusted time and
- // requested time i.e. application does not expect the
- // event, however the state system needs to be re-built
- // to the dispatch point
- syntheticEvent.setSequenceInd(SequenceInd.UPDATE);
- fstateUpdateProcessor.process(syntheticEvent,
- fTraceModel);
+ try {
+ if (eventTime >= fDispatchTime) {
+ // Before update
+ syntheticEvent.setSequenceInd(SequenceInd.BEFORE);
+ queueResult(syntheticEvent);
+ queueResult(syntheticAckIndicator);
+
+ // Update state locally
+ syntheticEvent.setSequenceInd(SequenceInd.UPDATE);
+ fstateUpdateProcessor.process(syntheticEvent, fTraceModel);
+
+ // After Update
+ syntheticEvent.setSequenceInd(SequenceInd.AFTER);
+ queueResult(syntheticEvent);
+ queueResult(syntheticAckIndicator);
+
+ // increment once per dispatch
+ incrementSynEvenCount();
+ subEventCount++;
+ } else {
+ // event time is between checkpoint adjusted time and
+ // requested time i.e. application does not expect the
+ // event, however the state system needs to be re-built
+ // to the dispatch point
+ syntheticEvent.setSequenceInd(SequenceInd.UPDATE);
+ fstateUpdateProcessor.process(syntheticEvent, fTraceModel);
+ }
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
}
}
return syntheticEvent;
}
};
-
+
// preserve the associated sub request to control it e.g.
// cancellation
- feventProviderRequests.put(traceManager, subRequest);
+ fEventProviderRequests.put(traceManager, subRequest);
// start request
TmfTrace<LttngEvent> provider = (TmfTrace<LttngEvent>) traceManager
.getTrace();
+ // provider.sendRequest(subRequest, ExecutionType.LONG);
provider.sendRequest(subRequest);
+ subRequestQueued = true;
}
// Return a dummy context, not used for relay provider
- return new TmfContext();
+ return (subRequestQueued) ? new TmfContext() : null;
}
/**
startIndEvent.setSequenceInd(SequenceInd.STARTREQ);
// Notify application
- queueResult(startIndEvent);
- queueResult(fStatusEventAck);
+ try {
+ queueResult(startIndEvent);
+ queueResult(fStatusEventAck);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
// Notify state event processor
fstateUpdateProcessor.process(startIndEvent, null);
// handle completion and cancellations properly
// Close the main request when all sub-requests are marked completed
- for (LttngBaseEventRequest subRequest : feventProviderRequests.values()) {
+ for (LttngBaseEventRequest subRequest : fEventProviderRequests.values()) {
if (subRequest != null) {
if (!subRequest.isCompleted()) {
// Not ready to complete main request
// All sub-requests are marked completed so the main request can be
// completed as well
// Notify application,
- LttngSyntheticEvent finishEvent = new LttngSyntheticEvent(fStatusEvent);
- finishEvent.setSequenceInd(SequenceInd.ENDREQ);
- finishEvent.setTraceModel(traceModel);
- queueResult(finishEvent);
- queueResult(fStatusEventAck);
- // End the loop in the main request
- queueResult(LttngSyntheticEvent.NullEvent);
+// LttngSyntheticEvent finishEvent = new LttngSyntheticEvent(fStatusEvent);
+// finishEvent.setSequenceInd(SequenceInd.ENDREQ);
+// finishEvent.setTraceModel(traceModel);
+
+ try {
+// queueResult(finishEvent);
+// queueResult(fStatusEventAck);
+ // End the loop in the main request
+ queueResult(LttngSyntheticEvent.NullEvent);
+ } catch (InterruptedException e) {
+ // System.out.println(getName() +
+ // ":handleProviderDone() failed to queue request");
+ e.printStackTrace();
+ }
}
/**
* Increment the global event counter i.e. events from any sub requests
*/
private synchronized void incrementSynEvenCount() {
- fmainReqEventCount++;
+ fMainReqEventCount++;
}
/**
* @return
*/
public synchronized int getSynEvenCount() {
- return fmainReqEventCount;
+ return fMainReqEventCount;
}
/**
*
* @param experimentNode
*/
- public /* synchronized */ void reset(LTTngTreeNode experimentNode) {
+ public synchronized void reset(LTTngTreeNode experimentNode) {
fmainRequest = null;
// Make sure previous request are terminated
- for (LttngBaseEventRequest tmpRequest : feventProviderRequests.values()) {
+ for (LttngBaseEventRequest tmpRequest : fEventProviderRequests.values()) {
if (tmpRequest != null && !tmpRequest.isCompleted()) {
tmpRequest.cancel();
}
}
- feventProviderRequests.clear();
- fmainReqEventCount = 0;
+ fEventProviderRequests.clear();
+ fMainReqEventCount = 0;
startIndSent = false;
// set of base event providers
LTTngTreeNode[] traces = fExperiment.getChildren();
for (LTTngTreeNode trace : traces) {
IStateTraceManager traceBaseEventProvider = (IStateTraceManager) trace;
- feventProviderRequests.put(traceBaseEventProvider, null);
+ fEventProviderRequests.put(traceBaseEventProvider, null);
}
}
* org.eclipse.linuxtools.tmf.component.TmfDataProvider#sendRequest(org.
* eclipse.linuxtools.tmf.request.TmfDataRequest)
*/
- public void sendRequest(final TmfDataRequest<LttngSyntheticEvent> request) {
+ public void sendRequest(final ITmfDataRequest<LttngSyntheticEvent> request) {
super.sendRequest(request);
if (waitForRequest) {
try {