From 81c7f2422f4c32f664caca3ed2f83be0d7e08ded Mon Sep 17 00:00:00 2001 From: Simon Marchi Date: Fri, 24 May 2024 17:21:56 -0400 Subject: [PATCH] src.ctf.lttng-live: use the new metadata stream parser and message iterator Signed-off-by: Simon Marchi Change-Id: Id2e11205bed54654942077c5336495b7bdd3f38d Reviewed-on: https://review.lttng.org/c/babeltrace/+/8617 Reviewed-by: Philippe Proulx Reviewed-on: https://review.lttng.org/c/babeltrace/+/8648 --- src/cpp-common/bt2/borrowed-object-proxy.hpp | 5 + .../ctf/common/src/item-seq/medium.hpp | 6 - src/plugins/ctf/lttng-live/data-stream.cpp | 237 ++++++++------- src/plugins/ctf/lttng-live/data-stream.hpp | 31 +- src/plugins/ctf/lttng-live/lttng-live.cpp | 285 ++++++++---------- src/plugins/ctf/lttng-live/lttng-live.hpp | 105 ++++--- src/plugins/ctf/lttng-live/metadata.cpp | 111 ++----- .../ctf/lttng-live/viewer-connection.cpp | 63 ++-- .../ctf/lttng-live/viewer-connection.hpp | 14 + tests/plugins/src.ctf.lttng-live/test-live.sh | 7 +- 10 files changed, 423 insertions(+), 441 deletions(-) diff --git a/src/cpp-common/bt2/borrowed-object-proxy.hpp b/src/cpp-common/bt2/borrowed-object-proxy.hpp index 1b20cc89..2dee7049 100644 --- a/src/cpp-common/bt2/borrowed-object-proxy.hpp +++ b/src/cpp-common/bt2/borrowed-object-proxy.hpp @@ -22,6 +22,11 @@ public: { } + ObjT *operator->() noexcept + { + return &_mObj; + } + const ObjT *operator->() const noexcept { return &_mObj; diff --git a/src/plugins/ctf/common/src/item-seq/medium.hpp b/src/plugins/ctf/common/src/item-seq/medium.hpp index e1f1a805..34c987ab 100644 --- a/src/plugins/ctf/common/src/item-seq/medium.hpp +++ b/src/plugins/ctf/common/src/item-seq/medium.hpp @@ -8,9 +8,7 @@ #define BABELTRACE_PLUGINS_CTF_COMMON_SRC_ITEM_SEQ_MEDIUM_HPP #include -#include #include -#include #include "cpp-common/bt2c/data-len.hpp" @@ -42,10 +40,6 @@ public: */ explicit Buf(const std::uint8_t *addr, bt2c::DataLen size) noexcept; - /* Default copy operations */ - Buf(const Buf&) noexcept = default; - Buf& operator=(const Buf&) noexcept = default; - /* * Address of this buffer. * diff --git a/src/plugins/ctf/lttng-live/data-stream.cpp b/src/plugins/ctf/lttng-live/data-stream.cpp index 8f4ae873..2adf305e 100644 --- a/src/plugins/ctf/lttng-live/data-stream.cpp +++ b/src/plugins/ctf/lttng-live/data-stream.cpp @@ -10,117 +10,147 @@ #include #include -#include -#include #include #include "common/assert.h" #include "compat/mman.h" /* IWYU pragma: keep */ +#include "cpp-common/bt2/wrap.hpp" #include "cpp-common/bt2s/make-unique.hpp" #include "cpp-common/vendor/fmt/format.h" -#include "../common/src/msg-iter/msg-iter.hpp" +#include "../common/src/pkt-props.hpp" #include "data-stream.hpp" #define STREAM_NAME_PREFIX "stream-" -static enum ctf_msg_iter_medium_status medop_request_bytes(size_t request_sz, uint8_t **buffer_addr, - size_t *buffer_sz, void *data) +using namespace bt2c::literals::datalen; + +namespace ctf { +namespace src { +namespace live { + +Buf CtfLiveMedium::buf(bt2c::DataLen requestedOffsetInStream, bt2c::DataLen minSize) { - lttng_live_stream_iterator *stream = (lttng_live_stream_iterator *) data; - struct lttng_live_trace *trace = stream->trace; - struct lttng_live_session *session = trace->session; - struct lttng_live_msg_iter *live_msg_iter = session->lttng_live_msg_iter; - uint64_t recv_len = 0; - uint64_t len_left; - uint64_t read_len; - - BT_ASSERT(request_sz); - - if (stream->has_stream_hung_up) { - return CTF_MSG_ITER_MEDIUM_STATUS_EOF; - } + BT_CPPLOGD("CtfLiveMedium::buf called: stream-id={}, offset-bytes={}, min-size-bytes={}", + _mLiveStreamIter.stream ? _mLiveStreamIter.stream->id() : -1, + requestedOffsetInStream.bytes(), minSize.bytes()); - len_left = stream->base_offset + stream->len - stream->offset; - if (!len_left) { - lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA); - return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; - } + if (_mLiveStreamIter.has_stream_hung_up) + throw NoData {}; - read_len = MIN(request_sz, stream->buf.size()); - read_len = MIN(read_len, len_left); + BT_ASSERT(requestedOffsetInStream >= _mCurPktBegOffsetInStream); + auto requestedOffsetInPacket = requestedOffsetInStream - _mCurPktBegOffsetInStream; - const auto status = lttng_live_get_stream_bytes(live_msg_iter, stream, stream->buf.data(), - stream->offset, read_len, &recv_len); + BT_ASSERT(_mLiveStreamIter.curPktInfo); - if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) { - return status; + if (requestedOffsetInPacket == _mLiveStreamIter.curPktInfo->len) { + _mCurPktBegOffsetInStream += _mLiveStreamIter.curPktInfo->len; + _mLiveStreamIter.curPktInfo.reset(); + lttng_live_stream_iterator_set_state(&_mLiveStreamIter, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA); + throw bt2c::TryAgain {}; } - *buffer_addr = stream->buf.data(); - *buffer_sz = recv_len; - stream->offset += recv_len; + auto requestedOffsetInRelay = + _mLiveStreamIter.curPktInfo->offsetInRelay + requestedOffsetInPacket; + auto lenUntilEndOfPacket = _mLiveStreamIter.curPktInfo->len - requestedOffsetInPacket; - return CTF_MSG_ITER_MEDIUM_STATUS_OK; -} + auto maxReqLen = bt2c::DataLen::fromBytes( + _mLiveStreamIter.trace->session->lttng_live_msg_iter->lttng_live_comp->max_query_size); + auto reqLen = std::min(lenUntilEndOfPacket, maxReqLen); + uint64_t recvLen; -static bt_stream *medop_borrow_stream(bt_stream_class *stream_class, int64_t stream_id, void *data) -{ - lttng_live_stream_iterator *lttng_live_stream = (lttng_live_stream_iterator *) data; - - if (!lttng_live_stream->stream) { - uint64_t stream_class_id = bt_stream_class_get_id(stream_class); - - BT_CPPLOGI_SPEC(lttng_live_stream->logger, - "Creating stream {} (ID: {}) out of stream class {}", - lttng_live_stream->name, stream_id, stream_class_id); - - bt_stream *stream; - - if (stream_id < 0) { - /* - * No stream instance ID in the stream. It's possible - * to encounter this situation with older version of - * LTTng. In these cases, use the viewer_stream_id that - * is unique for a live viewer session. - */ - stream = - bt_stream_create_with_id(stream_class, lttng_live_stream->trace->trace->libObjPtr(), - lttng_live_stream->viewer_stream_id); - } else { - stream = bt_stream_create_with_id( - stream_class, lttng_live_stream->trace->trace->libObjPtr(), (uint64_t) stream_id); - } + _mBuf.resize(reqLen.bytes()); - if (!stream) { - BT_CPPLOGE_APPEND_CAUSE_SPEC( - lttng_live_stream->logger, - "Cannot create stream {} (stream class ID {}, stream ID {})", - lttng_live_stream->name, stream_class_id, stream_id); - return nullptr; - } + lttng_live_get_stream_bytes_status status = lttng_live_get_stream_bytes( + _mLiveStreamIter.trace->session->lttng_live_msg_iter, &_mLiveStreamIter, _mBuf.data(), + requestedOffsetInRelay.bytes(), reqLen.bytes(), &recvLen); + switch (status) { + case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK: + _mBuf.resize(recvLen); + break; + + case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN: + BT_CPPLOGD("CtfLiveMedium::buf try again"); + throw bt2c::TryAgain(); - lttng_live_stream->stream = bt2::Stream::Shared::createWithoutRef(stream); + case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF: + BT_CPPLOGD("CtfLiveMedium::buf eof"); + throw NoData(); - lttng_live_stream->stream->name(lttng_live_stream->name); + case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR: + BT_CPPLOGD("CtfLiveMedium::buf error"); + throw bt2c::Error(); } - return lttng_live_stream->stream->libObjPtr(); + const Buf buf {_mBuf.data(), bt2c::DataLen::fromBytes(_mBuf.size())}; + + BT_CPPLOGD("CtfLiveMedium::buf returns: stream-id={}, buf-addr={}, buf-size-bytes={}", + _mLiveStreamIter.stream ? _mLiveStreamIter.stream->id() : -1, fmt::ptr(buf.addr()), + buf.size().bytes()); + + return buf; } -static struct ctf_msg_iter_medium_ops medops = { - medop_request_bytes, - nullptr, - nullptr, - medop_borrow_stream, -}; +} /* namespace live */ +} /* namespace src */ +} /* namespace ctf */ + +lttng_live_iterator_status +lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter) +{ + BT_ASSERT(!liveStreamIter->msg_iter); + BT_ASSERT(!liveStreamIter->stream); + lttng_live_trace *trace = liveStreamIter->trace; + lttng_live_msg_iter *liveMsgIter = trace->session->lttng_live_msg_iter; + + auto tempMedium = bt2s::make_unique(*liveStreamIter); + const ctf::src::TraceCls *ctfTc = liveStreamIter->trace->metadata->traceCls(); + BT_ASSERT(ctfTc); + ctf::src::PktProps pktProps = + ctf::src::readPktProps(*ctfTc, std::move(tempMedium), 0_bytes, liveStreamIter->logger); + + bt2::OptionalBorrowedObject tc = ctfTc->libCls(); + BT_ASSERT(tc); + BT_ASSERT(liveStreamIter->ctf_stream_class_id.is_set); + BT_ASSERT(trace->trace); + + auto sc = tc->streamClassById(liveStreamIter->ctf_stream_class_id.value); + if (!sc) { + BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(liveStreamIter->logger, bt2::Error, + "No stream class with id {}", + liveStreamIter->ctf_stream_class_id.value); + } + + bt_stream *streamPtr; + if (pktProps.dataStreamId) { + streamPtr = bt_stream_create_with_id(sc->libObjPtr(), trace->trace->libObjPtr(), + *pktProps.dataStreamId); + } else { + /* + * No stream instance ID in the stream. It's possible + * to encounter this situation with older version of + * LTTng. In these cases, use the viewer_stream_id that + * is unique for a live viewer session. + */ + streamPtr = bt_stream_create_with_id(sc->libObjPtr(), trace->trace->libObjPtr(), + liveStreamIter->viewer_stream_id); + } + BT_ASSERT(streamPtr); + liveStreamIter->stream = bt2::Stream::Shared::createWithoutRef(streamPtr); + liveStreamIter->stream->name(liveStreamIter->name); + + auto medium = bt2s::make_unique(*liveStreamIter); + liveStreamIter->msg_iter.emplace(bt2::wrap(liveMsgIter->self_msg_iter), *ctfTc, + liveStreamIter->trace->metadata->metadataStreamUuid(), + *liveStreamIter->stream, std::move(medium), + ctf::src::MsgIterQuirks {}, liveStreamIter->logger); + return LTTNG_LIVE_ITERATOR_STATUS_OK; +} enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_session *session, bt_self_message_iterator *self_msg_iter) { - struct lttng_live_component *lttng_live = session->lttng_live_msg_iter->lttng_live_comp; - if (!session->lazy_stream_msg_init) { return LTTNG_LIVE_ITERATOR_STATUS_OK; } @@ -132,26 +162,16 @@ enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_sessi for (lttng_live_trace::UP& trace : session->traces) { for (lttng_live_stream_iterator::UP& stream_iter : trace->stream_iterators) { - struct ctf_trace_class *ctf_tc; - if (stream_iter->msg_iter) { continue; } - ctf_tc = ctf_metadata_decoder_borrow_ctf_trace_class(trace->metadata->decoder.get()); - BT_CPPLOGD_SPEC(stream_iter->logger, + const ctf::src::TraceCls *ctfTraceCls = trace->metadata->traceCls(); + BT_CPPLOGD_SPEC(session->logger, "Creating CTF message iterator: session-id={}, ctf-tc-addr={}, " "stream-iter-name={}, self-msg-iter-addr={}", - session->id, fmt::ptr(ctf_tc), stream_iter->name, + session->id, fmt::ptr(ctfTraceCls), stream_iter->name.c_str(), fmt::ptr(self_msg_iter)); - stream_iter->msg_iter = - ctf_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops, stream_iter.get(), - self_msg_iter, stream_iter->logger); - if (!stream_iter->msg_iter) { - BT_CPPLOGE_APPEND_CAUSE_SPEC(stream_iter->logger, - "Failed to create CTF message iterator"); - return LTTNG_LIVE_ITERATOR_STATUS_ERROR; - } } } @@ -162,7 +182,7 @@ enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_sessi struct lttng_live_stream_iterator * lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id, - uint64_t stream_id, bt_self_message_iterator *self_msg_iter) + uint64_t stream_id) { std::stringstream nameSs; @@ -170,9 +190,7 @@ lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t c BT_ASSERT(session->lttng_live_msg_iter); BT_ASSERT(session->lttng_live_msg_iter->lttng_live_comp); - lttng_live_component *lttng_live = session->lttng_live_msg_iter->lttng_live_comp; - lttng_live_trace *trace = - lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id); + const auto trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id); if (!trace) { BT_CPPLOGE_APPEND_CAUSE_SPEC(session->logger, "Failed to borrow CTF trace."); return nullptr; @@ -190,21 +208,6 @@ lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t c stream_iter->last_inactivity_ts.is_set = false; stream_iter->last_inactivity_ts.value = 0; - if (trace->trace) { - struct ctf_trace_class *ctf_tc = - ctf_metadata_decoder_borrow_ctf_trace_class(trace->metadata->decoder.get()); - BT_ASSERT(!stream_iter->msg_iter); - stream_iter->msg_iter = - ctf_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops, stream_iter.get(), - self_msg_iter, stream_iter->logger); - if (!stream_iter->msg_iter) { - BT_CPPLOGE_APPEND_CAUSE_SPEC(stream_iter->logger, - "Failed to create CTF message iterator"); - return nullptr; - } - } - stream_iter->buf.resize(lttng_live->max_query_size); - nameSs << STREAM_NAME_PREFIX << stream_iter->viewer_stream_id; stream_iter->name = nameSs.str(); @@ -217,6 +220,18 @@ lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t c return ret; } +void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter, + uint64_t ctfStreamClsId) +{ + if (streamIter->ctf_stream_class_id.is_set) { + BT_ASSERT(streamIter->ctf_stream_class_id.value == ctfStreamClsId); + return; + } else { + streamIter->ctf_stream_class_id.value = ctfStreamClsId; + streamIter->ctf_stream_class_id.is_set = true; + } +} + lttng_live_stream_iterator::~lttng_live_stream_iterator() { /* Track the number of active stream iterator. */ diff --git a/src/plugins/ctf/lttng-live/data-stream.hpp b/src/plugins/ctf/lttng-live/data-stream.hpp index 9ad85bba..702454e7 100644 --- a/src/plugins/ctf/lttng-live/data-stream.hpp +++ b/src/plugins/ctf/lttng-live/data-stream.hpp @@ -16,6 +16,35 @@ enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_sessi struct lttng_live_stream_iterator * lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id, - uint64_t stream_id, bt_self_message_iterator *self_msg_iter); + uint64_t stream_id); + +namespace ctf { +namespace src { +namespace live { + +struct CtfLiveMedium : Medium +{ + CtfLiveMedium(lttng_live_stream_iterator& liveStreamIter) : + _mLogger {liveStreamIter.logger, "PLUGIN/SRC.CTF.LTTNG-LIVE/CTF-LIVE-MEDIUM"}, + _mLiveStreamIter(liveStreamIter) + { + } + + Buf buf(bt2c::DataLen offset, bt2c::DataLen minSize) override; + +private: + bt2c::Logger _mLogger; + lttng_live_stream_iterator& _mLiveStreamIter; + + bt2c::DataLen _mCurPktBegOffsetInStream = bt2c::DataLen::fromBits(0); + std::vector _mBuf; +}; + +} /* namespace live */ +} /* namespace src */ +} /* namespace ctf */ + +lttng_live_iterator_status +lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter); #endif /* BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_DATA_STREAM_HPP */ diff --git a/src/plugins/ctf/lttng-live/lttng-live.cpp b/src/plugins/ctf/lttng-live/lttng-live.cpp index 63681810..a1105828 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.cpp +++ b/src/plugins/ctf/lttng-live/lttng-live.cpp @@ -12,6 +12,7 @@ #include #include "common/assert.h" +#include "cpp-common/bt2/wrap.hpp" #include "cpp-common/bt2c/fmt.hpp" #include "cpp-common/bt2c/glib-up.hpp" #include "cpp-common/bt2c/vector.hpp" @@ -113,9 +114,9 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint "session-id={}, hostname=\"{}\", session-name=\"{}\"", session_id, hostname, session_name); - auto session = bt2s::make_unique(lttng_live_msg_iter->logger); + auto session = bt2s::make_unique(lttng_live_msg_iter->logger, + lttng_live_msg_iter->selfComp); - session->self_comp = lttng_live_msg_iter->self_comp; session->id = session_id; session->lttng_live_msg_iter = lttng_live_msg_iter; session->new_streams_needed = true; @@ -247,16 +248,17 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da goto end; } - lttng_live_stream->base_offset = index.offset; - lttng_live_stream->offset = index.offset; - lttng_live_stream->len = index.packet_size / CHAR_BIT; + lttng_live_stream->curPktInfo.emplace(lttng_live_stream_iterator::CurPktInfo { + bt2c::DataLen::fromBytes(index.offset), + bt2c::DataLen::fromBits(index.packet_size), + }); BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Setting live stream reading info: stream-name=\"{}\", " - "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}", + "viewer-stream-id={}, stream-offset-in-relay={}, stream-len-bytes={}", lttng_live_stream->name, lttng_live_stream->viewer_stream_id, - lttng_live_stream->base_offset, lttng_live_stream->offset, - lttng_live_stream->len); + lttng_live_stream->curPktInfo->offsetInRelay.bytes(), + lttng_live_stream->curPktInfo->len.bytes()); end: if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) { @@ -281,8 +283,7 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter, if (!session->attached) { BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}", session->id); - enum lttng_live_viewer_status attach_status = - lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter); + lttng_live_viewer_status attach_status = lttng_live_session_attach(session); if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) { if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { /* @@ -305,7 +306,7 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter, "Updating all data streams: session-id={}, session-name=\"{}\"", session->id, session->session_name); - status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter); + status = lttng_live_session_get_new_streams(session); switch (status) { case LTTNG_LIVE_ITERATOR_STATUS_OK: break; @@ -341,6 +342,7 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter, status = lttng_live_metadata_update(trace.get()); switch (status) { case LTTNG_LIVE_ITERATOR_STATUS_END: + break; case LTTNG_LIVE_ITERATOR_STATUS_OK: break; case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: @@ -470,7 +472,8 @@ emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter, timestamp); const auto msg = bt_message_message_iterator_inactivity_create( - lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp); + lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class->libObjPtr(), + timestamp); if (!msg) { BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, @@ -516,63 +519,53 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quies } static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter, - const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns) + bt2::ConstMessage msg, int64_t last_msg_ts_ns, int64_t *ts_ns) { - const bt_clock_snapshot *clock_snapshot = NULL; - int ret = 0; + bt2::OptionalBorrowedObject clockSnapshot; - BT_ASSERT_DBG(msg); BT_ASSERT_DBG(ts_ns); BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, - "Getting message's timestamp: iter-data-addr={}, msg-addr={}, " - "last-msg-ts={}", - fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns); + "Getting message's timestamp: iter-data-addr={}, msg-addr={}, last-msg-ts={}", + fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg.libObjPtr()), last_msg_ts_ns); - switch (bt_message_get_type(msg)) { - case BT_MESSAGE_TYPE_EVENT: - clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg); + switch (msg.type()) { + case bt2::MessageType::Event: + clockSnapshot = msg.asEvent().defaultClockSnapshot(); break; - case BT_MESSAGE_TYPE_PACKET_BEGINNING: - clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg); + case bt2::MessageType::PacketBeginning: + + clockSnapshot = msg.asPacketBeginning().defaultClockSnapshot(); break; - case BT_MESSAGE_TYPE_PACKET_END: - clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg); + case bt2::MessageType::PacketEnd: + clockSnapshot = msg.asPacketEnd().defaultClockSnapshot(); break; - case BT_MESSAGE_TYPE_DISCARDED_EVENTS: - clock_snapshot = - bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg); + case bt2::MessageType::DiscardedEvents: + clockSnapshot = msg.asDiscardedEvents().beginningDefaultClockSnapshot(); break; - case BT_MESSAGE_TYPE_DISCARDED_PACKETS: - clock_snapshot = - bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg); + case bt2::MessageType::DiscardedPackets: + clockSnapshot = msg.asDiscardedPackets().beginningDefaultClockSnapshot(); break; - case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: - clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg); + case bt2::MessageType::MessageIteratorInactivity: + clockSnapshot = msg.asMessageIteratorInactivity().clockSnapshot(); break; default: /* All the other messages have a higher priority */ BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Message has no timestamp, using the last message timestamp: " "iter-data-addr={}, msg-addr={}, last-msg-ts={}, ts={}", - fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns); + fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg.libObjPtr()), last_msg_ts_ns, + *ts_ns); *ts_ns = last_msg_ts_ns; return 0; } - ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); - if (ret) { - BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, - "Cannot get nanoseconds from Epoch of clock snapshot: " - "clock-snapshot-addr={}", - fmt::ptr(clock_snapshot)); - return -1; - } + *ts_ns = clockSnapshot->nsFromOrigin(); BT_CPPLOGD_SPEC( lttng_live_msg_iter->logger, "Found message's timestamp: iter-data-addr={}, msg-addr={}, last-msg-ts={}, ts={}", - fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns); + fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg.libObjPtr()), last_msg_ts_ns, *ts_ns); return 0; } @@ -581,15 +574,14 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message) { - enum ctf_msg_iter_status status; - for (const auto& session : lttng_live_msg_iter->sessions) { if (session->new_streams_needed) { BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Need an update for streams: session-id={}", session->id); return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; } - for (lttng_live_trace::UP& trace : session->traces) { + + for (const auto& trace : session->traces) { if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) { BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Need an update for metadata stream: session-id={}, trace-id={}", @@ -606,27 +598,28 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - const bt_message *msg; - status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg); - switch (status) { - case CTF_MSG_ITER_STATUS_EOF: - return LTTNG_LIVE_ITERATOR_STATUS_END; - case CTF_MSG_ITER_STATUS_OK: - message = bt2::ConstMessage::Shared::createWithoutRef(msg); - return LTTNG_LIVE_ITERATOR_STATUS_OK; - case CTF_MSG_ITER_STATUS_AGAIN: - /* - * Continue immediately (end of packet). The next - * get_index may return AGAIN to delay the following - * attempt. - */ - return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; - case CTF_MSG_ITER_STATUS_ERROR: - default: - BT_CPPLOGE_APPEND_CAUSE_SPEC( - lttng_live_msg_iter->logger, - "CTF message iterator failed to get next message: msg-iter={}, msg-iter-status={}", - fmt::ptr(lttng_live_stream->msg_iter), status); + if (!lttng_live_stream->msg_iter) { + /* The first time we're called for this stream, the MsgIter is not instantiated. */ + enum lttng_live_iterator_status ret = + lttng_live_stream_iterator_create_msg_iter(lttng_live_stream); + if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) { + return ret; + } + } + + try { + message = lttng_live_stream->msg_iter->next(); + if (message) { + return LTTNG_LIVE_ITERATOR_STATUS_OK; + } else { + return LTTNG_LIVE_ITERATOR_STATUS_END; + } + } catch (const bt2c::TryAgain&) { + return LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + } catch (const bt2::Error&) { + BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, + "CTF message iterator failed to get next message: msg-iter={}", + fmt::ptr(&*lttng_live_stream->msg_iter)); return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } } @@ -646,25 +639,26 @@ lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter * `ctf_msg_iter` should simply realize that it needs to close the * stream properly by emitting the necessary stream end message. */ - const bt_message *msg; - enum ctf_msg_iter_status status = - ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg); + try { + if (!stream_iter->msg_iter) { + BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger, + "Reached the end of the live stream iterator."); + return LTTNG_LIVE_ITERATOR_STATUS_END; + } + + curr_msg = stream_iter->msg_iter->next(); + if (!curr_msg) { + BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger, + "Reached the end of the live stream iterator."); + return LTTNG_LIVE_ITERATOR_STATUS_END; + } - if (status == CTF_MSG_ITER_STATUS_ERROR) { + return LTTNG_LIVE_ITERATOR_STATUS_OK; + } catch (const bt2::Error&) { BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Error getting the next message from CTF message iterator"); return LTTNG_LIVE_ITERATOR_STATUS_ERROR; - } else if (status == CTF_MSG_ITER_STATUS_EOF) { - BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger, - "Reached the end of the live stream iterator."); - return LTTNG_LIVE_ITERATOR_STATUS_END; } - - BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK); - - curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg); - - return LTTNG_LIVE_ITERATOR_STATUS_OK; } /* @@ -802,58 +796,42 @@ static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg) } static enum lttng_live_iterator_status -adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream, - const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out, - uint64_t new_begin_ts) +adjust_discarded_packets_message(bt_self_message_iterator *iter, bt2::Stream stream, + bt2::ConstDiscardedPacketsMessage msgIn, + bt2::ConstMessage::Shared& msgOut, uint64_t new_begin_ts) { - enum bt_property_availability availability; - const bt_clock_snapshot *clock_snapshot; - uint64_t end_ts; - uint64_t count; - - clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in); - end_ts = bt_clock_snapshot_get_value(clock_snapshot); - - availability = bt_message_discarded_packets_get_count(msg_in, &count); - BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); + BT_ASSERT_DBG(msgIn.count()); const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots( - iter, stream, new_begin_ts, end_ts); + iter, stream.libObjPtr(), new_begin_ts, msgIn.endDefaultClockSnapshot().value()); if (!msg) { return LTTNG_LIVE_ITERATOR_STATUS_NOMEM; } - bt_message_discarded_packets_set_count(msg, count); - msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg); + bt_message_discarded_packets_set_count(msg, *msgIn.count()); + msgOut = bt2::Message::Shared::createWithoutRef(msg); + return LTTNG_LIVE_ITERATOR_STATUS_OK; } static enum lttng_live_iterator_status -adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream, - const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out, - uint64_t new_begin_ts) +adjust_discarded_events_message(bt_self_message_iterator *iter, const bt2::Stream stream, + bt2::ConstDiscardedEventsMessage msgIn, + bt2::ConstMessage::Shared& msgOut, uint64_t new_begin_ts) { - enum bt_property_availability availability; - const bt_clock_snapshot *clock_snapshot; - uint64_t end_ts; - uint64_t count; - - clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in); - end_ts = bt_clock_snapshot_get_value(clock_snapshot); - - availability = bt_message_discarded_events_get_count(msg_in, &count); - BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); + BT_ASSERT_DBG(msgIn.count()); const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots( - iter, stream, new_begin_ts, end_ts); + iter, stream.libObjPtr(), new_begin_ts, msgIn.endDefaultClockSnapshot().value()); if (!msg) { return LTTNG_LIVE_ITERATOR_STATUS_NOMEM; } - bt_message_discarded_events_set_count(msg, count); - msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg); + bt_message_discarded_events_set_count(msg, *msgIn.count()); + msgOut = bt2::Message::Shared::createWithoutRef(msg); + return LTTNG_LIVE_ITERATOR_STATUS_OK; } @@ -862,13 +840,6 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns, const bt2::ConstMessage& late_msg) { - const bt_clock_class *clock_class; - const bt_stream_class *stream_class; - enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status; - int64_t last_inactivity_ts_ns; - enum lttng_live_iterator_status adjust_status; - bt2::ConstMessage::Shared adjusted_message; - /* * The timestamp of the current message is before the last message sent * by this component. We CANNOT send it as is. @@ -920,18 +891,11 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - stream_class = bt_stream_borrow_class_const(stream_iter->stream->libObjPtr()); - clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class); - - ts_ns_status = bt_clock_class_cycles_to_ns_from_origin( - clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns); - if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) { - BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, - "Error converting last " - "inactivity message timestamp to nanoseconds"); - return LTTNG_LIVE_ITERATOR_STATUS_ERROR; - } + const auto streamClass = stream_iter->stream->cls(); + const auto clockClass = streamClass.defaultClockClass(); + int64_t last_inactivity_ts_ns = + clockClass->cyclesToNsFromOrigin(stream_iter->last_inactivity_ts.value); if (last_inactivity_ts_ns <= late_msg_ts_ns) { BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger, "Invalid live stream state: " @@ -947,30 +911,36 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter, * adjust its timestamp to ensure monotonicity. */ BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, - "Adjusting the timestamp of late message: late-msg-type={}, " - "msg-new-ts-ns={}", + "Adjusting the timestamp of late message: late-msg-type={}, msg-new-ts-ns={}", late_msg.type(), stream_iter->last_inactivity_ts.value); - switch (late_msg.type()) { - case bt2::MessageType::DiscardedEvents: - adjust_status = adjust_discarded_events_message( - lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), - late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value); - break; - case bt2::MessageType::DiscardedPackets: - adjust_status = adjust_discarded_packets_message( - lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(), - late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value); - break; - default: - bt_common_abort(); - } + + bt2::ConstMessage::Shared adjustedMessage; + + const auto adjust_status = bt2c::call([&] { + switch (late_msg.type()) { + case bt2::MessageType::DiscardedEvents: + return adjust_discarded_events_message(lttng_live_msg_iter->self_msg_iter, + *stream_iter->stream, + late_msg.asDiscardedEvents(), adjustedMessage, + stream_iter->last_inactivity_ts.value); + + case bt2::MessageType::DiscardedPackets: + return adjust_discarded_packets_message(lttng_live_msg_iter->self_msg_iter, + *stream_iter->stream, + late_msg.asDiscardedPackets(), adjustedMessage, + stream_iter->last_inactivity_ts.value); + + default: + bt_common_abort(); + } + }); if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { return adjust_status; } - BT_ASSERT_DBG(adjusted_message); - stream_iter->current_msg = adjusted_message; + BT_ASSERT_DBG(adjustedMessage); + stream_iter->current_msg = std::move(adjustedMessage); stream_iter->current_msg_ts_ns = last_inactivity_ts_ns; return LTTNG_LIVE_ITERATOR_STATUS_OK; @@ -1034,8 +1004,8 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter, * Get the timestamp in nanoseconds from origin of this * message. */ - live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(), - lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns); + live_get_msg_ts_ns(lttng_live_msg_iter, *msg, lttng_live_msg_iter->last_msg_ts_ns, + &curr_msg_ts_ns); /* * Check if the message of the current live stream @@ -1500,9 +1470,9 @@ static lttng_live_msg_iter::UP lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp, bt_self_message_iterator *self_msg_it) { - auto msg_iter = bt2s::make_unique(lttng_live_comp->logger); + auto msg_iter = bt2s::make_unique(lttng_live_comp->logger, + lttng_live_comp->selfComp); - msg_iter->self_comp = lttng_live_comp->self_comp; msg_iter->lttng_live_comp = lttng_live_comp; msg_iter->self_msg_iter = self_msg_it; msg_iter->active_stream_iter = 0; @@ -1783,7 +1753,7 @@ lttng_live_component_create(const bt_value *params, bt_self_component_source *se const bt_value *value; enum bt_param_validation_status validation_status; gchar *validation_error = NULL; - bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"}; + bt2c::Logger logger {bt2::wrap(self_comp), "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"}; validation_status = bt_param_validation_validate(params, params_descr, &validation_error); if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) { @@ -1794,8 +1764,9 @@ lttng_live_component_create(const bt_value *params, bt_self_component_source *se return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR; } - auto lttng_live = bt2s::make_unique(std::move(logger)); - lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp); + auto lttng_live = + bt2s::make_unique(std::move(logger), bt2::wrap(self_comp)); + lttng_live->max_query_size = MAX_QUERY_SIZE; lttng_live->has_msg_iter = false; diff --git a/src/plugins/ctf/lttng-live/lttng-live.hpp b/src/plugins/ctf/lttng-live/lttng-live.hpp index be2689ce..19df6353 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.hpp +++ b/src/plugins/ctf/lttng-live/lttng-live.hpp @@ -19,8 +19,8 @@ #include "cpp-common/bt2/message.hpp" #include "cpp-common/vendor/fmt/format.h" /* IWYU pragma: keep */ -#include "../common/src/metadata/tsdl/decoder.hpp" -#include "../common/src/msg-iter/msg-iter.hpp" +#include "../common/src/metadata/metadata-stream-parser-utils.hpp" +#include "../common/src/msg-iter.hpp" #include "viewer-connection.hpp" /* @@ -113,7 +113,7 @@ struct lttng_live_stream_iterator * Since only a single iterator per viewer connection, we have * only a single message iterator per stream. */ - ctf_msg_iter_up msg_iter; + bt2s::optional msg_iter; uint64_t viewer_stream_id = 0; @@ -123,13 +123,6 @@ struct lttng_live_stream_iterator uint64_t value = 0; } ctf_stream_class_id; - /* base offset in current index. */ - uint64_t base_offset = 0; - /* len to read in current index. */ - uint64_t len = 0; - /* offset in current index. */ - uint64_t offset = 0; - /* * Clock Snapshot value of the last message iterator inactivity message * sent downstream. @@ -154,28 +147,63 @@ struct lttng_live_stream_iterator /* Timestamp in nanoseconds of the current message (current_msg). */ int64_t current_msg_ts_ns = 0; - std::vector buf; - std::string name; bool has_stream_hung_up = false; + + struct CurPktInfo + { + bt2c::DataLen offsetInRelay; + bt2c::DataLen len; + }; + + bt2s::optional curPktInfo; }; struct lttng_live_metadata { using UP = std::unique_ptr; - explicit lttng_live_metadata(const bt2c::Logger& parentLogger) : - logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/METADATA"} + explicit lttng_live_metadata(const bt2::SelfComponent selfComp, + const bt2c::Logger& parentLogger) : + logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/METADATA"}, + _mSelfComp {selfComp} + { } + const ctf::src::TraceCls *traceCls() const + { + return _mMetadataStreamParser->traceCls(); + } + + const bt2s::optional& metadataStreamUuid() const noexcept + { + return _mMetadataStreamParser->metadataStreamUuid(); + } + + void parseSection(const bt2c::ConstBytes data) + { + if (!_mMetadataStreamParser) { + _mMetadataStreamParser = + ctf::src::createMetadataStreamParser(data, _mSelfComp, {}, logger); + } + + _mMetadataStreamParser->parseSection(data); + } + + bt2::SelfComponent selfComp() const noexcept + { + return _mSelfComp; + } + bt2c::Logger logger; uint64_t stream_id = 0; - /* Weak reference. */ - ctf_metadata_decoder_up decoder; +private: + bt2::SelfComponent _mSelfComp; + ctf::src::MetadataStreamParser::UP _mMetadataStreamParser; }; enum lttng_live_metadata_stream_state @@ -219,11 +247,9 @@ struct lttng_live_trace bt2::Trace::Shared trace; - bt2::TraceClass::Shared trace_class; - lttng_live_metadata::UP metadata; - const bt_clock_class *clock_class = nullptr; + bt2::OptionalBorrowedObject clock_class; std::vector stream_iterators; @@ -235,8 +261,10 @@ struct lttng_live_session { using UP = std::unique_ptr; - explicit lttng_live_session(const bt2c::Logger& parentLogger) : - logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/SESSION"} + explicit lttng_live_session(const bt2c::Logger& parentLogger, + const bt2::SelfComponent selfCompParam) : + logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/SESSION"}, + selfComp {selfCompParam} { } @@ -244,7 +272,7 @@ struct lttng_live_session bt2c::Logger logger; - bt_self_component *self_comp = nullptr; + bt2::SelfComponent selfComp; /* Weak reference. */ struct lttng_live_msg_iter *lttng_live_msg_iter = nullptr; @@ -277,15 +305,16 @@ struct lttng_live_component { using UP = std::unique_ptr; - explicit lttng_live_component(bt2c::Logger loggerParam) noexcept : - logger {std::move(loggerParam)} + explicit lttng_live_component(bt2c::Logger loggerParam, + const bt2::SelfComponent selfCompParam) noexcept : + logger {std::move(loggerParam)}, + selfComp {selfCompParam} { } bt2c::Logger logger; - /* Weak reference. */ - bt_self_component *self_comp = nullptr; + bt2::SelfComponent selfComp; struct { @@ -306,8 +335,10 @@ struct lttng_live_msg_iter { using UP = std::unique_ptr; - explicit lttng_live_msg_iter(const bt2c::Logger& parentLogger) : - logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/MSG-ITER"} + explicit lttng_live_msg_iter(const bt2c::Logger& parentLogger, + const bt2::SelfComponent selfCompParam) : + logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/MSG-ITER"}, + selfComp {selfCompParam} { } @@ -315,7 +346,7 @@ struct lttng_live_msg_iter bt2c::Logger logger; - bt_self_component *self_comp = nullptr; + bt2::SelfComponent selfComp; /* Weak reference. */ struct lttng_live_component *lttng_live_comp = nullptr; @@ -411,14 +442,12 @@ lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it, void lttng_live_msg_iter_finalize(bt_self_message_iterator *it); -enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session, - bt_self_message_iterator *self_msg_iter); +enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session); enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session); enum lttng_live_iterator_status -lttng_live_session_get_new_streams(struct lttng_live_session *session, - bt_self_message_iterator *self_msg_iter); +lttng_live_session_get_new_streams(struct lttng_live_session *session); struct lttng_live_trace * lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session, @@ -435,20 +464,18 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint * written to the file. */ enum lttng_live_get_one_metadata_status -lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector& buf); +lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector& buf); enum lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream, struct packet_index *index); -enum ctf_msg_iter_medium_status -lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter, - struct lttng_live_stream_iterator *stream, uint8_t *buf, - uint64_t offset, uint64_t req_len, uint64_t *recv_len); - bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter); void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter, enum lttng_live_stream_state new_state); +void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter, + uint64_t ctfStreamClsId); + #endif /* BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_LTTNG_LIVE_HPP */ diff --git a/src/plugins/ctf/lttng-live/metadata.cpp b/src/plugins/ctf/lttng-live/metadata.cpp index 693042d7..660843fb 100644 --- a/src/plugins/ctf/lttng-live/metadata.cpp +++ b/src/plugins/ctf/lttng-live/metadata.cpp @@ -6,12 +6,9 @@ * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation */ -#include "compat/memstream.h" -#include "cpp-common/bt2c/libc-up.hpp" #include "cpp-common/bt2s/make-unique.hpp" #include "../common/src/metadata/tsdl/ctf-meta-configure-ir-trace.hpp" -#include "../common/src/metadata/tsdl/decoder.hpp" #include "lttng-live.hpp" #include "metadata.hpp" @@ -31,25 +28,18 @@ struct packet_header uint8_t minor; } __attribute__((__packed__)); -static bool stream_classes_all_have_default_clock_class(bt_trace_class *tc, +static bool stream_classes_all_have_default_clock_class(bt2::ConstTraceClass tc, const bt2c::Logger& logger) { - uint64_t i, sc_count; - const bt_clock_class *cc = NULL; - const bt_stream_class *sc; + for (std::uint64_t i = 0; i < tc.length(); ++i) { + auto sc = tc[i]; + auto cc = sc.defaultClockClass(); - sc_count = bt_trace_class_get_stream_class_count(tc); - for (i = 0; i < sc_count; i++) { - sc = bt_trace_class_borrow_stream_class_by_index_const(tc, i); - - BT_ASSERT(sc); - - cc = bt_stream_class_borrow_default_clock_class_const(sc); if (!cc) { BT_CPPLOGE_APPEND_CAUSE_SPEC(logger, "Stream class doesn't have a default clock class: " "sc-id={}, sc-name=\"{}\"", - bt_stream_class_get_id(sc), bt_stream_class_get_name(sc)); + sc.id(), sc.name()); return false; } } @@ -61,34 +51,16 @@ static bool stream_classes_all_have_default_clock_class(bt_trace_class *tc, * encountered. This is useful to create message iterator inactivity message as * we don't need a particular clock class. */ -static const bt_clock_class *borrow_any_clock_class(bt_trace_class *tc) +static bt2::ConstClockClass borrow_any_clock_class(bt2::ConstTraceClass tc) { - uint64_t i, sc_count; - const bt_clock_class *cc = NULL; - const bt_stream_class *sc; - - sc_count = bt_trace_class_get_stream_class_count(tc); - for (i = 0; i < sc_count; i++) { - sc = bt_trace_class_borrow_stream_class_by_index_const(tc, i); - BT_ASSERT_DBG(sc); - - cc = bt_stream_class_borrow_default_clock_class_const(sc); - if (cc) { - return cc; - } - } - - bt_common_abort(); + return *tc[0].defaultClockClass(); } enum lttng_live_iterator_status lttng_live_metadata_update(struct lttng_live_trace *trace) { struct lttng_live_session *session = trace->session; struct lttng_live_metadata *metadata = trace->metadata.get(); - std::vector metadataBuf; bool keep_receiving; - bt2c::FileUP fp; - enum ctf_metadata_decoder_status decoder_status; enum lttng_live_get_one_metadata_status metadata_status; BT_CPPLOGD_SPEC(metadata->logger, "Updating metadata for trace: session-id={}, trace-id={}", @@ -117,6 +89,7 @@ enum lttng_live_iterator_status lttng_live_metadata_update(struct lttng_live_tra keep_receiving = true; /* Grab all available metadata. */ + std::vector metadataBuf; while (keep_receiving) { /* * lttng_live_get_one_metadata_packet() asks the Relay Daemon @@ -173,79 +146,49 @@ enum lttng_live_iterator_status lttng_live_metadata_update(struct lttng_live_tra return LTTNG_LIVE_ITERATOR_STATUS_OK; } - /* - * Open a new reading file handle on the `metadata_buf` and pass it to - * the metadata decoder. - */ - fp.reset(bt_fmemopen(metadataBuf.data(), metadataBuf.size(), "rb")); - if (!fp) { - if (errno == EINTR && lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) { - session->lttng_live_msg_iter->was_interrupted = true; - return LTTNG_LIVE_ITERATOR_STATUS_AGAIN; - } else { - BT_CPPLOGE_ERRNO_APPEND_CAUSE_SPEC(metadata->logger, - "Cannot memory-open metadata buffer", "."); - return LTTNG_LIVE_ITERATOR_STATUS_ERROR; - } - } - /* * The call to ctf_metadata_decoder_append_content() will append * new metadata to our current trace class. */ BT_CPPLOGD_SPEC(metadata->logger, "Appending new metadata to the ctf_trace class"); - decoder_status = ctf_metadata_decoder_append_content(metadata->decoder.get(), fp.get()); - switch (decoder_status) { - case CTF_METADATA_DECODER_STATUS_OK: - if (!trace->trace_class) { - struct ctf_trace_class *tc = - ctf_metadata_decoder_borrow_ctf_trace_class(metadata->decoder.get()); + metadata->parseSection(metadataBuf); + if (!trace->trace) { + const ctf::src::TraceCls *ctfTraceCls = metadata->traceCls(); + BT_ASSERT(ctfTraceCls); + bt2::OptionalBorrowedObject irTraceCls = ctfTraceCls->libCls(); - trace->trace_class = ctf_metadata_decoder_get_ir_trace_class(metadata->decoder.get()); - trace->trace = trace->trace_class->instantiate(); - if (!trace->trace) { - BT_CPPLOGE_APPEND_CAUSE_SPEC(metadata->logger, "Failed to create bt_trace"); - return LTTNG_LIVE_ITERATOR_STATUS_ERROR; - } + if (irTraceCls) { + trace->trace = irTraceCls->instantiate(); - ctf_trace_class_configure_ir_trace(tc, *trace->trace); + ctf_trace_class_configure_ir_trace(*ctfTraceCls, *trace->trace, + metadata->selfComp().graphMipVersion(), + metadata->logger); - if (!stream_classes_all_have_default_clock_class(trace->trace_class->libObjPtr(), + if (!stream_classes_all_have_default_clock_class(trace->trace->cls(), metadata->logger)) { /* Error logged in function. */ return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - trace->clock_class = borrow_any_clock_class(trace->trace_class->libObjPtr()); + + trace->clock_class = borrow_any_clock_class(trace->trace->cls()); } + } - /* The metadata was updated successfully. */ - trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NOT_NEEDED; + /* The metadata was updated successfully. */ + trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NOT_NEEDED; - return LTTNG_LIVE_ITERATOR_STATUS_OK; - default: - return LTTNG_LIVE_ITERATOR_STATUS_ERROR; - } + return LTTNG_LIVE_ITERATOR_STATUS_OK; } int lttng_live_metadata_create_stream(struct lttng_live_session *session, uint64_t ctf_trace_id, uint64_t stream_id) { - struct lttng_live_trace *trace; + auto metadata = bt2s::make_unique(session->selfComp, session->logger); - ctf_metadata_decoder_config cfg {session->logger}; - cfg.self_comp = session->self_comp; - cfg.create_trace_class = true; - - auto metadata = bt2s::make_unique(session->logger); metadata->stream_id = stream_id; - metadata->decoder = ctf_metadata_decoder_create(&cfg); - if (!metadata->decoder) { - BT_CPPLOGE_APPEND_CAUSE_SPEC(session->logger, "Failed to create CTF metadata decoder"); - return -1; - } + const auto trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id); - trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id); if (!trace) { BT_CPPLOGE_APPEND_CAUSE_SPEC(session->logger, "Failed to borrow trace"); return -1; diff --git a/src/plugins/ctf/lttng-live/viewer-connection.cpp b/src/plugins/ctf/lttng-live/viewer-connection.cpp index b639b6ed..cf69f1bb 100644 --- a/src/plugins/ctf/lttng-live/viewer-connection.cpp +++ b/src/plugins/ctf/lttng-live/viewer-connection.cpp @@ -62,16 +62,16 @@ viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_statu bt_common_abort(); } -static inline enum ctf_msg_iter_medium_status -viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status) +static inline enum lttng_live_get_stream_bytes_status +viewer_status_to_lttng_live_get_stream_bytes_status(enum lttng_live_viewer_status viewer_status) { switch (viewer_status) { case LTTNG_LIVE_VIEWER_STATUS_OK: - return CTF_MSG_ITER_MEDIUM_STATUS_OK; + return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK; case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: - return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; + return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN; case LTTNG_LIVE_VIEWER_STATUS_ERROR: - return CTF_MSG_ITER_MEDIUM_STATUS_ERROR; + return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR; } bt_common_abort(); @@ -730,8 +730,7 @@ lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter } static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session, - uint32_t stream_count, - bt_self_message_iterator *self_msg_iter) + uint32_t stream_count) { uint32_t i; struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; @@ -767,8 +766,7 @@ static enum lttng_live_viewer_status receive_streams(struct lttng_live_session * } else { BT_CPPLOGI_SPEC(viewer_connection->logger, " stream {} : {}/{}", stream_id, stream.path_name, stream.channel_name); - live_stream = - lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter); + live_stream = lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id); if (!live_stream) { BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error creating stream"); return LTTNG_LIVE_VIEWER_STATUS_ERROR; @@ -779,8 +777,7 @@ static enum lttng_live_viewer_status receive_streams(struct lttng_live_session * return LTTNG_LIVE_VIEWER_STATUS_OK; } -enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session, - bt_self_message_iterator *self_msg_iter) +enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session) { struct lttng_viewer_cmd cmd; enum lttng_live_viewer_status status; @@ -851,7 +848,7 @@ enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_sessio } /* We receive the initial list of streams. */ - status = receive_streams(session, streams_count, self_msg_iter); + status = receive_streams(session, streams_count); switch (status) { case LTTNG_LIVE_VIEWER_STATUS_OK: break; @@ -941,14 +938,13 @@ enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_sessio } enum lttng_live_get_one_metadata_status -lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector& buf) +lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector& buf) { uint64_t len = 0; enum lttng_live_viewer_status viewer_status; struct lttng_viewer_cmd cmd; struct lttng_viewer_get_metadata rq; struct lttng_viewer_metadata_packet rp; - std::vector data; struct lttng_live_session *session = trace->session; struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; struct lttng_live_metadata *metadata = trace->metadata.get(); @@ -1025,21 +1021,15 @@ lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector localBuf(len); - viewer_status = lttng_live_recv(viewer_connection, data.data(), len); + viewer_status = lttng_live_recv(viewer_connection, localBuf.data(), len); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { viewer_handle_recv_status(viewer_status, "get metadata packet"); return (lttng_live_get_one_metadata_status) viewer_status; } - /* - * Write the metadata to the file handle. - */ - buf.insert(buf.end(), data.begin(), data.end()); + buf.insert(buf.end(), localBuf.begin(), localBuf.end()); return LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK; } @@ -1197,7 +1187,7 @@ lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter, } } -enum ctf_msg_iter_medium_status +lttng_live_get_stream_bytes_status lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset, uint64_t req_len, uint64_t *recv_len) @@ -1237,13 +1227,13 @@ lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter, viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { viewer_handle_send_status(viewer_status, "get data packet command"); - return viewer_status_to_ctf_msg_iter_medium_status(viewer_status); + return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status); } viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { viewer_handle_recv_status(viewer_status, "get data packet reply"); - return viewer_status_to_ctf_msg_iter_medium_status(viewer_status); + return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status); } flags = be32toh(rp.flags); @@ -1261,7 +1251,7 @@ lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter, break; case LTTNG_VIEWER_GET_PACKET_RETRY: /* Unimplemented by relay daemon */ - return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; + return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN; case LTTNG_VIEWER_GET_PACKET_ERR: if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { BT_CPPLOGD_SPEC(viewer_connection->logger, @@ -1281,39 +1271,38 @@ lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter, BT_CPPLOGD_SPEC(viewer_connection->logger, "Reply with any one flags set means we should retry: response={}", static_cast(rp_status)); - return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; + return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN; } BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Received get_data_packet response: error"); - return CTF_MSG_ITER_MEDIUM_STATUS_ERROR; + return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR; case LTTNG_VIEWER_GET_PACKET_EOF: - return CTF_MSG_ITER_MEDIUM_STATUS_EOF; + return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF; default: BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Received get_data_packet response: unknown ({})", rp_status); - return CTF_MSG_ITER_MEDIUM_STATUS_ERROR; + return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR; } if (req_len == 0) { - return CTF_MSG_ITER_MEDIUM_STATUS_ERROR; + return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR; } viewer_status = lttng_live_recv(viewer_connection, buf, req_len); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { viewer_handle_recv_status(viewer_status, "get data packet"); - return viewer_status_to_ctf_msg_iter_medium_status(viewer_status); + return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status); } *recv_len = req_len; - return CTF_MSG_ITER_MEDIUM_STATUS_OK; + return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK; } /* * Request new streams for a session. */ enum lttng_live_iterator_status -lttng_live_session_get_new_streams(struct lttng_live_session *session, - bt_self_message_iterator *self_msg_iter) +lttng_live_session_get_new_streams(struct lttng_live_session *session) { struct lttng_viewer_cmd cmd; struct lttng_viewer_new_streams_request rq; @@ -1383,7 +1372,7 @@ lttng_live_session_get_new_streams(struct lttng_live_session *session, return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } - viewer_status = receive_streams(session, streams_count, self_msg_iter); + viewer_status = receive_streams(session, streams_count); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { viewer_handle_recv_status(viewer_status, "new streams"); return viewer_status_to_live_iterator_status(viewer_status); diff --git a/src/plugins/ctf/lttng-live/viewer-connection.hpp b/src/plugins/ctf/lttng-live/viewer-connection.hpp index 9610a15e..ebbb7467 100644 --- a/src/plugins/ctf/lttng-live/viewer-connection.hpp +++ b/src/plugins/ctf/lttng-live/viewer-connection.hpp @@ -7,6 +7,7 @@ #ifndef BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_VIEWER_CONNECTION_HPP #define BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_VIEWER_CONNECTION_HPP +#include #include #include @@ -111,4 +112,17 @@ lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter bt2::Value::Shared live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection); +enum lttng_live_get_stream_bytes_status +{ + LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK = __BT_FUNC_STATUS_OK, + LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN = __BT_FUNC_STATUS_AGAIN, + LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR = __BT_FUNC_STATUS_ERROR, + LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF = __BT_FUNC_STATUS_END, +}; + +lttng_live_get_stream_bytes_status +lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_stream_iterator *stream, uint8_t *buf, + uint64_t offset, uint64_t req_len, uint64_t *recv_len); + #endif /* BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_VIEWER_CONNECTION_HPP */ diff --git a/tests/plugins/src.ctf.lttng-live/test-live.sh b/tests/plugins/src.ctf.lttng-live/test-live.sh index e77f2711..274302fb 100755 --- a/tests/plugins/src.ctf.lttng-live/test-live.sh +++ b/tests/plugins/src.ctf.lttng-live/test-live.sh @@ -307,14 +307,9 @@ test_compare_to_ctf_fs() { bt_remove_cr "${expected_stdout}" bt_remove_cr "${expected_stderr}" - # Hack. To be removed when src.ctf.lttng-live is updated to use the new - # IR generator. - "$BT_TESTS_SED_BIN" -i '/User attributes:/d' "${expected_stdout}" - "$BT_TESTS_SED_BIN" -i '/babeltrace.org,2020:/d' "${expected_stdout}" - "$BT_TESTS_SED_BIN" -i '/log-level: warning/d' "${expected_stdout}" - run_test "$test_text" "$cli_args_template" "$expected_stdout" \ "$expected_stderr" "$trace_dir_native" "${server_args[@]}" + diag "Inverse session order from lttng-relayd" run_test "$test_text" "$cli_args_template" "$expected_stdout" \ "$expected_stderr" "$trace_dir_native" "${server_args_inverse[@]}" -- 2.34.1