#include <babeltrace2/babeltrace.h>
-#include "cpp-common/cfg-logging.hpp"
-#include "../common/src/msg-iter/msg-iter.hpp"
#include "cpp-common/make-unique.hpp"
#include "common/assert.h"
#include "compat/mman.h"
+#include "../common/src/pkt-props.hpp"
#include "cpp-common/make-unique.hpp"
#include "data-stream.hpp"
+#include "cpp-common/exc.hpp"
#include "cpp-common/cfg-logging-error-reporting.hpp"
+#include "cpp-common/cfg-logging-error-reporting-throw.hpp"
#define STREAM_NAME_PREFIX "stream-"
-static enum ctf_msg_iter_medium_status medop_request_bytes(size_t request_sz, uint8_t **buffer_addr,
- size_t *buffer_sz, void *data)
+namespace ctf {
+namespace src {
+namespace live {
+
+Buf CtfLiveMedium::buf(bt2_common::DataLen requestedOffsetInStream, bt2_common::DataLen minSize)
{
- lttng_live_stream_iterator *stream = (lttng_live_stream_iterator *) data;
- struct lttng_live_trace *trace = stream->trace;
- struct lttng_live_session *session = trace->session;
- struct lttng_live_msg_iter *live_msg_iter = session->lttng_live_msg_iter;
- uint64_t recv_len = 0;
- uint64_t len_left;
- uint64_t read_len;
-
- BT_ASSERT(request_sz);
-
- if (stream->has_stream_hung_up) {
- return CTF_MSG_ITER_MEDIUM_STATUS_EOF;
+ const bt2_common::LogCfg& logCfg = _mLiveStreamIter.logCfg;
+ BT_CLOGD("CtfLiveMedium::buf called: stream-id=%" PRId64
+ ", offset-bytes=%llu, min-size-bytes=%llu",
+ _mLiveStreamIter.stream ? (*_mLiveStreamIter.stream)->id() : -1,
+ requestedOffsetInStream.bytes(), minSize.bytes());
+
+ if (_mLiveStreamIter.has_stream_hung_up)
+ throw NoData {};
+
+ BT_ASSERT(requestedOffsetInStream >= _mCurPktBegOffsetInStream);
+ bt2_common::DataLen requestedOffsetInPacket =
+ requestedOffsetInStream - _mCurPktBegOffsetInStream;
+
+ BT_ASSERT(_mLiveStreamIter.curPktInfo);
+
+ if (requestedOffsetInPacket == _mLiveStreamIter.curPktInfo->len) {
+ _mCurPktBegOffsetInStream += _mLiveStreamIter.curPktInfo->len;
+ _mLiveStreamIter.curPktInfo.reset();
+ lttng_live_stream_iterator_set_state(&_mLiveStreamIter, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
+ throw bt2_common::TryAgain {};
}
- len_left = stream->base_offset + stream->len - stream->offset;
- if (!len_left) {
- lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
- return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+ bt2_common::DataLen requestedOffsetInRelay =
+ _mLiveStreamIter.curPktInfo->offsetInRelay + requestedOffsetInPacket;
+ bt2_common::DataLen lenUntilEndOfPacket =
+ _mLiveStreamIter.curPktInfo->len - requestedOffsetInPacket;
+
+ bt2_common::DataLen maxReqLen = bt2_common::DataLen::fromBytes(
+ _mLiveStreamIter.trace->session->lttng_live_msg_iter->lttng_live_comp->max_query_size);
+ bt2_common::DataLen reqLen = std::min(lenUntilEndOfPacket, maxReqLen);
+ uint64_t recvLen;
+
+ _mBuf.resize(reqLen.bytes());
+
+ lttng_live_get_stream_bytes_status status = lttng_live_get_stream_bytes(
+ _mLiveStreamIter.trace->session->lttng_live_msg_iter, &_mLiveStreamIter, _mBuf.data(),
+ requestedOffsetInRelay.bytes(), reqLen.bytes(), &recvLen);
+ switch (status) {
+ case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK:
+ _mBuf.resize(recvLen);
+ break;
+
+ case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN:
+ BT_CLOGD("CtfLiveMedium::buf try again");
+ throw bt2_common::TryAgain();
+
+ case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF:
+ BT_CLOGD("CtfLiveMedium::buf eof");
+ throw NoData();
+
+ case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR:
+ BT_CLOGD("CtfLiveMedium::buf error");
+ throw bt2_common::Error();
}
- read_len = MIN(request_sz, stream->buf.size());
- read_len = MIN(read_len, len_left);
- ctf_msg_iter_medium_status status = lttng_live_get_stream_bytes(
- live_msg_iter, stream, stream->buf.data(), stream->offset, read_len, &recv_len);
- if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
- return status;
- }
+ Buf buf {_mBuf.data(), bt2_common::DataLen::fromBytes(_mBuf.size())};
- *buffer_addr = stream->buf.data();
- *buffer_sz = recv_len;
- stream->offset += recv_len;
+ BT_CLOGD("CtfLiveMedium::buf returns: stream-id=%" PRId64 ", buf-addr=%p, buf-size-bytes=%llu",
+ _mLiveStreamIter.stream ? (*_mLiveStreamIter.stream)->id() : -1, buf.addr(),
+ buf.size().bytes());
- return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+ return buf;
}
-static bt_stream *medop_borrow_stream(bt_stream_class *stream_class, int64_t stream_id, void *data)
-{
- lttng_live_stream_iterator *lttng_live_stream = (lttng_live_stream_iterator *) data;
- const bt2_common::LogCfg& logCfg = lttng_live_stream->logCfg;
-
- if (!lttng_live_stream->stream) {
- uint64_t stream_class_id = bt_stream_class_get_id(stream_class);
-
- BT_CLOGI("Creating stream %s (ID: %" PRIu64 ") out of stream "
- "class %" PRId64,
- lttng_live_stream->name.c_str(), stream_id, stream_class_id);
-
- bt_stream *stream;
-
- if (stream_id < 0) {
- /*
- * No stream instance ID in the stream. It's possible
- * to encounter this situation with older version of
- * LTTng. In these cases, use the viewer_stream_id that
- * is unique for a live viewer session.
- */
- stream = bt_stream_create_with_id(stream_class,
- (*lttng_live_stream->trace->trace)->libObjPtr(),
- lttng_live_stream->viewer_stream_id);
- } else {
- stream = bt_stream_create_with_id(stream_class,
- (*lttng_live_stream->trace->trace)->libObjPtr(),
- (uint64_t) stream_id);
- }
-
- if (!stream) {
- BT_CLOGE_APPEND_CAUSE("Cannot create stream %s (stream class ID "
- "%" PRId64 ", stream ID %" PRIu64 ")",
- lttng_live_stream->name.c_str(), stream_class_id, stream_id);
- return nullptr;
- }
-
- lttng_live_stream->stream = bt2::Stream::Shared::createWithoutRef(stream);
+} /* namespace live */
+} /* namespace src */
+} /* namespace ctf */
- (*lttng_live_stream->stream)->name(lttng_live_stream->name);
+lttng_live_iterator_status
+lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter)
+{
+ BT_ASSERT(!liveStreamIter->msg_iter);
+ BT_ASSERT(!liveStreamIter->stream);
+ lttng_live_trace *trace = liveStreamIter->trace;
+ lttng_live_msg_iter *liveMsgIter = trace->session->lttng_live_msg_iter;
+
+ ctf::src::Medium::UP tempMedium =
+ bt2_common::makeUnique<ctf::src::live::CtfLiveMedium>(*liveStreamIter);
+ const ctf::src::TraceCls *ctfTc = liveStreamIter->trace->metadata->traceCls();
+ BT_ASSERT(ctfTc);
+ ctf::src::PktProps pktProps = ctf::src::readPktProps(
+ *ctfTc, std::move(tempMedium), bt2_common::DataLen::fromBytes(0), liveStreamIter->logCfg);
+
+ nonstd::optional<bt2::TraceClass> tc = ctfTc->libCls();
+ BT_ASSERT(tc);
+ BT_ASSERT(liveStreamIter->ctf_stream_class_id.is_set);
+ BT_ASSERT(trace->trace);
+
+ const bt2_common::LogCfg& logCfg = liveStreamIter->logCfg;
+ nonstd::optional<bt2::StreamClass> sc =
+ tc->streamClassById(liveStreamIter->ctf_stream_class_id.value);
+ if (!sc) {
+ BT_CLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "No stream class with id %" PRId64,
+ liveStreamIter->ctf_stream_class_id.value);
}
- return (*lttng_live_stream->stream)->libObjPtr();
+ // FIXME: in the original, there is a fall back if the data stream id is not available.
+ bt_stream *streamPtr = bt_stream_create_with_id(sc->libObjPtr(), (*trace->trace)->libObjPtr(),
+ *pktProps.dataStreamId);
+ BT_ASSERT(streamPtr);
+ liveStreamIter->stream = bt2::Stream::Shared::createWithoutRef(streamPtr);
+ (*liveStreamIter->stream)->name(liveStreamIter->name);
+
+ ctf::src::Medium::UP medium =
+ bt2_common::makeUnique<ctf::src::live::CtfLiveMedium>(*liveStreamIter);
+ liveStreamIter->msg_iter.emplace(
+ liveMsgIter->self_msg_iter, *ctfTc, liveStreamIter->trace->metadata->metadataStreamUuid(),
+ **liveStreamIter->stream, std::move(medium), ctf::src::MsgIterQuirks {}, logCfg);
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
-static struct ctf_msg_iter_medium_ops medops = {
- medop_request_bytes,
- nullptr,
- nullptr,
- medop_borrow_stream,
-};
-
BT_HIDDEN
enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_session *session,
bt_self_message_iterator *self_msg_iter)
{
- struct lttng_live_component *lttng_live = session->lttng_live_msg_iter->lttng_live_comp;
const bt2_common::LogCfg& logCfg = session->logCfg;
if (!session->lazy_stream_msg_init) {
for (lttng_live_trace::UP& trace : session->traces) {
for (lttng_live_stream_iterator::UP& stream_iter : trace->stream_iterators) {
- struct ctf_trace_class *ctf_tc;
-
if (stream_iter->msg_iter) {
continue;
}
- ctf_tc = ctf_metadata_decoder_borrow_ctf_trace_class(trace->metadata->decoder.get());
+ const ctf::src::TraceCls *ctfTraceCls = trace->metadata->traceCls();
BT_CLOGD("Creating CTF message iterator: "
"session-id=%" PRIu64 ", ctf-tc-addr=%p, "
"stream-iter-name=%s, self-msg-iter-addr=%p",
- session->id, ctf_tc, stream_iter->name.c_str(), self_msg_iter);
- stream_iter->msg_iter = ctf_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops,
- stream_iter.get(), self_msg_iter, logCfg);
- if (!stream_iter->msg_iter) {
- BT_CLOGE_APPEND_CAUSE("Failed to create CTF message iterator");
- return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- }
+ session->id, ctfTraceCls, stream_iter->name.c_str(), self_msg_iter);
}
}
lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id,
uint64_t stream_id, bt_self_message_iterator *self_msg_iter)
{
- struct lttng_live_component *lttng_live;
struct lttng_live_trace *trace;
std::stringstream nameSs;
const bt2_common::LogCfg& logCfg = session->logCfg;
- lttng_live = session->lttng_live_msg_iter->lttng_live_comp;
-
lttng_live_stream_iterator::UP stream_iter =
bt2_common::makeUnique<lttng_live_stream_iterator>(logCfg);
trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
stream_iter->last_inactivity_ts.is_set = false;
stream_iter->last_inactivity_ts.value = 0;
- if (trace->trace) {
- struct ctf_trace_class *ctf_tc =
- ctf_metadata_decoder_borrow_ctf_trace_class(trace->metadata->decoder.get());
- BT_ASSERT(!stream_iter->msg_iter);
- stream_iter->msg_iter = ctf_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops,
- stream_iter.get(), self_msg_iter, logCfg);
- if (!stream_iter->msg_iter) {
- BT_CLOGE_APPEND_CAUSE("Failed to create CTF message iterator");
- return nullptr;
- }
- }
- stream_iter->buf.resize(lttng_live->max_query_size);
-
nameSs << STREAM_NAME_PREFIX << stream_iter->viewer_stream_id;
stream_iter->name = nameSs.str();
return ret;
}
+void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter,
+ uint64_t ctfStreamClsId)
+{
+ if (streamIter->ctf_stream_class_id.is_set) {
+ BT_ASSERT(streamIter->ctf_stream_class_id.value == ctfStreamClsId);
+ return;
+ } else {
+ streamIter->ctf_stream_class_id.value = ctfStreamClsId;
+ streamIter->ctf_stream_class_id.is_set = true;
+ }
+}
+
lttng_live_stream_iterator::~lttng_live_stream_iterator()
{
/* Track the number of active stream iterator. */
#include <glib.h>
-#include "../common/src/msg-iter/msg-iter.hpp"
#include "lttng-live.hpp"
enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_session *session,
lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id,
uint64_t stream_id, bt_self_message_iterator *self_msg_iter);
+namespace ctf {
+namespace src {
+namespace live {
+
+struct CtfLiveMedium : Medium
+{
+ CtfLiveMedium(lttng_live_stream_iterator& liveStreamIter) : _mLiveStreamIter(liveStreamIter)
+ {
+ }
+
+ Buf buf(bt2_common::DataLen offset, bt2_common::DataLen minSize) override;
+
+private:
+ lttng_live_stream_iterator& _mLiveStreamIter;
+
+ bt2_common::DataLen _mCurPktBegOffsetInStream = bt2_common::DataLen::fromBits(0);
+ std::vector<uint8_t> _mBuf;
+};
+
+} /* namespace live */
+} /* namespace src */
+} /* namespace ctf */
+
+lttng_live_iterator_status
+lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter);
+
#endif /* LTTNG_LIVE_DATA_STREAM_H */
#include <babeltrace2/types.h>
#include "cpp-common/exc.hpp"
#include "cpp-common/glib-up.hpp"
+#include "cpp-common/lib-str.hpp"
#include "cpp-common/make-unique.hpp"
#include "cpp-common/vector.hpp"
#include "cpp-common/cfg-logging-error-reporting.hpp"
#include "cpp-common/cfg-logging-error-reporting-throw.hpp"
+#include "../common/src/pkt-props.hpp"
+
#include "data-stream.hpp"
#include "metadata.hpp"
#include "lttng-live.hpp"
goto end;
}
- lttng_live_stream->base_offset = index.offset;
- lttng_live_stream->offset = index.offset;
- lttng_live_stream->len = index.packet_size / CHAR_BIT;
+ lttng_live_stream->curPktInfo.emplace(lttng_live_stream_iterator::CurPktInfo {
+ bt2_common::DataLen::fromBytes(index.offset),
+ bt2_common::DataLen::fromBits(index.packet_size),
+ });
BT_CLOGD("Setting live stream reading info: stream-name=\"%s\", "
- "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64 ", stream-offset=%" PRIu64
- ", stream-len=%" PRIu64,
+ "viewer-stream-id=%" PRIu64 ", stream-base-offset=%llu"
+ ", stream-len-bytes=%llu",
lttng_live_stream->name.c_str(), lttng_live_stream->viewer_stream_id,
- lttng_live_stream->base_offset, lttng_live_stream->offset, lttng_live_stream->len);
+ lttng_live_stream->curPktInfo->offsetInRelay.bytes(),
+ lttng_live_stream->curPktInfo->len.bytes());
end:
if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
break;
case LTTNG_LIVE_ITERATOR_STATUS_END:
/*
- * We received a `_END` from the `_get_new_streams()` function,
- * which means no more data will ever be received from the data
- * streams of this session. But it's possible that the metadata
- * is incomplete.
- * The live protocol guarantees that we receive all the
- * metadata needed before we receive data streams needing it.
- * But it's possible to receive metadata NOT needed by
- * data streams after the session was closed. For example, this
- * could happen if a new event is registered and the session is
- * stopped before any tracepoint for that event is actually
- * fired.
- */
+ * We received a `_END` from the `_get_new_streams()` function,
+ * which means no more data will ever be received from the data
+ * streams of this session. But it's possible that the metadata
+ * is incomplete.
+ * The live protocol guarantees that we receive all the
+ * metadata needed before we receive data streams needing it.
+ * But it's possible to receive metadata NOT needed by
+ * data streams after the session was closed. For example, this
+ * could happen if a new event is registered and the session is
+ * stopped before any tracepoint for that event is actually
+ * fired.
+ */
BT_CLOGD(
"Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
"session-id=%" PRIu64 ", session-name=\"%s\"",
", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64,
stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp);
- msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
- stream_iter->trace->clock_class, timestamp);
+ msg = bt_message_message_iterator_inactivity_create(
+ lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class->libObjPtr(),
+ timestamp);
if (!msg) {
BT_CLOGE_APPEND_CAUSE("Error emitting message iterator inactivity message");
return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
static int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
struct lttng_live_msg_iter *lttng_live_msg_iter,
- const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
+ bt2::ConstMessage msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
{
- const bt_clock_snapshot *clock_snapshot = NULL;
- int ret = 0;
+ nonstd::optional<bt2::ConstClockSnapshot> clockSnapshot;
const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
- BT_ASSERT_DBG(msg);
BT_ASSERT_DBG(ts_ns);
BT_CLOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
"last-msg-ts=%" PRId64,
- lttng_live_msg_iter, msg, last_msg_ts_ns);
+ lttng_live_msg_iter, msg.libObjPtr(), last_msg_ts_ns);
- switch (bt_message_get_type(msg)) {
- case BT_MESSAGE_TYPE_EVENT:
- clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
+ switch (msg.type()) {
+ case bt2::MessageType::EVENT:
+ clockSnapshot = msg.asEvent().defaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_PACKET_BEGINNING:
- clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
+ case bt2::MessageType::PACKET_BEGINNING:
+
+ clockSnapshot = msg.asPacketBeginning().defaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_PACKET_END:
- clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
+ case bt2::MessageType::PACKET_END:
+ clockSnapshot = msg.asPacketEnd().defaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
- clock_snapshot =
- bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
+ case bt2::MessageType::DISCARDED_EVENTS:
+ clockSnapshot = msg.asDiscardedEvents().beginningDefaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
- clock_snapshot =
- bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
+ case bt2::MessageType::DISCARDED_PACKETS:
+ clockSnapshot = msg.asDiscardedPackets().beginningDefaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
- clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
+ case bt2::MessageType::MESSAGE_ITERATOR_INACTIVITY:
+ clockSnapshot = msg.asMessageIteratorInactivity().clockSnapshot();
break;
default:
/* All the other messages have a higher priority */
BT_CLOGD(
"Message has no timestamp, using the last message timestamp: iter-data-addr=%p, msg-addr=%p, "
"last-msg-ts=%" PRId64 ", ts=%" PRId64,
- lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
+ lttng_live_msg_iter, msg.libObjPtr(), last_msg_ts_ns, *ts_ns);
*ts_ns = last_msg_ts_ns;
return 0;
}
- ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
- if (ret) {
- BT_CLOGE_APPEND_CAUSE("Cannot get nanoseconds from Epoch of clock snapshot: "
- "clock-snapshot-addr=%p",
- clock_snapshot);
- return -1;
- }
+ *ts_ns = clockSnapshot->nsFromOrigin();
BT_CLOGD("Found message's timestamp: "
"iter-data-addr=%p, msg-addr=%p, "
"last-msg-ts=%" PRId64 ", ts=%" PRId64,
- lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
+ lttng_live_msg_iter, msg.libObjPtr(), last_msg_ts_ns, *ts_ns);
return 0;
}
nonstd::optional<bt2::ConstMessage::Shared>& message)
{
const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
- enum ctf_msg_iter_status status;
for (lttng_live_session::UP& session : lttng_live_msg_iter->sessions) {
if (session->new_streams_needed) {
return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
- const bt_message *msg;
- status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
- switch (status) {
- case CTF_MSG_ITER_STATUS_EOF:
- return LTTNG_LIVE_ITERATOR_STATUS_END;
- case CTF_MSG_ITER_STATUS_OK:
- message = bt2::ConstMessage::Shared::createWithoutRef(msg);
- return LTTNG_LIVE_ITERATOR_STATUS_OK;
- case CTF_MSG_ITER_STATUS_AGAIN:
- /*
- * Continue immediately (end of packet). The next
- * get_index may return AGAIN to delay the following
- * attempt.
- */
- return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
- case CTF_MSG_ITER_STATUS_ERROR:
- default:
+ if (!lttng_live_stream->msg_iter) {
+ /* The first time we're called for this stream, the MsgIter is not instantiated. */
+ enum lttng_live_iterator_status ret =
+ lttng_live_stream_iterator_create_msg_iter(lttng_live_stream);
+ if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ return ret;
+ }
+ }
+
+ try {
+ message = lttng_live_stream->msg_iter->next();
+ if (message) {
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
+ } else {
+ return LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
+ } catch (const bt2_common::TryAgain&) {
+ return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } catch (const bt2::Error&) {
BT_CLOGE_APPEND_CAUSE("CTF message iterator failed to get next message: "
- "msg-iter=%p, msg-iter-status=%s",
- lttng_live_stream->msg_iter.get(),
- ctf_msg_iter_status_string(status));
+ "msg-iter=%p",
+ &*lttng_live_stream->msg_iter);
return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
}
* `ctf_msg_iter` should simply realize that it needs to close the
* stream properly by emitting the necessary stream end message.
*/
- const bt_message *msg;
- enum ctf_msg_iter_status status =
- ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
+ try {
+ if (!stream_iter->msg_iter) {
+ BT_CLOGI("Reached the end of the live stream iterator.");
+ return LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
+
+ curr_msg = stream_iter->msg_iter->next();
+ if (!curr_msg) {
+ BT_CLOGI("Reached the end of the live stream iterator.");
+ return LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
- if (status == CTF_MSG_ITER_STATUS_ERROR) {
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
+ } catch (const bt2::Error&) {
BT_CLOGE_APPEND_CAUSE("Error getting the next message from CTF message iterator");
return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- } else if (status == CTF_MSG_ITER_STATUS_EOF) {
- BT_CLOGI("Reached the end of the live stream iterator.");
- return LTTNG_LIVE_ITERATOR_STATUS_END;
}
-
- BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
-
- curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
-
- return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
/*
}
static enum lttng_live_iterator_status adjust_discarded_packets_message(
- bt_self_message_iterator *iter, const bt_stream *stream, const bt_message *msg_in,
- nonstd::optional<bt2::ConstMessage::Shared>& msg_out, uint64_t new_begin_ts)
+ bt_self_message_iterator *iter, bt2::Stream stream, bt2::ConstDiscardedPacketsMessage msgIn,
+ nonstd::optional<bt2::Message::Shared>& msgOut, uint64_t new_begin_ts)
{
- enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
- enum bt_property_availability availability;
- const bt_clock_snapshot *clock_snapshot;
- uint64_t end_ts;
- uint64_t count;
-
- clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
- end_ts = bt_clock_snapshot_get_value(clock_snapshot);
-
- availability = bt_message_discarded_packets_get_count(msg_in, &count);
- BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
+ uint64_t end_ts = msgIn.endDefaultClockSnapshot().value();
+ nonstd::optional<uint64_t> count = msgIn.count();
+ BT_ASSERT_DBG(count);
bt_message *msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
- iter, stream, new_begin_ts, end_ts);
+ iter, stream.libObjPtr(), new_begin_ts, end_ts);
if (!msg) {
- status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
- goto end;
+ return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
}
- bt_message_discarded_packets_set_count(msg, count);
- msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ bt_message_discarded_packets_set_count(msg, *count);
+ msgOut = bt2::Message::Shared::createWithoutRef(msg);
-end:
- return status;
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
-static enum lttng_live_iterator_status adjust_discarded_events_message(
- bt_self_message_iterator *iter, const bt_stream *stream, const bt_message *msg_in,
- nonstd::optional<bt2::ConstMessage::Shared>& msg_out, uint64_t new_begin_ts)
+static enum lttng_live_iterator_status
+adjust_discarded_events_message(bt_self_message_iterator *iter, const bt2::Stream stream,
+ bt2::ConstDiscardedEventsMessage msgIn,
+ nonstd::optional<bt2::Message::Shared>& msgOut,
+ uint64_t new_begin_ts)
{
- enum bt_property_availability availability;
- const bt_clock_snapshot *clock_snapshot;
- uint64_t end_ts;
- uint64_t count;
-
- clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
- end_ts = bt_clock_snapshot_get_value(clock_snapshot);
-
- availability = bt_message_discarded_events_get_count(msg_in, &count);
- BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
+ uint64_t end_ts = msgIn.endDefaultClockSnapshot().value();
+ nonstd::optional<uint64_t> count = msgIn.count();
+ BT_ASSERT_DBG(count);
bt_message *msg = bt_message_discarded_events_create_with_default_clock_snapshots(
- iter, stream, new_begin_ts, end_ts);
+ iter, stream.libObjPtr(), new_begin_ts, end_ts);
if (!msg) {
return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
}
- bt_message_discarded_events_set_count(msg, count);
- msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ bt_message_discarded_events_set_count(msg, *count);
+ msgOut = bt2::Message::Shared::createWithoutRef(msg);
return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
bt2::ConstMessage::Shared late_msg)
{
const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
- const bt_clock_class *clock_class;
- const bt_stream_class *stream_class;
- enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
- int64_t last_inactivity_ts_ns;
- enum lttng_live_iterator_status adjust_status;
- nonstd::optional<bt2::ConstMessage::Shared> adjusted_message;
/*
* The timestamp of the current message is before the last message sent
}
if (!is_discarded_packet_or_event_message(*late_msg)) {
- BT_CLOGE_APPEND_CAUSE(
- "Invalid live stream state: "
- "have a late message that is not a packet discarded or "
- "event discarded message: late-msg-type=%s",
- bt_common_message_type_string(bt_message_get_type(late_msg->libObjPtr())));
+ BT_CLOGE_APPEND_CAUSE("Invalid live stream state: "
+ "have a late message that is not a packet discarded or "
+ "event discarded message: late-msg-type=%s",
+ bt2_common::messageTypeStr(late_msg->type()));
return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
- stream_class = bt_stream_borrow_class_const((*stream_iter->stream)->libObjPtr());
- clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
-
- ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
- clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
- if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
- BT_CLOGE_APPEND_CAUSE("Error converting last "
- "inactivity message timestamp to nanoseconds");
- return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- }
+ bt2::StreamClass streamClass = (*stream_iter->stream)->cls();
+ nonstd::optional<bt2::ClockClass> clockClass = streamClass.defaultClockClass();
+ int64_t last_inactivity_ts_ns =
+ clockClass->cyclesToNsFromOrigin(stream_iter->last_inactivity_ts.value);
if (last_inactivity_ts_ns <= late_msg_ts_ns) {
BT_CLOGE_APPEND_CAUSE("Invalid live stream state: "
"have a late message that is none included in a stream "
*/
BT_CLOGD("Adjusting the timestamp of late message: late-msg-type=%s, "
"msg-new-ts-ns=%" PRIu64,
- bt_common_message_type_string(bt_message_get_type(late_msg->libObjPtr())),
- stream_iter->last_inactivity_ts.value);
+ bt2_common::messageTypeStr(late_msg->type()), stream_iter->last_inactivity_ts.value);
+ nonstd::optional<bt2::Message::Shared> adjustedMessage;
+ lttng_live_iterator_status adjust_status;
switch (late_msg->type()) {
case bt2::MessageType::DISCARDED_EVENTS:
adjust_status = adjust_discarded_events_message(
- lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(),
- late_msg->libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
+ lttng_live_msg_iter->self_msg_iter, **stream_iter->stream,
+ late_msg->asDiscardedEvents(), adjustedMessage, stream_iter->last_inactivity_ts.value);
break;
case bt2::MessageType::DISCARDED_PACKETS:
adjust_status = adjust_discarded_packets_message(
- lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(),
- late_msg->libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
+ lttng_live_msg_iter->self_msg_iter, **stream_iter->stream,
+ late_msg->asDiscardedPackets(), adjustedMessage, stream_iter->last_inactivity_ts.value);
break;
default:
bt_common_abort();
return adjust_status;
}
- BT_ASSERT_DBG(adjusted_message);
- stream_iter->current_msg = adjusted_message;
+ BT_ASSERT_DBG(adjustedMessage);
+ stream_iter->current_msg = std::move(adjustedMessage);
stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
return LTTNG_LIVE_ITERATOR_STATUS_OK;
BT_CLOGD("Live stream iterator returned message: msg-type=%s, "
"stream-name=\"%s\", viewer-stream-id=%" PRIu64,
- bt_common_message_type_string(bt_message_get_type((*msg)->libObjPtr())),
- stream_iter->name.c_str(), stream_iter->viewer_stream_id);
+ bt2_common::messageTypeStr((*msg)->type()), stream_iter->name.c_str(),
+ stream_iter->viewer_stream_id);
/*
* Get the timestamp in nanoseconds from origin of this
* messsage.
*/
- live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, (*msg)->libObjPtr(),
+ live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, **msg,
lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
/*
#include <babeltrace2/babeltrace.h>
#include "common/macros.h"
-#include "cpp-common/bt2/message.hpp"
-#include "../common/src/metadata/tsdl/decoder.hpp"
-#include "../common/src/msg-iter/msg-iter.hpp"
+#include "../common/src/metadata/tsdl/ctf-1-metadata-stream-parser.hpp"
+#include "../common/src/msg-iter.hpp"
+#include "../common/src/metadata/metadata-stream-parser-utils.hpp"
#include "viewer-connection.hpp"
struct lttng_live_component;
* Since only a single iterator per viewer connection, we have
* only a single message iterator per stream.
*/
- ctf_msg_iter_up msg_iter;
+ nonstd::optional<ctf::src::MsgIter> msg_iter;
uint64_t viewer_stream_id = 0;
uint64_t value = 0;
} ctf_stream_class_id;
- /* base offset in current index. */
- uint64_t base_offset = 0;
- /* len to read in current index. */
- uint64_t len = 0;
- /* offset in current index. */
- uint64_t offset = 0;
-
/*
* Clock Snapshot value of the last message iterator inactivity message
* sent downstream.
/* Timestamp in nanoseconds of the current message (current_msg). */
int64_t current_msg_ts_ns = 0;
- std::vector<uint8_t> buf;
-
std::string name;
bool has_stream_hung_up = false;
+
+ struct CurPktInfo
+ {
+ bt2_common::DataLen offsetInRelay;
+ bt2_common::DataLen len;
+ };
+
+ nonstd::optional<CurPktInfo> curPktInfo;
};
struct lttng_live_metadata
{
using UP = std::unique_ptr<lttng_live_metadata>;
- explicit lttng_live_metadata(const bt2_common::LogCfg& logCfgParam) noexcept :
- logCfg {logCfgParam}
+ explicit lttng_live_metadata(bt_self_component *selfComp,
+ const bt2_common::LogCfg& logCfgParam) noexcept :
+ logCfg {logCfgParam},
+ _mSelfComp {selfComp}
+
+ {
+ }
+
+ const ctf::src::TraceCls *traceCls() const
{
+ return _mMetadataStreamParser->traceCls();
+ }
+
+ const nonstd::optional<bt2_common::Uuid>& metadataStreamUuid() const noexcept
+ {
+ return _mMetadataStreamParser->metadataStreamUuid();
+ }
+
+ void parseSection(const uint8_t *begin, const uint8_t *end)
+ {
+ if (!_mMetadataStreamParser) {
+ _mMetadataStreamParser =
+ ctf::src::createMetadataStreamParser(begin, {}, _mSelfComp, logCfg);
+ }
+
+ _mMetadataStreamParser->parseSection(begin, end);
}
const bt2_common::LogCfg logCfg;
uint64_t stream_id = 0;
- /* Weak reference. */
- ctf_metadata_decoder_up decoder;
+private:
+ bt_self_component *_mSelfComp;
+ ctf::src::MetadataStreamParser::UP _mMetadataStreamParser;
};
enum lttng_live_metadata_stream_state
/* Owned by this. */
nonstd::optional<bt2::Trace::Shared> trace;
- nonstd::optional<bt2::TraceClass::Shared> trace_class;
-
lttng_live_metadata::UP metadata;
- const bt_clock_class *clock_class = nullptr;
+ nonstd::optional<bt2::ConstClockClass> clock_class;
std::vector<lttng_live_stream_iterator::UP> stream_iterators;
* written to the file.
*/
enum lttng_live_get_one_metadata_status
-lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<char>& buf);
+lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<uint8_t>& buf);
enum lttng_live_iterator_status
lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream, struct packet_index *index);
-enum ctf_msg_iter_medium_status
-lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
- struct lttng_live_stream_iterator *stream, uint8_t *buf,
- uint64_t offset, uint64_t req_len, uint64_t *recv_len);
-
bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter);
BT_HIDDEN
void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
enum lttng_live_stream_state new_state);
+BT_HIDDEN
+void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter,
+ uint64_t ctfStreamClsId);
+
#endif /* BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_H */
#include "cpp-common/libc-up.hpp"
#include "metadata.hpp"
-#include "../common/src/metadata/tsdl/decoder.hpp"
#include "../common/src/metadata/tsdl/ctf-meta-configure-ir-trace.hpp"
#include "cpp-common/cfg-logging.hpp"
#include "cpp-common/cfg-logging-error-reporting.hpp"
uint8_t minor;
} __attribute__((__packed__));
-static bool stream_classes_all_have_default_clock_class(bt_trace_class *tc,
+static bool stream_classes_all_have_default_clock_class(bt2::ConstTraceClass tc,
const bt2_common::LogCfg& logCfg)
{
- uint64_t i, sc_count;
- const bt_clock_class *cc = NULL;
- const bt_stream_class *sc;
+ for (std::uint64_t i = 0; i < tc.size(); ++i) {
+ bt2::ConstStreamClass sc = tc[i];
+ nonstd::optional<bt2::ConstClockClass> cc = sc.defaultClockClass();
- sc_count = bt_trace_class_get_stream_class_count(tc);
- for (i = 0; i < sc_count; i++) {
- sc = bt_trace_class_borrow_stream_class_by_index_const(tc, i);
-
- BT_ASSERT(sc);
-
- cc = bt_stream_class_borrow_default_clock_class_const(sc);
if (!cc) {
BT_CLOGE_APPEND_CAUSE("Stream class doesn't have a default clock class: "
"sc-id=%" PRIu64 ", sc-name=\"%s\"",
- bt_stream_class_get_id(sc), bt_stream_class_get_name(sc));
+ sc.id(), sc.name()->c_str());
return false;
}
}
* encountered. This is useful to create message iterator inactivity message as
* we don't need a particular clock class.
*/
-static const bt_clock_class *borrow_any_clock_class(bt_trace_class *tc)
+static bt2::ConstClockClass borrow_any_clock_class(bt2::ConstTraceClass tc)
{
- uint64_t i, sc_count;
- const bt_clock_class *cc = NULL;
- const bt_stream_class *sc;
-
- sc_count = bt_trace_class_get_stream_class_count(tc);
- for (i = 0; i < sc_count; i++) {
- sc = bt_trace_class_borrow_stream_class_by_index_const(tc, i);
- BT_ASSERT_DBG(sc);
-
- cc = bt_stream_class_borrow_default_clock_class_const(sc);
- if (cc) {
- return cc;
- }
- }
-
- bt_common_abort();
+ return *tc[0].defaultClockClass();
}
BT_HIDDEN
{
struct lttng_live_session *session = trace->session;
struct lttng_live_metadata *metadata = trace->metadata.get();
- std::vector<char> metadataBuf;
bool keep_receiving;
- bt2_common::FileUP fp;
- enum ctf_metadata_decoder_status decoder_status;
const bt2_common::LogCfg& logCfg = trace->logCfg;
enum lttng_live_get_one_metadata_status metadata_status;
keep_receiving = true;
/* Grab all available metadata. */
+ std::vector<uint8_t> metadataBuf;
while (keep_receiving) {
/*
* lttng_live_get_one_metadata_packet() asks the Relay Daemon
return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
- /*
- * Open a new reading file handle on the `metadata_buf` and pass it to
- * the metadata decoder.
- */
- fp.reset(bt_fmemopen(metadataBuf.data(), metadataBuf.size(), "rb"));
- if (!fp) {
- if (errno == EINTR && lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
- session->lttng_live_msg_iter->was_interrupted = true;
- return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
- } else {
- BT_CLOGE_ERRNO_APPEND_CAUSE("Cannot memory-open metadata buffer", ".");
- return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- }
- }
-
/*
* The call to ctf_metadata_decoder_append_content() will append
* new metadata to our current trace class.
*/
BT_CLOGD("Appending new metadata to the ctf_trace class");
- decoder_status = ctf_metadata_decoder_append_content(metadata->decoder.get(), fp.get());
- switch (decoder_status) {
- case CTF_METADATA_DECODER_STATUS_OK:
- if (!trace->trace_class) {
- struct ctf_trace_class *tc =
- ctf_metadata_decoder_borrow_ctf_trace_class(metadata->decoder.get());
+ metadata->parseSection(metadataBuf.data(), metadataBuf.data() + metadataBuf.size());
+ if (!trace->trace) {
+ const ctf::src::TraceCls *ctfTraceCls = metadata->traceCls();
+ BT_ASSERT(ctfTraceCls);
+ nonstd::optional<bt2::TraceClass> irTraceCls = ctfTraceCls->libCls();
- trace->trace_class = ctf_metadata_decoder_get_ir_trace_class(metadata->decoder.get());
- trace->trace = (*trace->trace_class)->instantiate();
- if (!trace->trace) {
- BT_CLOGE_APPEND_CAUSE("Failed to create bt_trace");
- return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- }
+ if (irTraceCls) {
+ trace->trace = irTraceCls->instantiate();
- ctf_trace_class_configure_ir_trace(tc, **trace->trace);
+ ctf_trace_class_configure_ir_trace(*ctfTraceCls, **trace->trace, logCfg);
- if (!stream_classes_all_have_default_clock_class((*trace->trace_class)->libObjPtr(),
- logCfg)) {
+ if (!stream_classes_all_have_default_clock_class((*trace->trace)->cls(), logCfg)) {
/* Error logged in function. */
return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
- trace->clock_class = borrow_any_clock_class((*trace->trace_class)->libObjPtr());
+
+ trace->clock_class = borrow_any_clock_class((*trace->trace)->cls());
}
+ }
- /* The metadata was updated succesfully. */
- trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NOT_NEEDED;
+ /* The metadata was updated succesfully. */
+ trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NOT_NEEDED;
- return LTTNG_LIVE_ITERATOR_STATUS_OK;
- default:
- return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- }
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
BT_HIDDEN
{
const bt2_common::LogCfg& logCfg = session->logCfg;
struct lttng_live_trace *trace;
+ lttng_live_metadata::UP metadata =
+ bt2_common::makeUnique<lttng_live_metadata>(session->self_comp, logCfg);
- ctf_metadata_decoder_config cfg(logCfg);
- cfg.self_comp = session->self_comp;
- cfg.create_trace_class = true;
-
- lttng_live_metadata::UP metadata = bt2_common::makeUnique<lttng_live_metadata>(logCfg);
metadata->stream_id = stream_id;
- metadata->decoder = ctf_metadata_decoder_create(&cfg);
- if (!metadata->decoder) {
- BT_CLOGE_APPEND_CAUSE("Failed to create CTF metadata decoder");
- return -1;
- }
trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
if (!trace) {
BT_CLOGE_APPEND_CAUSE("Failed to borrow trace");
#include "compat/endian.h"
#include "compat/compiler.h"
#include "common/common.h"
+#include "cpp-common/exc.hpp"
#include <babeltrace2/babeltrace.h>
#include "cpp-common/make-unique.hpp"
bt_common_abort();
}
-static inline enum ctf_msg_iter_medium_status
-viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
+static inline enum lttng_live_get_stream_bytes_status
+viewer_status_to_lttng_live_get_stream_bytes_status(enum lttng_live_viewer_status viewer_status)
{
switch (viewer_status) {
case LTTNG_LIVE_VIEWER_STATUS_OK:
- return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK;
case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
- return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN;
case LTTNG_LIVE_VIEWER_STATUS_ERROR:
- return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
}
bt_common_abort();
BT_HIDDEN
enum lttng_live_get_one_metadata_status
-lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<char>& buf)
+lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<uint8_t>& buf)
{
uint64_t len = 0;
enum lttng_live_viewer_status viewer_status;
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_metadata rq;
struct lttng_viewer_metadata_packet rp;
- std::vector<char> data;
struct lttng_live_session *session = trace->session;
struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
struct lttng_live_metadata *metadata = trace->metadata.get();
return LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
}
- data.resize(len);
+ std::vector<char> localBuf(len);
- viewer_status = lttng_live_recv(viewer_connection, data.data(), len);
+ viewer_status = lttng_live_recv(viewer_connection, localBuf.data(), len);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_recv_status(logCfg, viewer_status, "get metadata packet");
return (lttng_live_get_one_metadata_status) viewer_status;
}
- /*
- * Write the metadata to the file handle.
- */
- buf.insert(buf.end(), data.begin(), data.end());
+ buf.insert(buf.end(), localBuf.begin(), localBuf.end());
return LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
}
}
BT_HIDDEN
-enum ctf_msg_iter_medium_status
+lttng_live_get_stream_bytes_status
lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream, uint8_t *buf,
uint64_t offset, uint64_t req_len, uint64_t *recv_len)
viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_send_status(logCfg, viewer_status, "get data packet command");
- return viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
+ return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status);
}
viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_recv_status(logCfg, viewer_status, "get data packet reply");
- return viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
+ return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status);
}
flags = be32toh(rp.flags);
break;
case LTTNG_VIEWER_GET_PACKET_RETRY:
/* Unimplemented by relay daemon */
- return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN;
case LTTNG_VIEWER_GET_PACKET_ERR:
if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
BT_CLOGD("Marking trace as needing new metadata: "
if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
BT_CLOGD("Reply with any one flags set means we should retry: response=%s",
lttng_viewer_get_packet_return_code_string(rp_status));
- return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN;
}
BT_CLOGE_APPEND_CAUSE("Received get_data_packet response: error");
- return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
case LTTNG_VIEWER_GET_PACKET_EOF:
- return CTF_MSG_ITER_MEDIUM_STATUS_EOF;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF;
default:
BT_CLOGE_APPEND_CAUSE("Received get_data_packet response: unknown (%d)", rp_status);
- return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
}
if (req_len == 0) {
- return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
}
viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_recv_status(logCfg, viewer_status, "get data packet");
- return viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
+ return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status);
}
*recv_len = req_len;
- return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK;
}
/*
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
+#include <memory>
#include <string>
#include <glib.h>
bt2::Value::Shared
live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection);
+enum lttng_live_get_stream_bytes_status
+{
+ LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK = __BT_FUNC_STATUS_OK,
+ LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN = __BT_FUNC_STATUS_AGAIN,
+ LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR = __BT_FUNC_STATUS_ERROR,
+ LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF = __BT_FUNC_STATUS_END,
+};
+
+BT_HIDDEN
+lttng_live_get_stream_bytes_status
+lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
+ struct lttng_live_stream_iterator *stream, uint8_t *buf,
+ uint64_t offset, uint64_t req_len, uint64_t *recv_len);
+
#endif /* LTTNG_LIVE_VIEWER_CONNECTION_H */
Packet context field class: Structure (1 member):
cpu_id: Unsigned integer (32-bit, Base 10)
Event class `sample_component:message` (ID 0):
+ User attributes:
+ babeltrace.org,2020:
+ log-level: warning
Log level: Warning
Payload field class: Structure (1 member):
message: String
Packet context field class: Structure (1 member):
cpu_id: Unsigned integer (32-bit, Base 10)
Event class `sample_component:message` (ID 0):
+ User attributes:
+ babeltrace.org,2020:
+ log-level: warning
Log level: Warning
Payload field class: Structure (1 member):
message: String
Packet context field class: Structure (1 member):
cpu_id: Unsigned integer (32-bit, Base 10)
Event class `my_app:signe_de_pia$$e` (ID 0):
+ User attributes:
+ babeltrace.org,2020:
+ log-level: debug:line
Log level: Debug (line)
Payload field class: Structure (0 members)
Event class `my_app:signe_de_pia$$e_2` (ID 1):
+ User attributes:
+ babeltrace.org,2020:
+ log-level: debug:line
Log level: Debug (line)
Payload field class: Structure (0 members)
bt_cli "$expected_stdout" "$expected_stderr" "${trace_dir}/1/succeed/multi-domains" -c sink.text.details --params "with-trace-name=false,with-stream-name=false"
bt_remove_cr "${expected_stdout}"
bt_remove_cr "${expected_stderr}"
-
- # Hack. To be removed when src.ctf.lttng-live is updated to use the new
- # IR generator.
- "$BT_TESTS_SED_BIN" -i '/User attributes:/d' "${expected_stdout}"
- "$BT_TESTS_SED_BIN" -i '/babeltrace.org,2020:/d' "${expected_stdout}"
- "$BT_TESTS_SED_BIN" -i '/log-level: warning/d' "${expected_stdout}"
-
run_test "$test_text" "$cli_args_template" "$server_args" "$expected_stdout" "$expected_stderr"
diag "Inverse session order from lttng-relayd"
run_test "$test_text" "$cli_args_template" "$server_args_inverse" "$expected_stdout" "$expected_stderr"