{
}
+ ObjT *operator->() noexcept
+ {
+ return &_mObj;
+ }
+
const ObjT *operator->() const noexcept
{
return &_mObj;
#define BABELTRACE_PLUGINS_CTF_COMMON_SRC_ITEM_SEQ_MEDIUM_HPP
#include <cstdint>
-#include <cstdlib>
#include <memory>
-#include <stdexcept>
#include "cpp-common/bt2c/data-len.hpp"
*/
explicit Buf(const std::uint8_t *addr, bt2c::DataLen size) noexcept;
- /* Default copy operations */
- Buf(const Buf&) noexcept = default;
- Buf& operator=(const Buf&) noexcept = default;
-
/*
* Address of this buffer.
*
#include <sstream>
#include <glib.h>
-#include <stdio.h>
-#include <stdlib.h>
#include <babeltrace2/babeltrace.h>
#include "common/assert.h"
#include "compat/mman.h" /* IWYU pragma: keep */
+#include "cpp-common/bt2/wrap.hpp"
#include "cpp-common/bt2s/make-unique.hpp"
#include "cpp-common/vendor/fmt/format.h"
-#include "../common/src/msg-iter/msg-iter.hpp"
+#include "../common/src/pkt-props.hpp"
#include "data-stream.hpp"
#define STREAM_NAME_PREFIX "stream-"
-static enum ctf_msg_iter_medium_status medop_request_bytes(size_t request_sz, uint8_t **buffer_addr,
- size_t *buffer_sz, void *data)
+using namespace bt2c::literals::datalen;
+
+namespace ctf {
+namespace src {
+namespace live {
+
+Buf CtfLiveMedium::buf(bt2c::DataLen requestedOffsetInStream, bt2c::DataLen minSize)
{
- lttng_live_stream_iterator *stream = (lttng_live_stream_iterator *) data;
- struct lttng_live_trace *trace = stream->trace;
- struct lttng_live_session *session = trace->session;
- struct lttng_live_msg_iter *live_msg_iter = session->lttng_live_msg_iter;
- uint64_t recv_len = 0;
- uint64_t len_left;
- uint64_t read_len;
-
- BT_ASSERT(request_sz);
-
- if (stream->has_stream_hung_up) {
- return CTF_MSG_ITER_MEDIUM_STATUS_EOF;
- }
+ BT_CPPLOGD("CtfLiveMedium::buf called: stream-id={}, offset-bytes={}, min-size-bytes={}",
+ _mLiveStreamIter.stream ? _mLiveStreamIter.stream->id() : -1,
+ requestedOffsetInStream.bytes(), minSize.bytes());
- len_left = stream->base_offset + stream->len - stream->offset;
- if (!len_left) {
- lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
- return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
- }
+ if (_mLiveStreamIter.has_stream_hung_up)
+ throw NoData {};
- read_len = MIN(request_sz, stream->buf.size());
- read_len = MIN(read_len, len_left);
+ BT_ASSERT(requestedOffsetInStream >= _mCurPktBegOffsetInStream);
+ auto requestedOffsetInPacket = requestedOffsetInStream - _mCurPktBegOffsetInStream;
- const auto status = lttng_live_get_stream_bytes(live_msg_iter, stream, stream->buf.data(),
- stream->offset, read_len, &recv_len);
+ BT_ASSERT(_mLiveStreamIter.curPktInfo);
- if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
- return status;
+ if (requestedOffsetInPacket == _mLiveStreamIter.curPktInfo->len) {
+ _mCurPktBegOffsetInStream += _mLiveStreamIter.curPktInfo->len;
+ _mLiveStreamIter.curPktInfo.reset();
+ lttng_live_stream_iterator_set_state(&_mLiveStreamIter, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
+ throw bt2c::TryAgain {};
}
- *buffer_addr = stream->buf.data();
- *buffer_sz = recv_len;
- stream->offset += recv_len;
+ auto requestedOffsetInRelay =
+ _mLiveStreamIter.curPktInfo->offsetInRelay + requestedOffsetInPacket;
+ auto lenUntilEndOfPacket = _mLiveStreamIter.curPktInfo->len - requestedOffsetInPacket;
- return CTF_MSG_ITER_MEDIUM_STATUS_OK;
-}
+ auto maxReqLen = bt2c::DataLen::fromBytes(
+ _mLiveStreamIter.trace->session->lttng_live_msg_iter->lttng_live_comp->max_query_size);
+ auto reqLen = std::min(lenUntilEndOfPacket, maxReqLen);
+ uint64_t recvLen;
-static bt_stream *medop_borrow_stream(bt_stream_class *stream_class, int64_t stream_id, void *data)
-{
- lttng_live_stream_iterator *lttng_live_stream = (lttng_live_stream_iterator *) data;
-
- if (!lttng_live_stream->stream) {
- uint64_t stream_class_id = bt_stream_class_get_id(stream_class);
-
- BT_CPPLOGI_SPEC(lttng_live_stream->logger,
- "Creating stream {} (ID: {}) out of stream class {}",
- lttng_live_stream->name, stream_id, stream_class_id);
-
- bt_stream *stream;
-
- if (stream_id < 0) {
- /*
- * No stream instance ID in the stream. It's possible
- * to encounter this situation with older version of
- * LTTng. In these cases, use the viewer_stream_id that
- * is unique for a live viewer session.
- */
- stream =
- bt_stream_create_with_id(stream_class, lttng_live_stream->trace->trace->libObjPtr(),
- lttng_live_stream->viewer_stream_id);
- } else {
- stream = bt_stream_create_with_id(
- stream_class, lttng_live_stream->trace->trace->libObjPtr(), (uint64_t) stream_id);
- }
+ _mBuf.resize(reqLen.bytes());
- if (!stream) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(
- lttng_live_stream->logger,
- "Cannot create stream {} (stream class ID {}, stream ID {})",
- lttng_live_stream->name, stream_class_id, stream_id);
- return nullptr;
- }
+ lttng_live_get_stream_bytes_status status = lttng_live_get_stream_bytes(
+ _mLiveStreamIter.trace->session->lttng_live_msg_iter, &_mLiveStreamIter, _mBuf.data(),
+ requestedOffsetInRelay.bytes(), reqLen.bytes(), &recvLen);
+ switch (status) {
+ case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK:
+ _mBuf.resize(recvLen);
+ break;
+
+ case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN:
+ BT_CPPLOGD("CtfLiveMedium::buf try again");
+ throw bt2c::TryAgain();
- lttng_live_stream->stream = bt2::Stream::Shared::createWithoutRef(stream);
+ case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF:
+ BT_CPPLOGD("CtfLiveMedium::buf eof");
+ throw NoData();
- lttng_live_stream->stream->name(lttng_live_stream->name);
+ case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR:
+ BT_CPPLOGD("CtfLiveMedium::buf error");
+ throw bt2c::Error();
}
- return lttng_live_stream->stream->libObjPtr();
+ const Buf buf {_mBuf.data(), bt2c::DataLen::fromBytes(_mBuf.size())};
+
+ BT_CPPLOGD("CtfLiveMedium::buf returns: stream-id={}, buf-addr={}, buf-size-bytes={}",
+ _mLiveStreamIter.stream ? _mLiveStreamIter.stream->id() : -1, fmt::ptr(buf.addr()),
+ buf.size().bytes());
+
+ return buf;
}
-static struct ctf_msg_iter_medium_ops medops = {
- medop_request_bytes,
- nullptr,
- nullptr,
- medop_borrow_stream,
-};
+} /* namespace live */
+} /* namespace src */
+} /* namespace ctf */
+
+lttng_live_iterator_status
+lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter)
+{
+ BT_ASSERT(!liveStreamIter->msg_iter);
+ BT_ASSERT(!liveStreamIter->stream);
+ lttng_live_trace *trace = liveStreamIter->trace;
+ lttng_live_msg_iter *liveMsgIter = trace->session->lttng_live_msg_iter;
+
+ auto tempMedium = bt2s::make_unique<ctf::src::live::CtfLiveMedium>(*liveStreamIter);
+ const ctf::src::TraceCls *ctfTc = liveStreamIter->trace->metadata->traceCls();
+ BT_ASSERT(ctfTc);
+ ctf::src::PktProps pktProps =
+ ctf::src::readPktProps(*ctfTc, std::move(tempMedium), 0_bytes, liveStreamIter->logger);
+
+ bt2::OptionalBorrowedObject<bt2::TraceClass> tc = ctfTc->libCls();
+ BT_ASSERT(tc);
+ BT_ASSERT(liveStreamIter->ctf_stream_class_id.is_set);
+ BT_ASSERT(trace->trace);
+
+ auto sc = tc->streamClassById(liveStreamIter->ctf_stream_class_id.value);
+ if (!sc) {
+ BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(liveStreamIter->logger, bt2::Error,
+ "No stream class with id {}",
+ liveStreamIter->ctf_stream_class_id.value);
+ }
+
+ bt_stream *streamPtr;
+ if (pktProps.dataStreamId) {
+ streamPtr = bt_stream_create_with_id(sc->libObjPtr(), trace->trace->libObjPtr(),
+ *pktProps.dataStreamId);
+ } else {
+ /*
+ * No stream instance ID in the stream. It's possible
+ * to encounter this situation with older version of
+ * LTTng. In these cases, use the viewer_stream_id that
+ * is unique for a live viewer session.
+ */
+ streamPtr = bt_stream_create_with_id(sc->libObjPtr(), trace->trace->libObjPtr(),
+ liveStreamIter->viewer_stream_id);
+ }
+ BT_ASSERT(streamPtr);
+ liveStreamIter->stream = bt2::Stream::Shared::createWithoutRef(streamPtr);
+ liveStreamIter->stream->name(liveStreamIter->name);
+
+ auto medium = bt2s::make_unique<ctf::src::live::CtfLiveMedium>(*liveStreamIter);
+ liveStreamIter->msg_iter.emplace(bt2::wrap(liveMsgIter->self_msg_iter), *ctfTc,
+ liveStreamIter->trace->metadata->metadataStreamUuid(),
+ *liveStreamIter->stream, std::move(medium),
+ ctf::src::MsgIterQuirks {}, liveStreamIter->logger);
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
+}
enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_session *session,
bt_self_message_iterator *self_msg_iter)
{
- struct lttng_live_component *lttng_live = session->lttng_live_msg_iter->lttng_live_comp;
-
if (!session->lazy_stream_msg_init) {
return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
for (lttng_live_trace::UP& trace : session->traces) {
for (lttng_live_stream_iterator::UP& stream_iter : trace->stream_iterators) {
- struct ctf_trace_class *ctf_tc;
-
if (stream_iter->msg_iter) {
continue;
}
- ctf_tc = ctf_metadata_decoder_borrow_ctf_trace_class(trace->metadata->decoder.get());
- BT_CPPLOGD_SPEC(stream_iter->logger,
+ const ctf::src::TraceCls *ctfTraceCls = trace->metadata->traceCls();
+ BT_CPPLOGD_SPEC(session->logger,
"Creating CTF message iterator: session-id={}, ctf-tc-addr={}, "
"stream-iter-name={}, self-msg-iter-addr={}",
- session->id, fmt::ptr(ctf_tc), stream_iter->name,
+ session->id, fmt::ptr(ctfTraceCls), stream_iter->name.c_str(),
fmt::ptr(self_msg_iter));
- stream_iter->msg_iter =
- ctf_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops, stream_iter.get(),
- self_msg_iter, stream_iter->logger);
- if (!stream_iter->msg_iter) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(stream_iter->logger,
- "Failed to create CTF message iterator");
- return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- }
}
}
struct lttng_live_stream_iterator *
lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id,
- uint64_t stream_id, bt_self_message_iterator *self_msg_iter)
+ uint64_t stream_id)
{
std::stringstream nameSs;
BT_ASSERT(session->lttng_live_msg_iter);
BT_ASSERT(session->lttng_live_msg_iter->lttng_live_comp);
- lttng_live_component *lttng_live = session->lttng_live_msg_iter->lttng_live_comp;
- lttng_live_trace *trace =
- lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
+ const auto trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
if (!trace) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(session->logger, "Failed to borrow CTF trace.");
return nullptr;
stream_iter->last_inactivity_ts.is_set = false;
stream_iter->last_inactivity_ts.value = 0;
- if (trace->trace) {
- struct ctf_trace_class *ctf_tc =
- ctf_metadata_decoder_borrow_ctf_trace_class(trace->metadata->decoder.get());
- BT_ASSERT(!stream_iter->msg_iter);
- stream_iter->msg_iter =
- ctf_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops, stream_iter.get(),
- self_msg_iter, stream_iter->logger);
- if (!stream_iter->msg_iter) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(stream_iter->logger,
- "Failed to create CTF message iterator");
- return nullptr;
- }
- }
- stream_iter->buf.resize(lttng_live->max_query_size);
-
nameSs << STREAM_NAME_PREFIX << stream_iter->viewer_stream_id;
stream_iter->name = nameSs.str();
return ret;
}
+void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter,
+ uint64_t ctfStreamClsId)
+{
+ if (streamIter->ctf_stream_class_id.is_set) {
+ BT_ASSERT(streamIter->ctf_stream_class_id.value == ctfStreamClsId);
+ return;
+ } else {
+ streamIter->ctf_stream_class_id.value = ctfStreamClsId;
+ streamIter->ctf_stream_class_id.is_set = true;
+ }
+}
+
lttng_live_stream_iterator::~lttng_live_stream_iterator()
{
/* Track the number of active stream iterator. */
struct lttng_live_stream_iterator *
lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id,
- uint64_t stream_id, bt_self_message_iterator *self_msg_iter);
+ uint64_t stream_id);
+
+namespace ctf {
+namespace src {
+namespace live {
+
+struct CtfLiveMedium : Medium
+{
+ CtfLiveMedium(lttng_live_stream_iterator& liveStreamIter) :
+ _mLogger {liveStreamIter.logger, "PLUGIN/SRC.CTF.LTTNG-LIVE/CTF-LIVE-MEDIUM"},
+ _mLiveStreamIter(liveStreamIter)
+ {
+ }
+
+ Buf buf(bt2c::DataLen offset, bt2c::DataLen minSize) override;
+
+private:
+ bt2c::Logger _mLogger;
+ lttng_live_stream_iterator& _mLiveStreamIter;
+
+ bt2c::DataLen _mCurPktBegOffsetInStream = bt2c::DataLen::fromBits(0);
+ std::vector<uint8_t> _mBuf;
+};
+
+} /* namespace live */
+} /* namespace src */
+} /* namespace ctf */
+
+lttng_live_iterator_status
+lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter);
#endif /* BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_DATA_STREAM_HPP */
#include <unistd.h>
#include "common/assert.h"
+#include "cpp-common/bt2/wrap.hpp"
#include "cpp-common/bt2c/fmt.hpp"
#include "cpp-common/bt2c/glib-up.hpp"
#include "cpp-common/bt2c/vector.hpp"
"session-id={}, hostname=\"{}\", session-name=\"{}\"",
session_id, hostname, session_name);
- auto session = bt2s::make_unique<lttng_live_session>(lttng_live_msg_iter->logger);
+ auto session = bt2s::make_unique<lttng_live_session>(lttng_live_msg_iter->logger,
+ lttng_live_msg_iter->selfComp);
- session->self_comp = lttng_live_msg_iter->self_comp;
session->id = session_id;
session->lttng_live_msg_iter = lttng_live_msg_iter;
session->new_streams_needed = true;
goto end;
}
- lttng_live_stream->base_offset = index.offset;
- lttng_live_stream->offset = index.offset;
- lttng_live_stream->len = index.packet_size / CHAR_BIT;
+ lttng_live_stream->curPktInfo.emplace(lttng_live_stream_iterator::CurPktInfo {
+ bt2c::DataLen::fromBytes(index.offset),
+ bt2c::DataLen::fromBits(index.packet_size),
+ });
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Setting live stream reading info: stream-name=\"{}\", "
- "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
+ "viewer-stream-id={}, stream-offset-in-relay={}, stream-len-bytes={}",
lttng_live_stream->name, lttng_live_stream->viewer_stream_id,
- lttng_live_stream->base_offset, lttng_live_stream->offset,
- lttng_live_stream->len);
+ lttng_live_stream->curPktInfo->offsetInRelay.bytes(),
+ lttng_live_stream->curPktInfo->len.bytes());
end:
if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
if (!session->attached) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
session->id);
- enum lttng_live_viewer_status attach_status =
- lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
+ lttng_live_viewer_status attach_status = lttng_live_session_attach(session);
if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
/*
"Updating all data streams: session-id={}, session-name=\"{}\"", session->id,
session->session_name);
- status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
+ status = lttng_live_session_get_new_streams(session);
switch (status) {
case LTTNG_LIVE_ITERATOR_STATUS_OK:
break;
status = lttng_live_metadata_update(trace.get());
switch (status) {
case LTTNG_LIVE_ITERATOR_STATUS_END:
+ break;
case LTTNG_LIVE_ITERATOR_STATUS_OK:
break;
case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
timestamp);
const auto msg = bt_message_message_iterator_inactivity_create(
- lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
+ lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class->libObjPtr(),
+ timestamp);
if (!msg) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
}
static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
- const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
+ bt2::ConstMessage msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
{
- const bt_clock_snapshot *clock_snapshot = NULL;
- int ret = 0;
+ bt2::OptionalBorrowedObject<bt2::ConstClockSnapshot> clockSnapshot;
- BT_ASSERT_DBG(msg);
BT_ASSERT_DBG(ts_ns);
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
- "Getting message's timestamp: iter-data-addr={}, msg-addr={}, "
- "last-msg-ts={}",
- fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns);
+ "Getting message's timestamp: iter-data-addr={}, msg-addr={}, last-msg-ts={}",
+ fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg.libObjPtr()), last_msg_ts_ns);
- switch (bt_message_get_type(msg)) {
- case BT_MESSAGE_TYPE_EVENT:
- clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
+ switch (msg.type()) {
+ case bt2::MessageType::Event:
+ clockSnapshot = msg.asEvent().defaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_PACKET_BEGINNING:
- clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
+ case bt2::MessageType::PacketBeginning:
+
+ clockSnapshot = msg.asPacketBeginning().defaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_PACKET_END:
- clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
+ case bt2::MessageType::PacketEnd:
+ clockSnapshot = msg.asPacketEnd().defaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
- clock_snapshot =
- bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
+ case bt2::MessageType::DiscardedEvents:
+ clockSnapshot = msg.asDiscardedEvents().beginningDefaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
- clock_snapshot =
- bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
+ case bt2::MessageType::DiscardedPackets:
+ clockSnapshot = msg.asDiscardedPackets().beginningDefaultClockSnapshot();
break;
- case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
- clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
+ case bt2::MessageType::MessageIteratorInactivity:
+ clockSnapshot = msg.asMessageIteratorInactivity().clockSnapshot();
break;
default:
/* All the other messages have a higher priority */
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Message has no timestamp, using the last message timestamp: "
"iter-data-addr={}, msg-addr={}, last-msg-ts={}, ts={}",
- fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns);
+ fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg.libObjPtr()), last_msg_ts_ns,
+ *ts_ns);
*ts_ns = last_msg_ts_ns;
return 0;
}
- ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
- if (ret) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
- "Cannot get nanoseconds from Epoch of clock snapshot: "
- "clock-snapshot-addr={}",
- fmt::ptr(clock_snapshot));
- return -1;
- }
+ *ts_ns = clockSnapshot->nsFromOrigin();
BT_CPPLOGD_SPEC(
lttng_live_msg_iter->logger,
"Found message's timestamp: iter-data-addr={}, msg-addr={}, last-msg-ts={}, ts={}",
- fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns);
+ fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg.libObjPtr()), last_msg_ts_ns, *ts_ns);
return 0;
}
struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
{
- enum ctf_msg_iter_status status;
-
for (const auto& session : lttng_live_msg_iter->sessions) {
if (session->new_streams_needed) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Need an update for streams: session-id={}", session->id);
return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
}
- for (lttng_live_trace::UP& trace : session->traces) {
+
+ for (const auto& trace : session->traces) {
if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
"Need an update for metadata stream: session-id={}, trace-id={}",
return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
- const bt_message *msg;
- status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
- switch (status) {
- case CTF_MSG_ITER_STATUS_EOF:
- return LTTNG_LIVE_ITERATOR_STATUS_END;
- case CTF_MSG_ITER_STATUS_OK:
- message = bt2::ConstMessage::Shared::createWithoutRef(msg);
- return LTTNG_LIVE_ITERATOR_STATUS_OK;
- case CTF_MSG_ITER_STATUS_AGAIN:
- /*
- * Continue immediately (end of packet). The next
- * get_index may return AGAIN to delay the following
- * attempt.
- */
- return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
- case CTF_MSG_ITER_STATUS_ERROR:
- default:
- BT_CPPLOGE_APPEND_CAUSE_SPEC(
- lttng_live_msg_iter->logger,
- "CTF message iterator failed to get next message: msg-iter={}, msg-iter-status={}",
- fmt::ptr(lttng_live_stream->msg_iter), status);
+ if (!lttng_live_stream->msg_iter) {
+ /* The first time we're called for this stream, the MsgIter is not instantiated. */
+ enum lttng_live_iterator_status ret =
+ lttng_live_stream_iterator_create_msg_iter(lttng_live_stream);
+ if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ return ret;
+ }
+ }
+
+ try {
+ message = lttng_live_stream->msg_iter->next();
+ if (message) {
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
+ } else {
+ return LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
+ } catch (const bt2c::TryAgain&) {
+ return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } catch (const bt2::Error&) {
+ BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
+ "CTF message iterator failed to get next message: msg-iter={}",
+ fmt::ptr(&*lttng_live_stream->msg_iter));
return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
}
* `ctf_msg_iter` should simply realize that it needs to close the
* stream properly by emitting the necessary stream end message.
*/
- const bt_message *msg;
- enum ctf_msg_iter_status status =
- ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
+ try {
+ if (!stream_iter->msg_iter) {
+ BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
+ "Reached the end of the live stream iterator.");
+ return LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
+
+ curr_msg = stream_iter->msg_iter->next();
+ if (!curr_msg) {
+ BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
+ "Reached the end of the live stream iterator.");
+ return LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
- if (status == CTF_MSG_ITER_STATUS_ERROR) {
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
+ } catch (const bt2::Error&) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Error getting the next message from CTF message iterator");
return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- } else if (status == CTF_MSG_ITER_STATUS_EOF) {
- BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
- "Reached the end of the live stream iterator.");
- return LTTNG_LIVE_ITERATOR_STATUS_END;
}
-
- BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
-
- curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
-
- return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
/*
}
static enum lttng_live_iterator_status
-adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
- const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
- uint64_t new_begin_ts)
+adjust_discarded_packets_message(bt_self_message_iterator *iter, bt2::Stream stream,
+ bt2::ConstDiscardedPacketsMessage msgIn,
+ bt2::ConstMessage::Shared& msgOut, uint64_t new_begin_ts)
{
- enum bt_property_availability availability;
- const bt_clock_snapshot *clock_snapshot;
- uint64_t end_ts;
- uint64_t count;
-
- clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
- end_ts = bt_clock_snapshot_get_value(clock_snapshot);
-
- availability = bt_message_discarded_packets_get_count(msg_in, &count);
- BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
+ BT_ASSERT_DBG(msgIn.count());
const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
- iter, stream, new_begin_ts, end_ts);
+ iter, stream.libObjPtr(), new_begin_ts, msgIn.endDefaultClockSnapshot().value());
if (!msg) {
return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
}
- bt_message_discarded_packets_set_count(msg, count);
- msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ bt_message_discarded_packets_set_count(msg, *msgIn.count());
+ msgOut = bt2::Message::Shared::createWithoutRef(msg);
+
return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
static enum lttng_live_iterator_status
-adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
- const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
- uint64_t new_begin_ts)
+adjust_discarded_events_message(bt_self_message_iterator *iter, const bt2::Stream stream,
+ bt2::ConstDiscardedEventsMessage msgIn,
+ bt2::ConstMessage::Shared& msgOut, uint64_t new_begin_ts)
{
- enum bt_property_availability availability;
- const bt_clock_snapshot *clock_snapshot;
- uint64_t end_ts;
- uint64_t count;
-
- clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
- end_ts = bt_clock_snapshot_get_value(clock_snapshot);
-
- availability = bt_message_discarded_events_get_count(msg_in, &count);
- BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
+ BT_ASSERT_DBG(msgIn.count());
const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots(
- iter, stream, new_begin_ts, end_ts);
+ iter, stream.libObjPtr(), new_begin_ts, msgIn.endDefaultClockSnapshot().value());
if (!msg) {
return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
}
- bt_message_discarded_events_set_count(msg, count);
- msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+ bt_message_discarded_events_set_count(msg, *msgIn.count());
+ msgOut = bt2::Message::Shared::createWithoutRef(msg);
+
return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
const bt2::ConstMessage& late_msg)
{
- const bt_clock_class *clock_class;
- const bt_stream_class *stream_class;
- enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
- int64_t last_inactivity_ts_ns;
- enum lttng_live_iterator_status adjust_status;
- bt2::ConstMessage::Shared adjusted_message;
-
/*
* The timestamp of the current message is before the last message sent
* by this component. We CANNOT send it as is.
return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
- stream_class = bt_stream_borrow_class_const(stream_iter->stream->libObjPtr());
- clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
-
- ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
- clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
- if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
- "Error converting last "
- "inactivity message timestamp to nanoseconds");
- return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- }
+ const auto streamClass = stream_iter->stream->cls();
+ const auto clockClass = streamClass.defaultClockClass();
+ int64_t last_inactivity_ts_ns =
+ clockClass->cyclesToNsFromOrigin(stream_iter->last_inactivity_ts.value);
if (last_inactivity_ts_ns <= late_msg_ts_ns) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
"Invalid live stream state: "
* adjust its timestamp to ensure monotonicity.
*/
BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
- "Adjusting the timestamp of late message: late-msg-type={}, "
- "msg-new-ts-ns={}",
+ "Adjusting the timestamp of late message: late-msg-type={}, msg-new-ts-ns={}",
late_msg.type(), stream_iter->last_inactivity_ts.value);
- switch (late_msg.type()) {
- case bt2::MessageType::DiscardedEvents:
- adjust_status = adjust_discarded_events_message(
- lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
- late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
- break;
- case bt2::MessageType::DiscardedPackets:
- adjust_status = adjust_discarded_packets_message(
- lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
- late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
- break;
- default:
- bt_common_abort();
- }
+
+ bt2::ConstMessage::Shared adjustedMessage;
+
+ const auto adjust_status = bt2c::call([&] {
+ switch (late_msg.type()) {
+ case bt2::MessageType::DiscardedEvents:
+ return adjust_discarded_events_message(lttng_live_msg_iter->self_msg_iter,
+ *stream_iter->stream,
+ late_msg.asDiscardedEvents(), adjustedMessage,
+ stream_iter->last_inactivity_ts.value);
+
+ case bt2::MessageType::DiscardedPackets:
+ return adjust_discarded_packets_message(lttng_live_msg_iter->self_msg_iter,
+ *stream_iter->stream,
+ late_msg.asDiscardedPackets(), adjustedMessage,
+ stream_iter->last_inactivity_ts.value);
+
+ default:
+ bt_common_abort();
+ }
+ });
if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
return adjust_status;
}
- BT_ASSERT_DBG(adjusted_message);
- stream_iter->current_msg = adjusted_message;
+ BT_ASSERT_DBG(adjustedMessage);
+ stream_iter->current_msg = std::move(adjustedMessage);
stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
return LTTNG_LIVE_ITERATOR_STATUS_OK;
* Get the timestamp in nanoseconds from origin of this
* message.
*/
- live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
- lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
+ live_get_msg_ts_ns(lttng_live_msg_iter, *msg, lttng_live_msg_iter->last_msg_ts_ns,
+ &curr_msg_ts_ns);
/*
* Check if the message of the current live stream
lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
bt_self_message_iterator *self_msg_it)
{
- auto msg_iter = bt2s::make_unique<struct lttng_live_msg_iter>(lttng_live_comp->logger);
+ auto msg_iter = bt2s::make_unique<struct lttng_live_msg_iter>(lttng_live_comp->logger,
+ lttng_live_comp->selfComp);
- msg_iter->self_comp = lttng_live_comp->self_comp;
msg_iter->lttng_live_comp = lttng_live_comp;
msg_iter->self_msg_iter = self_msg_it;
msg_iter->active_stream_iter = 0;
const bt_value *value;
enum bt_param_validation_status validation_status;
gchar *validation_error = NULL;
- bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
+ bt2c::Logger logger {bt2::wrap(self_comp), "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
}
- auto lttng_live = bt2s::make_unique<lttng_live_component>(std::move(logger));
- lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp);
+ auto lttng_live =
+ bt2s::make_unique<lttng_live_component>(std::move(logger), bt2::wrap(self_comp));
+
lttng_live->max_query_size = MAX_QUERY_SIZE;
lttng_live->has_msg_iter = false;
#include "cpp-common/bt2/message.hpp"
#include "cpp-common/vendor/fmt/format.h" /* IWYU pragma: keep */
-#include "../common/src/metadata/tsdl/decoder.hpp"
-#include "../common/src/msg-iter/msg-iter.hpp"
+#include "../common/src/metadata/metadata-stream-parser-utils.hpp"
+#include "../common/src/msg-iter.hpp"
#include "viewer-connection.hpp"
/*
* Since only a single iterator per viewer connection, we have
* only a single message iterator per stream.
*/
- ctf_msg_iter_up msg_iter;
+ bt2s::optional<ctf::src::MsgIter> msg_iter;
uint64_t viewer_stream_id = 0;
uint64_t value = 0;
} ctf_stream_class_id;
- /* base offset in current index. */
- uint64_t base_offset = 0;
- /* len to read in current index. */
- uint64_t len = 0;
- /* offset in current index. */
- uint64_t offset = 0;
-
/*
* Clock Snapshot value of the last message iterator inactivity message
* sent downstream.
/* Timestamp in nanoseconds of the current message (current_msg). */
int64_t current_msg_ts_ns = 0;
- std::vector<uint8_t> buf;
-
std::string name;
bool has_stream_hung_up = false;
+
+ struct CurPktInfo
+ {
+ bt2c::DataLen offsetInRelay;
+ bt2c::DataLen len;
+ };
+
+ bt2s::optional<CurPktInfo> curPktInfo;
};
struct lttng_live_metadata
{
using UP = std::unique_ptr<lttng_live_metadata>;
- explicit lttng_live_metadata(const bt2c::Logger& parentLogger) :
- logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/METADATA"}
+ explicit lttng_live_metadata(const bt2::SelfComponent selfComp,
+ const bt2c::Logger& parentLogger) :
+ logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/METADATA"},
+ _mSelfComp {selfComp}
+
{
}
+ const ctf::src::TraceCls *traceCls() const
+ {
+ return _mMetadataStreamParser->traceCls();
+ }
+
+ const bt2s::optional<bt2c::Uuid>& metadataStreamUuid() const noexcept
+ {
+ return _mMetadataStreamParser->metadataStreamUuid();
+ }
+
+ void parseSection(const bt2c::ConstBytes data)
+ {
+ if (!_mMetadataStreamParser) {
+ _mMetadataStreamParser =
+ ctf::src::createMetadataStreamParser(data, _mSelfComp, {}, logger);
+ }
+
+ _mMetadataStreamParser->parseSection(data);
+ }
+
+ bt2::SelfComponent selfComp() const noexcept
+ {
+ return _mSelfComp;
+ }
+
bt2c::Logger logger;
uint64_t stream_id = 0;
- /* Weak reference. */
- ctf_metadata_decoder_up decoder;
+private:
+ bt2::SelfComponent _mSelfComp;
+ ctf::src::MetadataStreamParser::UP _mMetadataStreamParser;
};
enum lttng_live_metadata_stream_state
bt2::Trace::Shared trace;
- bt2::TraceClass::Shared trace_class;
-
lttng_live_metadata::UP metadata;
- const bt_clock_class *clock_class = nullptr;
+ bt2::OptionalBorrowedObject<bt2::ConstClockClass> clock_class;
std::vector<lttng_live_stream_iterator::UP> stream_iterators;
{
using UP = std::unique_ptr<lttng_live_session>;
- explicit lttng_live_session(const bt2c::Logger& parentLogger) :
- logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/SESSION"}
+ explicit lttng_live_session(const bt2c::Logger& parentLogger,
+ const bt2::SelfComponent selfCompParam) :
+ logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/SESSION"},
+ selfComp {selfCompParam}
{
}
bt2c::Logger logger;
- bt_self_component *self_comp = nullptr;
+ bt2::SelfComponent selfComp;
/* Weak reference. */
struct lttng_live_msg_iter *lttng_live_msg_iter = nullptr;
{
using UP = std::unique_ptr<lttng_live_component>;
- explicit lttng_live_component(bt2c::Logger loggerParam) noexcept :
- logger {std::move(loggerParam)}
+ explicit lttng_live_component(bt2c::Logger loggerParam,
+ const bt2::SelfComponent selfCompParam) noexcept :
+ logger {std::move(loggerParam)},
+ selfComp {selfCompParam}
{
}
bt2c::Logger logger;
- /* Weak reference. */
- bt_self_component *self_comp = nullptr;
+ bt2::SelfComponent selfComp;
struct
{
{
using UP = std::unique_ptr<lttng_live_msg_iter>;
- explicit lttng_live_msg_iter(const bt2c::Logger& parentLogger) :
- logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/MSG-ITER"}
+ explicit lttng_live_msg_iter(const bt2c::Logger& parentLogger,
+ const bt2::SelfComponent selfCompParam) :
+ logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/MSG-ITER"},
+ selfComp {selfCompParam}
{
}
bt2c::Logger logger;
- bt_self_component *self_comp = nullptr;
+ bt2::SelfComponent selfComp;
/* Weak reference. */
struct lttng_live_component *lttng_live_comp = nullptr;
void lttng_live_msg_iter_finalize(bt_self_message_iterator *it);
-enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
- bt_self_message_iterator *self_msg_iter);
+enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session);
enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session);
enum lttng_live_iterator_status
-lttng_live_session_get_new_streams(struct lttng_live_session *session,
- bt_self_message_iterator *self_msg_iter);
+lttng_live_session_get_new_streams(struct lttng_live_session *session);
struct lttng_live_trace *
lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
* written to the file.
*/
enum lttng_live_get_one_metadata_status
-lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<char>& buf);
+lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<uint8_t>& buf);
enum lttng_live_iterator_status
lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream, struct packet_index *index);
-enum ctf_msg_iter_medium_status
-lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
- struct lttng_live_stream_iterator *stream, uint8_t *buf,
- uint64_t offset, uint64_t req_len, uint64_t *recv_len);
-
bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter);
void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
enum lttng_live_stream_state new_state);
+void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter,
+ uint64_t ctfStreamClsId);
+
#endif /* BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_LTTNG_LIVE_HPP */
* Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
*/
-#include "compat/memstream.h"
-#include "cpp-common/bt2c/libc-up.hpp"
#include "cpp-common/bt2s/make-unique.hpp"
#include "../common/src/metadata/tsdl/ctf-meta-configure-ir-trace.hpp"
-#include "../common/src/metadata/tsdl/decoder.hpp"
#include "lttng-live.hpp"
#include "metadata.hpp"
uint8_t minor;
} __attribute__((__packed__));
-static bool stream_classes_all_have_default_clock_class(bt_trace_class *tc,
+static bool stream_classes_all_have_default_clock_class(bt2::ConstTraceClass tc,
const bt2c::Logger& logger)
{
- uint64_t i, sc_count;
- const bt_clock_class *cc = NULL;
- const bt_stream_class *sc;
+ for (std::uint64_t i = 0; i < tc.length(); ++i) {
+ auto sc = tc[i];
+ auto cc = sc.defaultClockClass();
- sc_count = bt_trace_class_get_stream_class_count(tc);
- for (i = 0; i < sc_count; i++) {
- sc = bt_trace_class_borrow_stream_class_by_index_const(tc, i);
-
- BT_ASSERT(sc);
-
- cc = bt_stream_class_borrow_default_clock_class_const(sc);
if (!cc) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(logger,
"Stream class doesn't have a default clock class: "
"sc-id={}, sc-name=\"{}\"",
- bt_stream_class_get_id(sc), bt_stream_class_get_name(sc));
+ sc.id(), sc.name());
return false;
}
}
* encountered. This is useful to create message iterator inactivity message as
* we don't need a particular clock class.
*/
-static const bt_clock_class *borrow_any_clock_class(bt_trace_class *tc)
+static bt2::ConstClockClass borrow_any_clock_class(bt2::ConstTraceClass tc)
{
- uint64_t i, sc_count;
- const bt_clock_class *cc = NULL;
- const bt_stream_class *sc;
-
- sc_count = bt_trace_class_get_stream_class_count(tc);
- for (i = 0; i < sc_count; i++) {
- sc = bt_trace_class_borrow_stream_class_by_index_const(tc, i);
- BT_ASSERT_DBG(sc);
-
- cc = bt_stream_class_borrow_default_clock_class_const(sc);
- if (cc) {
- return cc;
- }
- }
-
- bt_common_abort();
+ return *tc[0].defaultClockClass();
}
enum lttng_live_iterator_status lttng_live_metadata_update(struct lttng_live_trace *trace)
{
struct lttng_live_session *session = trace->session;
struct lttng_live_metadata *metadata = trace->metadata.get();
- std::vector<char> metadataBuf;
bool keep_receiving;
- bt2c::FileUP fp;
- enum ctf_metadata_decoder_status decoder_status;
enum lttng_live_get_one_metadata_status metadata_status;
BT_CPPLOGD_SPEC(metadata->logger, "Updating metadata for trace: session-id={}, trace-id={}",
keep_receiving = true;
/* Grab all available metadata. */
+ std::vector<uint8_t> metadataBuf;
while (keep_receiving) {
/*
* lttng_live_get_one_metadata_packet() asks the Relay Daemon
return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
- /*
- * Open a new reading file handle on the `metadata_buf` and pass it to
- * the metadata decoder.
- */
- fp.reset(bt_fmemopen(metadataBuf.data(), metadataBuf.size(), "rb"));
- if (!fp) {
- if (errno == EINTR && lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
- session->lttng_live_msg_iter->was_interrupted = true;
- return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
- } else {
- BT_CPPLOGE_ERRNO_APPEND_CAUSE_SPEC(metadata->logger,
- "Cannot memory-open metadata buffer", ".");
- return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- }
- }
-
/*
* The call to ctf_metadata_decoder_append_content() will append
* new metadata to our current trace class.
*/
BT_CPPLOGD_SPEC(metadata->logger, "Appending new metadata to the ctf_trace class");
- decoder_status = ctf_metadata_decoder_append_content(metadata->decoder.get(), fp.get());
- switch (decoder_status) {
- case CTF_METADATA_DECODER_STATUS_OK:
- if (!trace->trace_class) {
- struct ctf_trace_class *tc =
- ctf_metadata_decoder_borrow_ctf_trace_class(metadata->decoder.get());
+ metadata->parseSection(metadataBuf);
+ if (!trace->trace) {
+ const ctf::src::TraceCls *ctfTraceCls = metadata->traceCls();
+ BT_ASSERT(ctfTraceCls);
+ bt2::OptionalBorrowedObject<bt2::TraceClass> irTraceCls = ctfTraceCls->libCls();
- trace->trace_class = ctf_metadata_decoder_get_ir_trace_class(metadata->decoder.get());
- trace->trace = trace->trace_class->instantiate();
- if (!trace->trace) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(metadata->logger, "Failed to create bt_trace");
- return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- }
+ if (irTraceCls) {
+ trace->trace = irTraceCls->instantiate();
- ctf_trace_class_configure_ir_trace(tc, *trace->trace);
+ ctf_trace_class_configure_ir_trace(*ctfTraceCls, *trace->trace,
+ metadata->selfComp().graphMipVersion(),
+ metadata->logger);
- if (!stream_classes_all_have_default_clock_class(trace->trace_class->libObjPtr(),
+ if (!stream_classes_all_have_default_clock_class(trace->trace->cls(),
metadata->logger)) {
/* Error logged in function. */
return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
- trace->clock_class = borrow_any_clock_class(trace->trace_class->libObjPtr());
+
+ trace->clock_class = borrow_any_clock_class(trace->trace->cls());
}
+ }
- /* The metadata was updated successfully. */
- trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NOT_NEEDED;
+ /* The metadata was updated successfully. */
+ trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NOT_NEEDED;
- return LTTNG_LIVE_ITERATOR_STATUS_OK;
- default:
- return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- }
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
int lttng_live_metadata_create_stream(struct lttng_live_session *session, uint64_t ctf_trace_id,
uint64_t stream_id)
{
- struct lttng_live_trace *trace;
+ auto metadata = bt2s::make_unique<lttng_live_metadata>(session->selfComp, session->logger);
- ctf_metadata_decoder_config cfg {session->logger};
- cfg.self_comp = session->self_comp;
- cfg.create_trace_class = true;
-
- auto metadata = bt2s::make_unique<lttng_live_metadata>(session->logger);
metadata->stream_id = stream_id;
- metadata->decoder = ctf_metadata_decoder_create(&cfg);
- if (!metadata->decoder) {
- BT_CPPLOGE_APPEND_CAUSE_SPEC(session->logger, "Failed to create CTF metadata decoder");
- return -1;
- }
+ const auto trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
- trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
if (!trace) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(session->logger, "Failed to borrow trace");
return -1;
bt_common_abort();
}
-static inline enum ctf_msg_iter_medium_status
-viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
+static inline enum lttng_live_get_stream_bytes_status
+viewer_status_to_lttng_live_get_stream_bytes_status(enum lttng_live_viewer_status viewer_status)
{
switch (viewer_status) {
case LTTNG_LIVE_VIEWER_STATUS_OK:
- return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK;
case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
- return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN;
case LTTNG_LIVE_VIEWER_STATUS_ERROR:
- return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
}
bt_common_abort();
}
static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
- uint32_t stream_count,
- bt_self_message_iterator *self_msg_iter)
+ uint32_t stream_count)
{
uint32_t i;
struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
} else {
BT_CPPLOGI_SPEC(viewer_connection->logger, " stream {} : {}/{}", stream_id,
stream.path_name, stream.channel_name);
- live_stream =
- lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter);
+ live_stream = lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id);
if (!live_stream) {
BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error creating stream");
return LTTNG_LIVE_VIEWER_STATUS_ERROR;
return LTTNG_LIVE_VIEWER_STATUS_OK;
}
-enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
- bt_self_message_iterator *self_msg_iter)
+enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session)
{
struct lttng_viewer_cmd cmd;
enum lttng_live_viewer_status status;
}
/* We receive the initial list of streams. */
- status = receive_streams(session, streams_count, self_msg_iter);
+ status = receive_streams(session, streams_count);
switch (status) {
case LTTNG_LIVE_VIEWER_STATUS_OK:
break;
}
enum lttng_live_get_one_metadata_status
-lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<char>& buf)
+lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<uint8_t>& buf)
{
uint64_t len = 0;
enum lttng_live_viewer_status viewer_status;
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_metadata rq;
struct lttng_viewer_metadata_packet rp;
- std::vector<char> data;
struct lttng_live_session *session = trace->session;
struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
struct lttng_live_metadata *metadata = trace->metadata.get();
return LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
}
- BT_DIAG_PUSH
- BT_DIAG_IGNORE_NULL_DEREFERENCE
- data.resize(len);
- BT_DIAG_POP
+ std::vector<char> localBuf(len);
- viewer_status = lttng_live_recv(viewer_connection, data.data(), len);
+ viewer_status = lttng_live_recv(viewer_connection, localBuf.data(), len);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_recv_status(viewer_status, "get metadata packet");
return (lttng_live_get_one_metadata_status) viewer_status;
}
- /*
- * Write the metadata to the file handle.
- */
- buf.insert(buf.end(), data.begin(), data.end());
+ buf.insert(buf.end(), localBuf.begin(), localBuf.end());
return LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
}
}
}
-enum ctf_msg_iter_medium_status
+lttng_live_get_stream_bytes_status
lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream, uint8_t *buf,
uint64_t offset, uint64_t req_len, uint64_t *recv_len)
viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_send_status(viewer_status, "get data packet command");
- return viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
+ return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status);
}
viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_recv_status(viewer_status, "get data packet reply");
- return viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
+ return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status);
}
flags = be32toh(rp.flags);
break;
case LTTNG_VIEWER_GET_PACKET_RETRY:
/* Unimplemented by relay daemon */
- return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN;
case LTTNG_VIEWER_GET_PACKET_ERR:
if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
BT_CPPLOGD_SPEC(viewer_connection->logger,
BT_CPPLOGD_SPEC(viewer_connection->logger,
"Reply with any one flags set means we should retry: response={}",
static_cast<lttng_viewer_get_packet_return_code>(rp_status));
- return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN;
}
BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
"Received get_data_packet response: error");
- return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
case LTTNG_VIEWER_GET_PACKET_EOF:
- return CTF_MSG_ITER_MEDIUM_STATUS_EOF;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF;
default:
BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
"Received get_data_packet response: unknown ({})", rp_status);
- return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
}
if (req_len == 0) {
- return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
}
viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_recv_status(viewer_status, "get data packet");
- return viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
+ return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status);
}
*recv_len = req_len;
- return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+ return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK;
}
/*
* Request new streams for a session.
*/
enum lttng_live_iterator_status
-lttng_live_session_get_new_streams(struct lttng_live_session *session,
- bt_self_message_iterator *self_msg_iter)
+lttng_live_session_get_new_streams(struct lttng_live_session *session)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_new_streams_request rq;
return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
- viewer_status = receive_streams(session, streams_count, self_msg_iter);
+ viewer_status = receive_streams(session, streams_count);
if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
viewer_handle_recv_status(viewer_status, "new streams");
return viewer_status_to_live_iterator_status(viewer_status);
#ifndef BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_VIEWER_CONNECTION_HPP
#define BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_VIEWER_CONNECTION_HPP
+#include <memory>
#include <string>
#include <glib.h>
bt2::Value::Shared
live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection);
+enum lttng_live_get_stream_bytes_status
+{
+ LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK = __BT_FUNC_STATUS_OK,
+ LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN = __BT_FUNC_STATUS_AGAIN,
+ LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR = __BT_FUNC_STATUS_ERROR,
+ LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF = __BT_FUNC_STATUS_END,
+};
+
+lttng_live_get_stream_bytes_status
+lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
+ struct lttng_live_stream_iterator *stream, uint8_t *buf,
+ uint64_t offset, uint64_t req_len, uint64_t *recv_len);
+
#endif /* BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_VIEWER_CONNECTION_HPP */
bt_remove_cr "${expected_stdout}"
bt_remove_cr "${expected_stderr}"
- # Hack. To be removed when src.ctf.lttng-live is updated to use the new
- # IR generator.
- "$BT_TESTS_SED_BIN" -i '/User attributes:/d' "${expected_stdout}"
- "$BT_TESTS_SED_BIN" -i '/babeltrace.org,2020:/d' "${expected_stdout}"
- "$BT_TESTS_SED_BIN" -i '/log-level: warning/d' "${expected_stdout}"
-
run_test "$test_text" "$cli_args_template" "$expected_stdout" \
"$expected_stderr" "$trace_dir_native" "${server_args[@]}"
+
diag "Inverse session order from lttng-relayd"
run_test "$test_text" "$cli_args_template" "$expected_stdout" \
"$expected_stderr" "$trace_dir_native" "${server_args_inverse[@]}"