src.ctf.lttng-live: use the new metadata stream parser and message iterator
authorSimon Marchi <simon.marchi@efficios.com>
Fri, 24 May 2024 21:21:56 +0000 (17:21 -0400)
committerSimon Marchi <simon.marchi@efficios.com>
Wed, 4 Sep 2024 19:05:14 +0000 (15:05 -0400)
Signed-off-by: Simon Marchi <simon.marchi@efficios.com>
Change-Id: Id2e11205bed54654942077c5336495b7bdd3f38d
Reviewed-on: https://review.lttng.org/c/babeltrace/+/8617
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
Reviewed-on: https://review.lttng.org/c/babeltrace/+/8648

src/cpp-common/bt2/borrowed-object-proxy.hpp
src/plugins/ctf/common/src/item-seq/medium.hpp
src/plugins/ctf/lttng-live/data-stream.cpp
src/plugins/ctf/lttng-live/data-stream.hpp
src/plugins/ctf/lttng-live/lttng-live.cpp
src/plugins/ctf/lttng-live/lttng-live.hpp
src/plugins/ctf/lttng-live/metadata.cpp
src/plugins/ctf/lttng-live/viewer-connection.cpp
src/plugins/ctf/lttng-live/viewer-connection.hpp
tests/plugins/src.ctf.lttng-live/test-live.sh

index 1b20cc8995f3a4c8f7c6e81c6e33c2640f88104c..2dee7049e8f78ff6300357a43a04c56adc909c11 100644 (file)
@@ -22,6 +22,11 @@ public:
     {
     }
 
+    ObjT *operator->() noexcept
+    {
+        return &_mObj;
+    }
+
     const ObjT *operator->() const noexcept
     {
         return &_mObj;
index e1f1a8052cf99bfca7ccbdf5ec4d5e85121a3133..34c987abbd0f338da970277ec60e7364befb3c3c 100644 (file)
@@ -8,9 +8,7 @@
 #define BABELTRACE_PLUGINS_CTF_COMMON_SRC_ITEM_SEQ_MEDIUM_HPP
 
 #include <cstdint>
-#include <cstdlib>
 #include <memory>
-#include <stdexcept>
 
 #include "cpp-common/bt2c/data-len.hpp"
 
@@ -42,10 +40,6 @@ public:
      */
     explicit Buf(const std::uint8_t *addr, bt2c::DataLen size) noexcept;
 
-    /* Default copy operations */
-    Buf(const Buf&) noexcept = default;
-    Buf& operator=(const Buf&) noexcept = default;
-
     /*
      * Address of this buffer.
      *
index 8f4ae873d234d7415b8c737c081d88209898dc5b..2adf305e4367ebe686265ea391db999c9eb99707 100644 (file)
 #include <sstream>
 
 #include <glib.h>
-#include <stdio.h>
-#include <stdlib.h>
 
 #include <babeltrace2/babeltrace.h>
 
 #include "common/assert.h"
 #include "compat/mman.h" /* IWYU pragma: keep  */
+#include "cpp-common/bt2/wrap.hpp"
 #include "cpp-common/bt2s/make-unique.hpp"
 #include "cpp-common/vendor/fmt/format.h"
 
-#include "../common/src/msg-iter/msg-iter.hpp"
+#include "../common/src/pkt-props.hpp"
 #include "data-stream.hpp"
 
 #define STREAM_NAME_PREFIX "stream-"
 
-static enum ctf_msg_iter_medium_status medop_request_bytes(size_t request_sz, uint8_t **buffer_addr,
-                                                           size_t *buffer_sz, void *data)
+using namespace bt2c::literals::datalen;
+
+namespace ctf {
+namespace src {
+namespace live {
+
+Buf CtfLiveMedium::buf(bt2c::DataLen requestedOffsetInStream, bt2c::DataLen minSize)
 {
-    lttng_live_stream_iterator *stream = (lttng_live_stream_iterator *) data;
-    struct lttng_live_trace *trace = stream->trace;
-    struct lttng_live_session *session = trace->session;
-    struct lttng_live_msg_iter *live_msg_iter = session->lttng_live_msg_iter;
-    uint64_t recv_len = 0;
-    uint64_t len_left;
-    uint64_t read_len;
-
-    BT_ASSERT(request_sz);
-
-    if (stream->has_stream_hung_up) {
-        return CTF_MSG_ITER_MEDIUM_STATUS_EOF;
-    }
+    BT_CPPLOGD("CtfLiveMedium::buf called: stream-id={}, offset-bytes={}, min-size-bytes={}",
+               _mLiveStreamIter.stream ? _mLiveStreamIter.stream->id() : -1,
+               requestedOffsetInStream.bytes(), minSize.bytes());
 
-    len_left = stream->base_offset + stream->len - stream->offset;
-    if (!len_left) {
-        lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
-        return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
-    }
+    if (_mLiveStreamIter.has_stream_hung_up)
+        throw NoData {};
 
-    read_len = MIN(request_sz, stream->buf.size());
-    read_len = MIN(read_len, len_left);
+    BT_ASSERT(requestedOffsetInStream >= _mCurPktBegOffsetInStream);
+    auto requestedOffsetInPacket = requestedOffsetInStream - _mCurPktBegOffsetInStream;
 
-    const auto status = lttng_live_get_stream_bytes(live_msg_iter, stream, stream->buf.data(),
-                                                    stream->offset, read_len, &recv_len);
+    BT_ASSERT(_mLiveStreamIter.curPktInfo);
 
-    if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
-        return status;
+    if (requestedOffsetInPacket == _mLiveStreamIter.curPktInfo->len) {
+        _mCurPktBegOffsetInStream += _mLiveStreamIter.curPktInfo->len;
+        _mLiveStreamIter.curPktInfo.reset();
+        lttng_live_stream_iterator_set_state(&_mLiveStreamIter, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
+        throw bt2c::TryAgain {};
     }
 
-    *buffer_addr = stream->buf.data();
-    *buffer_sz = recv_len;
-    stream->offset += recv_len;
+    auto requestedOffsetInRelay =
+        _mLiveStreamIter.curPktInfo->offsetInRelay + requestedOffsetInPacket;
+    auto lenUntilEndOfPacket = _mLiveStreamIter.curPktInfo->len - requestedOffsetInPacket;
 
-    return CTF_MSG_ITER_MEDIUM_STATUS_OK;
-}
+    auto maxReqLen = bt2c::DataLen::fromBytes(
+        _mLiveStreamIter.trace->session->lttng_live_msg_iter->lttng_live_comp->max_query_size);
+    auto reqLen = std::min(lenUntilEndOfPacket, maxReqLen);
+    uint64_t recvLen;
 
-static bt_stream *medop_borrow_stream(bt_stream_class *stream_class, int64_t stream_id, void *data)
-{
-    lttng_live_stream_iterator *lttng_live_stream = (lttng_live_stream_iterator *) data;
-
-    if (!lttng_live_stream->stream) {
-        uint64_t stream_class_id = bt_stream_class_get_id(stream_class);
-
-        BT_CPPLOGI_SPEC(lttng_live_stream->logger,
-                        "Creating stream {} (ID: {}) out of stream class {}",
-                        lttng_live_stream->name, stream_id, stream_class_id);
-
-        bt_stream *stream;
-
-        if (stream_id < 0) {
-            /*
-             * No stream instance ID in the stream. It's possible
-             * to encounter this situation with older version of
-             * LTTng. In these cases, use the viewer_stream_id that
-             * is unique for a live viewer session.
-             */
-            stream =
-                bt_stream_create_with_id(stream_class, lttng_live_stream->trace->trace->libObjPtr(),
-                                         lttng_live_stream->viewer_stream_id);
-        } else {
-            stream = bt_stream_create_with_id(
-                stream_class, lttng_live_stream->trace->trace->libObjPtr(), (uint64_t) stream_id);
-        }
+    _mBuf.resize(reqLen.bytes());
 
-        if (!stream) {
-            BT_CPPLOGE_APPEND_CAUSE_SPEC(
-                lttng_live_stream->logger,
-                "Cannot create stream {} (stream class ID {}, stream ID {})",
-                lttng_live_stream->name, stream_class_id, stream_id);
-            return nullptr;
-        }
+    lttng_live_get_stream_bytes_status status = lttng_live_get_stream_bytes(
+        _mLiveStreamIter.trace->session->lttng_live_msg_iter, &_mLiveStreamIter, _mBuf.data(),
+        requestedOffsetInRelay.bytes(), reqLen.bytes(), &recvLen);
+    switch (status) {
+    case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK:
+        _mBuf.resize(recvLen);
+        break;
+
+    case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN:
+        BT_CPPLOGD("CtfLiveMedium::buf try again");
+        throw bt2c::TryAgain();
 
-        lttng_live_stream->stream = bt2::Stream::Shared::createWithoutRef(stream);
+    case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF:
+        BT_CPPLOGD("CtfLiveMedium::buf eof");
+        throw NoData();
 
-        lttng_live_stream->stream->name(lttng_live_stream->name);
+    case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR:
+        BT_CPPLOGD("CtfLiveMedium::buf error");
+        throw bt2c::Error();
     }
 
-    return lttng_live_stream->stream->libObjPtr();
+    const Buf buf {_mBuf.data(), bt2c::DataLen::fromBytes(_mBuf.size())};
+
+    BT_CPPLOGD("CtfLiveMedium::buf returns: stream-id={}, buf-addr={}, buf-size-bytes={}",
+               _mLiveStreamIter.stream ? _mLiveStreamIter.stream->id() : -1, fmt::ptr(buf.addr()),
+               buf.size().bytes());
+
+    return buf;
 }
 
-static struct ctf_msg_iter_medium_ops medops = {
-    medop_request_bytes,
-    nullptr,
-    nullptr,
-    medop_borrow_stream,
-};
+} /* namespace live */
+} /* namespace src */
+} /* namespace ctf */
+
+lttng_live_iterator_status
+lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter)
+{
+    BT_ASSERT(!liveStreamIter->msg_iter);
+    BT_ASSERT(!liveStreamIter->stream);
+    lttng_live_trace *trace = liveStreamIter->trace;
+    lttng_live_msg_iter *liveMsgIter = trace->session->lttng_live_msg_iter;
+
+    auto tempMedium = bt2s::make_unique<ctf::src::live::CtfLiveMedium>(*liveStreamIter);
+    const ctf::src::TraceCls *ctfTc = liveStreamIter->trace->metadata->traceCls();
+    BT_ASSERT(ctfTc);
+    ctf::src::PktProps pktProps =
+        ctf::src::readPktProps(*ctfTc, std::move(tempMedium), 0_bytes, liveStreamIter->logger);
+
+    bt2::OptionalBorrowedObject<bt2::TraceClass> tc = ctfTc->libCls();
+    BT_ASSERT(tc);
+    BT_ASSERT(liveStreamIter->ctf_stream_class_id.is_set);
+    BT_ASSERT(trace->trace);
+
+    auto sc = tc->streamClassById(liveStreamIter->ctf_stream_class_id.value);
+    if (!sc) {
+        BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(liveStreamIter->logger, bt2::Error,
+                                               "No stream class with id {}",
+                                               liveStreamIter->ctf_stream_class_id.value);
+    }
+
+    bt_stream *streamPtr;
+    if (pktProps.dataStreamId) {
+        streamPtr = bt_stream_create_with_id(sc->libObjPtr(), trace->trace->libObjPtr(),
+                                             *pktProps.dataStreamId);
+    } else {
+        /*
+         * No stream instance ID in the stream. It's possible
+         * to encounter this situation with older version of
+         * LTTng. In these cases, use the viewer_stream_id that
+         * is unique for a live viewer session.
+         */
+        streamPtr = bt_stream_create_with_id(sc->libObjPtr(), trace->trace->libObjPtr(),
+                                             liveStreamIter->viewer_stream_id);
+    }
+    BT_ASSERT(streamPtr);
+    liveStreamIter->stream = bt2::Stream::Shared::createWithoutRef(streamPtr);
+    liveStreamIter->stream->name(liveStreamIter->name);
+
+    auto medium = bt2s::make_unique<ctf::src::live::CtfLiveMedium>(*liveStreamIter);
+    liveStreamIter->msg_iter.emplace(bt2::wrap(liveMsgIter->self_msg_iter), *ctfTc,
+                                     liveStreamIter->trace->metadata->metadataStreamUuid(),
+                                     *liveStreamIter->stream, std::move(medium),
+                                     ctf::src::MsgIterQuirks {}, liveStreamIter->logger);
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
+}
 
 enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_session *session,
                                                          bt_self_message_iterator *self_msg_iter)
 {
-    struct lttng_live_component *lttng_live = session->lttng_live_msg_iter->lttng_live_comp;
-
     if (!session->lazy_stream_msg_init) {
         return LTTNG_LIVE_ITERATOR_STATUS_OK;
     }
@@ -132,26 +162,16 @@ enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_sessi
 
     for (lttng_live_trace::UP& trace : session->traces) {
         for (lttng_live_stream_iterator::UP& stream_iter : trace->stream_iterators) {
-            struct ctf_trace_class *ctf_tc;
-
             if (stream_iter->msg_iter) {
                 continue;
             }
 
-            ctf_tc = ctf_metadata_decoder_borrow_ctf_trace_class(trace->metadata->decoder.get());
-            BT_CPPLOGD_SPEC(stream_iter->logger,
+            const ctf::src::TraceCls *ctfTraceCls = trace->metadata->traceCls();
+            BT_CPPLOGD_SPEC(session->logger,
                             "Creating CTF message iterator: session-id={}, ctf-tc-addr={}, "
                             "stream-iter-name={}, self-msg-iter-addr={}",
-                            session->id, fmt::ptr(ctf_tc), stream_iter->name,
+                            session->id, fmt::ptr(ctfTraceCls), stream_iter->name.c_str(),
                             fmt::ptr(self_msg_iter));
-            stream_iter->msg_iter =
-                ctf_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops, stream_iter.get(),
-                                    self_msg_iter, stream_iter->logger);
-            if (!stream_iter->msg_iter) {
-                BT_CPPLOGE_APPEND_CAUSE_SPEC(stream_iter->logger,
-                                             "Failed to create CTF message iterator");
-                return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-            }
         }
     }
 
@@ -162,7 +182,7 @@ enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_sessi
 
 struct lttng_live_stream_iterator *
 lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id,
-                                  uint64_t stream_id, bt_self_message_iterator *self_msg_iter)
+                                  uint64_t stream_id)
 {
     std::stringstream nameSs;
 
@@ -170,9 +190,7 @@ lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t c
     BT_ASSERT(session->lttng_live_msg_iter);
     BT_ASSERT(session->lttng_live_msg_iter->lttng_live_comp);
 
-    lttng_live_component *lttng_live = session->lttng_live_msg_iter->lttng_live_comp;
-    lttng_live_trace *trace =
-        lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
+    const auto trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
     if (!trace) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(session->logger, "Failed to borrow CTF trace.");
         return nullptr;
@@ -190,21 +208,6 @@ lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t c
     stream_iter->last_inactivity_ts.is_set = false;
     stream_iter->last_inactivity_ts.value = 0;
 
-    if (trace->trace) {
-        struct ctf_trace_class *ctf_tc =
-            ctf_metadata_decoder_borrow_ctf_trace_class(trace->metadata->decoder.get());
-        BT_ASSERT(!stream_iter->msg_iter);
-        stream_iter->msg_iter =
-            ctf_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops, stream_iter.get(),
-                                self_msg_iter, stream_iter->logger);
-        if (!stream_iter->msg_iter) {
-            BT_CPPLOGE_APPEND_CAUSE_SPEC(stream_iter->logger,
-                                         "Failed to create CTF message iterator");
-            return nullptr;
-        }
-    }
-    stream_iter->buf.resize(lttng_live->max_query_size);
-
     nameSs << STREAM_NAME_PREFIX << stream_iter->viewer_stream_id;
     stream_iter->name = nameSs.str();
 
@@ -217,6 +220,18 @@ lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t c
     return ret;
 }
 
+void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter,
+                                                 uint64_t ctfStreamClsId)
+{
+    if (streamIter->ctf_stream_class_id.is_set) {
+        BT_ASSERT(streamIter->ctf_stream_class_id.value == ctfStreamClsId);
+        return;
+    } else {
+        streamIter->ctf_stream_class_id.value = ctfStreamClsId;
+        streamIter->ctf_stream_class_id.is_set = true;
+    }
+}
+
 lttng_live_stream_iterator::~lttng_live_stream_iterator()
 {
     /* Track the number of active stream iterator. */
index 9ad85bba60b95a8175047c8375d0fad86ae34de4..702454e77981a739f9143e22fce675249f958e5b 100644 (file)
@@ -16,6 +16,35 @@ enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_sessi
 
 struct lttng_live_stream_iterator *
 lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id,
-                                  uint64_t stream_id, bt_self_message_iterator *self_msg_iter);
+                                  uint64_t stream_id);
+
+namespace ctf {
+namespace src {
+namespace live {
+
+struct CtfLiveMedium : Medium
+{
+    CtfLiveMedium(lttng_live_stream_iterator& liveStreamIter) :
+        _mLogger {liveStreamIter.logger, "PLUGIN/SRC.CTF.LTTNG-LIVE/CTF-LIVE-MEDIUM"},
+        _mLiveStreamIter(liveStreamIter)
+    {
+    }
+
+    Buf buf(bt2c::DataLen offset, bt2c::DataLen minSize) override;
+
+private:
+    bt2c::Logger _mLogger;
+    lttng_live_stream_iterator& _mLiveStreamIter;
+
+    bt2c::DataLen _mCurPktBegOffsetInStream = bt2c::DataLen::fromBits(0);
+    std::vector<uint8_t> _mBuf;
+};
+
+} /* namespace live */
+} /* namespace src */
+} /* namespace ctf */
+
+lttng_live_iterator_status
+lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter);
 
 #endif /* BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_DATA_STREAM_HPP */
index 6368181000f641718cb9c192cda053ba131b2341..a1105828982fde3951df9c62fcd304e7e3775235 100644 (file)
@@ -12,6 +12,7 @@
 #include <unistd.h>
 
 #include "common/assert.h"
+#include "cpp-common/bt2/wrap.hpp"
 #include "cpp-common/bt2c/fmt.hpp"
 #include "cpp-common/bt2c/glib-up.hpp"
 #include "cpp-common/bt2c/vector.hpp"
@@ -113,9 +114,9 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint
                     "session-id={}, hostname=\"{}\", session-name=\"{}\"",
                     session_id, hostname, session_name);
 
-    auto session = bt2s::make_unique<lttng_live_session>(lttng_live_msg_iter->logger);
+    auto session = bt2s::make_unique<lttng_live_session>(lttng_live_msg_iter->logger,
+                                                         lttng_live_msg_iter->selfComp);
 
-    session->self_comp = lttng_live_msg_iter->self_comp;
     session->id = session_id;
     session->lttng_live_msg_iter = lttng_live_msg_iter;
     session->new_streams_needed = true;
@@ -247,16 +248,17 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da
         goto end;
     }
 
-    lttng_live_stream->base_offset = index.offset;
-    lttng_live_stream->offset = index.offset;
-    lttng_live_stream->len = index.packet_size / CHAR_BIT;
+    lttng_live_stream->curPktInfo.emplace(lttng_live_stream_iterator::CurPktInfo {
+        bt2c::DataLen::fromBytes(index.offset),
+        bt2c::DataLen::fromBits(index.packet_size),
+    });
 
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                     "Setting live stream reading info: stream-name=\"{}\", "
-                    "viewer-stream-id={}, stream-base-offset={}, stream-offset={}, stream-len={}",
+                    "viewer-stream-id={}, stream-offset-in-relay={}, stream-len-bytes={}",
                     lttng_live_stream->name, lttng_live_stream->viewer_stream_id,
-                    lttng_live_stream->base_offset, lttng_live_stream->offset,
-                    lttng_live_stream->len);
+                    lttng_live_stream->curPktInfo->offsetInRelay.bytes(),
+                    lttng_live_stream->curPktInfo->len.bytes());
 
 end:
     if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
@@ -281,8 +283,7 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
     if (!session->attached) {
         BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger, "Attach to session: session-id={}",
                         session->id);
-        enum lttng_live_viewer_status attach_status =
-            lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter);
+        lttng_live_viewer_status attach_status = lttng_live_session_attach(session);
         if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
             if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
                 /*
@@ -305,7 +306,7 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
                     "Updating all data streams: session-id={}, session-name=\"{}\"", session->id,
                     session->session_name);
 
-    status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter);
+    status = lttng_live_session_get_new_streams(session);
     switch (status) {
     case LTTNG_LIVE_ITERATOR_STATUS_OK:
         break;
@@ -341,6 +342,7 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
         status = lttng_live_metadata_update(trace.get());
         switch (status) {
         case LTTNG_LIVE_ITERATOR_STATUS_END:
+            break;
         case LTTNG_LIVE_ITERATOR_STATUS_OK:
             break;
         case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
@@ -470,7 +472,8 @@ emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
                     timestamp);
 
     const auto msg = bt_message_message_iterator_inactivity_create(
-        lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp);
+        lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class->libObjPtr(),
+        timestamp);
 
     if (!msg) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
@@ -516,63 +519,53 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quies
 }
 
 static int live_get_msg_ts_ns(struct lttng_live_msg_iter *lttng_live_msg_iter,
-                              const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
+                              bt2::ConstMessage msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
 {
-    const bt_clock_snapshot *clock_snapshot = NULL;
-    int ret = 0;
+    bt2::OptionalBorrowedObject<bt2::ConstClockSnapshot> clockSnapshot;
 
-    BT_ASSERT_DBG(msg);
     BT_ASSERT_DBG(ts_ns);
 
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
-                    "Getting message's timestamp: iter-data-addr={}, msg-addr={}, "
-                    "last-msg-ts={}",
-                    fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns);
+                    "Getting message's timestamp: iter-data-addr={}, msg-addr={}, last-msg-ts={}",
+                    fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg.libObjPtr()), last_msg_ts_ns);
 
-    switch (bt_message_get_type(msg)) {
-    case BT_MESSAGE_TYPE_EVENT:
-        clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
+    switch (msg.type()) {
+    case bt2::MessageType::Event:
+        clockSnapshot = msg.asEvent().defaultClockSnapshot();
         break;
-    case BT_MESSAGE_TYPE_PACKET_BEGINNING:
-        clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
+    case bt2::MessageType::PacketBeginning:
+
+        clockSnapshot = msg.asPacketBeginning().defaultClockSnapshot();
         break;
-    case BT_MESSAGE_TYPE_PACKET_END:
-        clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
+    case bt2::MessageType::PacketEnd:
+        clockSnapshot = msg.asPacketEnd().defaultClockSnapshot();
         break;
-    case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
-        clock_snapshot =
-            bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
+    case bt2::MessageType::DiscardedEvents:
+        clockSnapshot = msg.asDiscardedEvents().beginningDefaultClockSnapshot();
         break;
-    case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
-        clock_snapshot =
-            bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
+    case bt2::MessageType::DiscardedPackets:
+        clockSnapshot = msg.asDiscardedPackets().beginningDefaultClockSnapshot();
         break;
-    case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
-        clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
+    case bt2::MessageType::MessageIteratorInactivity:
+        clockSnapshot = msg.asMessageIteratorInactivity().clockSnapshot();
         break;
     default:
         /* All the other messages have a higher priority */
         BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                         "Message has no timestamp, using the last message timestamp: "
                         "iter-data-addr={}, msg-addr={}, last-msg-ts={}, ts={}",
-                        fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns);
+                        fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg.libObjPtr()), last_msg_ts_ns,
+                        *ts_ns);
         *ts_ns = last_msg_ts_ns;
         return 0;
     }
 
-    ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
-    if (ret) {
-        BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
-                                     "Cannot get nanoseconds from Epoch of clock snapshot: "
-                                     "clock-snapshot-addr={}",
-                                     fmt::ptr(clock_snapshot));
-        return -1;
-    }
+    *ts_ns = clockSnapshot->nsFromOrigin();
 
     BT_CPPLOGD_SPEC(
         lttng_live_msg_iter->logger,
         "Found message's timestamp: iter-data-addr={}, msg-addr={}, last-msg-ts={}, ts={}",
-        fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg), last_msg_ts_ns, *ts_ns);
+        fmt::ptr(lttng_live_msg_iter), fmt::ptr(msg.libObjPtr()), last_msg_ts_ns, *ts_ns);
 
     return 0;
 }
@@ -581,15 +574,14 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ
     struct lttng_live_msg_iter *lttng_live_msg_iter,
     struct lttng_live_stream_iterator *lttng_live_stream, bt2::ConstMessage::Shared& message)
 {
-    enum ctf_msg_iter_status status;
-
     for (const auto& session : lttng_live_msg_iter->sessions) {
         if (session->new_streams_needed) {
             BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                             "Need an update for streams: session-id={}", session->id);
             return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
         }
-        for (lttng_live_trace::UP& trace : session->traces) {
+
+        for (const auto& trace : session->traces) {
             if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) {
                 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
                                 "Need an update for metadata stream: session-id={}, trace-id={}",
@@ -606,27 +598,28 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ
         return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
     }
 
-    const bt_message *msg;
-    status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
-    switch (status) {
-    case CTF_MSG_ITER_STATUS_EOF:
-        return LTTNG_LIVE_ITERATOR_STATUS_END;
-    case CTF_MSG_ITER_STATUS_OK:
-        message = bt2::ConstMessage::Shared::createWithoutRef(msg);
-        return LTTNG_LIVE_ITERATOR_STATUS_OK;
-    case CTF_MSG_ITER_STATUS_AGAIN:
-        /*
-         * Continue immediately (end of packet). The next
-         * get_index may return AGAIN to delay the following
-         * attempt.
-         */
-        return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
-    case CTF_MSG_ITER_STATUS_ERROR:
-    default:
-        BT_CPPLOGE_APPEND_CAUSE_SPEC(
-            lttng_live_msg_iter->logger,
-            "CTF message iterator failed to get next message: msg-iter={}, msg-iter-status={}",
-            fmt::ptr(lttng_live_stream->msg_iter), status);
+    if (!lttng_live_stream->msg_iter) {
+        /* The first time we're called for this stream, the MsgIter is not instantiated.  */
+        enum lttng_live_iterator_status ret =
+            lttng_live_stream_iterator_create_msg_iter(lttng_live_stream);
+        if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+            return ret;
+        }
+    }
+
+    try {
+        message = lttng_live_stream->msg_iter->next();
+        if (message) {
+            return LTTNG_LIVE_ITERATOR_STATUS_OK;
+        } else {
+            return LTTNG_LIVE_ITERATOR_STATUS_END;
+        }
+    } catch (const bt2c::TryAgain&) {
+        return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+    } catch (const bt2::Error&) {
+        BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
+                                     "CTF message iterator failed to get next message: msg-iter={}",
+                                     fmt::ptr(&*lttng_live_stream->msg_iter));
         return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
     }
 }
@@ -646,25 +639,26 @@ lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter
      * `ctf_msg_iter` should simply realize that it needs to close the
      * stream properly by emitting the necessary stream end message.
      */
-    const bt_message *msg;
-    enum ctf_msg_iter_status status =
-        ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
+    try {
+        if (!stream_iter->msg_iter) {
+            BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
+                            "Reached the end of the live stream iterator.");
+            return LTTNG_LIVE_ITERATOR_STATUS_END;
+        }
+
+        curr_msg = stream_iter->msg_iter->next();
+        if (!curr_msg) {
+            BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
+                            "Reached the end of the live stream iterator.");
+            return LTTNG_LIVE_ITERATOR_STATUS_END;
+        }
 
-    if (status == CTF_MSG_ITER_STATUS_ERROR) {
+        return LTTNG_LIVE_ITERATOR_STATUS_OK;
+    } catch (const bt2::Error&) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
                                      "Error getting the next message from CTF message iterator");
         return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-    } else if (status == CTF_MSG_ITER_STATUS_EOF) {
-        BT_CPPLOGI_SPEC(lttng_live_msg_iter->logger,
-                        "Reached the end of the live stream iterator.");
-        return LTTNG_LIVE_ITERATOR_STATUS_END;
     }
-
-    BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
-
-    curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
-
-    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 /*
@@ -802,58 +796,42 @@ static bool is_discarded_packet_or_event_message(const bt2::ConstMessage msg)
 }
 
 static enum lttng_live_iterator_status
-adjust_discarded_packets_message(bt_self_message_iterator *iter, const bt_stream *stream,
-                                 const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
-                                 uint64_t new_begin_ts)
+adjust_discarded_packets_message(bt_self_message_iterator *iter, bt2::Stream stream,
+                                 bt2::ConstDiscardedPacketsMessage msgIn,
+                                 bt2::ConstMessage::Shared& msgOut, uint64_t new_begin_ts)
 {
-    enum bt_property_availability availability;
-    const bt_clock_snapshot *clock_snapshot;
-    uint64_t end_ts;
-    uint64_t count;
-
-    clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
-    end_ts = bt_clock_snapshot_get_value(clock_snapshot);
-
-    availability = bt_message_discarded_packets_get_count(msg_in, &count);
-    BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
+    BT_ASSERT_DBG(msgIn.count());
 
     const auto msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
-        iter, stream, new_begin_ts, end_ts);
+        iter, stream.libObjPtr(), new_begin_ts, msgIn.endDefaultClockSnapshot().value());
 
     if (!msg) {
         return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
     }
 
-    bt_message_discarded_packets_set_count(msg, count);
-    msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+    bt_message_discarded_packets_set_count(msg, *msgIn.count());
+    msgOut = bt2::Message::Shared::createWithoutRef(msg);
+
     return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 static enum lttng_live_iterator_status
-adjust_discarded_events_message(bt_self_message_iterator *iter, const bt_stream *stream,
-                                const bt_message *msg_in, bt2::ConstMessage::Shared& msg_out,
-                                uint64_t new_begin_ts)
+adjust_discarded_events_message(bt_self_message_iterator *iter, const bt2::Stream stream,
+                                bt2::ConstDiscardedEventsMessage msgIn,
+                                bt2::ConstMessage::Shared& msgOut, uint64_t new_begin_ts)
 {
-    enum bt_property_availability availability;
-    const bt_clock_snapshot *clock_snapshot;
-    uint64_t end_ts;
-    uint64_t count;
-
-    clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
-    end_ts = bt_clock_snapshot_get_value(clock_snapshot);
-
-    availability = bt_message_discarded_events_get_count(msg_in, &count);
-    BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
+    BT_ASSERT_DBG(msgIn.count());
 
     const auto msg = bt_message_discarded_events_create_with_default_clock_snapshots(
-        iter, stream, new_begin_ts, end_ts);
+        iter, stream.libObjPtr(), new_begin_ts, msgIn.endDefaultClockSnapshot().value());
 
     if (!msg) {
         return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
     }
 
-    bt_message_discarded_events_set_count(msg, count);
-    msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+    bt_message_discarded_events_set_count(msg, *msgIn.count());
+    msgOut = bt2::Message::Shared::createWithoutRef(msg);
+
     return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
@@ -862,13 +840,6 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
                     struct lttng_live_stream_iterator *stream_iter, int64_t late_msg_ts_ns,
                     const bt2::ConstMessage& late_msg)
 {
-    const bt_clock_class *clock_class;
-    const bt_stream_class *stream_class;
-    enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
-    int64_t last_inactivity_ts_ns;
-    enum lttng_live_iterator_status adjust_status;
-    bt2::ConstMessage::Shared adjusted_message;
-
     /*
      * The timestamp of the current message is before the last message sent
      * by this component. We CANNOT send it as is.
@@ -920,18 +891,11 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
         return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
     }
 
-    stream_class = bt_stream_borrow_class_const(stream_iter->stream->libObjPtr());
-    clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
-
-    ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
-        clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
-    if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
-        BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
-                                     "Error converting last "
-                                     "inactivity message timestamp to nanoseconds");
-        return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-    }
+    const auto streamClass = stream_iter->stream->cls();
+    const auto clockClass = streamClass.defaultClockClass();
 
+    int64_t last_inactivity_ts_ns =
+        clockClass->cyclesToNsFromOrigin(stream_iter->last_inactivity_ts.value);
     if (last_inactivity_ts_ns <= late_msg_ts_ns) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(lttng_live_msg_iter->logger,
                                      "Invalid live stream state: "
@@ -947,30 +911,36 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
      * adjust its timestamp to ensure monotonicity.
      */
     BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
-                    "Adjusting the timestamp of late message: late-msg-type={}, "
-                    "msg-new-ts-ns={}",
+                    "Adjusting the timestamp of late message: late-msg-type={}, msg-new-ts-ns={}",
                     late_msg.type(), stream_iter->last_inactivity_ts.value);
-    switch (late_msg.type()) {
-    case bt2::MessageType::DiscardedEvents:
-        adjust_status = adjust_discarded_events_message(
-            lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
-            late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
-        break;
-    case bt2::MessageType::DiscardedPackets:
-        adjust_status = adjust_discarded_packets_message(
-            lttng_live_msg_iter->self_msg_iter, stream_iter->stream->libObjPtr(),
-            late_msg.libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
-        break;
-    default:
-        bt_common_abort();
-    }
+
+    bt2::ConstMessage::Shared adjustedMessage;
+
+    const auto adjust_status = bt2c::call([&] {
+        switch (late_msg.type()) {
+        case bt2::MessageType::DiscardedEvents:
+            return adjust_discarded_events_message(lttng_live_msg_iter->self_msg_iter,
+                                                   *stream_iter->stream,
+                                                   late_msg.asDiscardedEvents(), adjustedMessage,
+                                                   stream_iter->last_inactivity_ts.value);
+
+        case bt2::MessageType::DiscardedPackets:
+            return adjust_discarded_packets_message(lttng_live_msg_iter->self_msg_iter,
+                                                    *stream_iter->stream,
+                                                    late_msg.asDiscardedPackets(), adjustedMessage,
+                                                    stream_iter->last_inactivity_ts.value);
+
+        default:
+            bt_common_abort();
+        }
+    });
 
     if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
         return adjust_status;
     }
 
-    BT_ASSERT_DBG(adjusted_message);
-    stream_iter->current_msg = adjusted_message;
+    BT_ASSERT_DBG(adjustedMessage);
+    stream_iter->current_msg = std::move(adjustedMessage);
     stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
 
     return LTTNG_LIVE_ITERATOR_STATUS_OK;
@@ -1034,8 +1004,8 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
              * Get the timestamp in nanoseconds from origin of this
              * message.
              */
-            live_get_msg_ts_ns(lttng_live_msg_iter, msg->libObjPtr(),
-                               lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
+            live_get_msg_ts_ns(lttng_live_msg_iter, *msg, lttng_live_msg_iter->last_msg_ts_ns,
+                               &curr_msg_ts_ns);
 
             /*
              * Check if the message of the current live stream
@@ -1500,9 +1470,9 @@ static lttng_live_msg_iter::UP
 lttng_live_msg_iter_create(struct lttng_live_component *lttng_live_comp,
                            bt_self_message_iterator *self_msg_it)
 {
-    auto msg_iter = bt2s::make_unique<struct lttng_live_msg_iter>(lttng_live_comp->logger);
+    auto msg_iter = bt2s::make_unique<struct lttng_live_msg_iter>(lttng_live_comp->logger,
+                                                                  lttng_live_comp->selfComp);
 
-    msg_iter->self_comp = lttng_live_comp->self_comp;
     msg_iter->lttng_live_comp = lttng_live_comp;
     msg_iter->self_msg_iter = self_msg_it;
     msg_iter->active_stream_iter = 0;
@@ -1783,7 +1753,7 @@ lttng_live_component_create(const bt_value *params, bt_self_component_source *se
     const bt_value *value;
     enum bt_param_validation_status validation_status;
     gchar *validation_error = NULL;
-    bt2c::Logger logger {bt2::SelfSourceComponent {self_comp}, "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
+    bt2c::Logger logger {bt2::wrap(self_comp), "PLUGIN/SRC.CTF.LTTNG-LIVE/COMP"};
 
     validation_status = bt_param_validation_validate(params, params_descr, &validation_error);
     if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
@@ -1794,8 +1764,9 @@ lttng_live_component_create(const bt_value *params, bt_self_component_source *se
         return BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
     }
 
-    auto lttng_live = bt2s::make_unique<lttng_live_component>(std::move(logger));
-    lttng_live->self_comp = bt_self_component_source_as_self_component(self_comp);
+    auto lttng_live =
+        bt2s::make_unique<lttng_live_component>(std::move(logger), bt2::wrap(self_comp));
+
     lttng_live->max_query_size = MAX_QUERY_SIZE;
     lttng_live->has_msg_iter = false;
 
index be2689ce944d8e0efbfc9abc1a207afb95eb2069..19df6353a446a803aa92b3823ef9522533aeae72 100644 (file)
@@ -19,8 +19,8 @@
 #include "cpp-common/bt2/message.hpp"
 #include "cpp-common/vendor/fmt/format.h" /* IWYU pragma: keep */
 
-#include "../common/src/metadata/tsdl/decoder.hpp"
-#include "../common/src/msg-iter/msg-iter.hpp"
+#include "../common/src/metadata/metadata-stream-parser-utils.hpp"
+#include "../common/src/msg-iter.hpp"
 #include "viewer-connection.hpp"
 
 /*
@@ -113,7 +113,7 @@ struct lttng_live_stream_iterator
      * Since only a single iterator per viewer connection, we have
      * only a single message iterator per stream.
      */
-    ctf_msg_iter_up msg_iter;
+    bt2s::optional<ctf::src::MsgIter> msg_iter;
 
     uint64_t viewer_stream_id = 0;
 
@@ -123,13 +123,6 @@ struct lttng_live_stream_iterator
         uint64_t value = 0;
     } ctf_stream_class_id;
 
-    /* base offset in current index. */
-    uint64_t base_offset = 0;
-    /* len to read in current index. */
-    uint64_t len = 0;
-    /* offset in current index. */
-    uint64_t offset = 0;
-
     /*
      * Clock Snapshot value of the last message iterator inactivity message
      * sent downstream.
@@ -154,28 +147,63 @@ struct lttng_live_stream_iterator
     /* Timestamp in nanoseconds of the current message (current_msg). */
     int64_t current_msg_ts_ns = 0;
 
-    std::vector<uint8_t> buf;
-
     std::string name;
 
     bool has_stream_hung_up = false;
+
+    struct CurPktInfo
+    {
+        bt2c::DataLen offsetInRelay;
+        bt2c::DataLen len;
+    };
+
+    bt2s::optional<CurPktInfo> curPktInfo;
 };
 
 struct lttng_live_metadata
 {
     using UP = std::unique_ptr<lttng_live_metadata>;
 
-    explicit lttng_live_metadata(const bt2c::Logger& parentLogger) :
-        logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/METADATA"}
+    explicit lttng_live_metadata(const bt2::SelfComponent selfComp,
+                                 const bt2c::Logger& parentLogger) :
+        logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/METADATA"},
+        _mSelfComp {selfComp}
+
     {
     }
 
+    const ctf::src::TraceCls *traceCls() const
+    {
+        return _mMetadataStreamParser->traceCls();
+    }
+
+    const bt2s::optional<bt2c::Uuid>& metadataStreamUuid() const noexcept
+    {
+        return _mMetadataStreamParser->metadataStreamUuid();
+    }
+
+    void parseSection(const bt2c::ConstBytes data)
+    {
+        if (!_mMetadataStreamParser) {
+            _mMetadataStreamParser =
+                ctf::src::createMetadataStreamParser(data, _mSelfComp, {}, logger);
+        }
+
+        _mMetadataStreamParser->parseSection(data);
+    }
+
+    bt2::SelfComponent selfComp() const noexcept
+    {
+        return _mSelfComp;
+    }
+
     bt2c::Logger logger;
 
     uint64_t stream_id = 0;
 
-    /* Weak reference. */
-    ctf_metadata_decoder_up decoder;
+private:
+    bt2::SelfComponent _mSelfComp;
+    ctf::src::MetadataStreamParser::UP _mMetadataStreamParser;
 };
 
 enum lttng_live_metadata_stream_state
@@ -219,11 +247,9 @@ struct lttng_live_trace
 
     bt2::Trace::Shared trace;
 
-    bt2::TraceClass::Shared trace_class;
-
     lttng_live_metadata::UP metadata;
 
-    const bt_clock_class *clock_class = nullptr;
+    bt2::OptionalBorrowedObject<bt2::ConstClockClass> clock_class;
 
     std::vector<lttng_live_stream_iterator::UP> stream_iterators;
 
@@ -235,8 +261,10 @@ struct lttng_live_session
 {
     using UP = std::unique_ptr<lttng_live_session>;
 
-    explicit lttng_live_session(const bt2c::Logger& parentLogger) :
-        logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/SESSION"}
+    explicit lttng_live_session(const bt2c::Logger& parentLogger,
+                                const bt2::SelfComponent selfCompParam) :
+        logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/SESSION"},
+        selfComp {selfCompParam}
     {
     }
 
@@ -244,7 +272,7 @@ struct lttng_live_session
 
     bt2c::Logger logger;
 
-    bt_self_component *self_comp = nullptr;
+    bt2::SelfComponent selfComp;
 
     /* Weak reference. */
     struct lttng_live_msg_iter *lttng_live_msg_iter = nullptr;
@@ -277,15 +305,16 @@ struct lttng_live_component
 {
     using UP = std::unique_ptr<lttng_live_component>;
 
-    explicit lttng_live_component(bt2c::Logger loggerParam) noexcept :
-        logger {std::move(loggerParam)}
+    explicit lttng_live_component(bt2c::Logger loggerParam,
+                                  const bt2::SelfComponent selfCompParam) noexcept :
+        logger {std::move(loggerParam)},
+        selfComp {selfCompParam}
     {
     }
 
     bt2c::Logger logger;
 
-    /* Weak reference. */
-    bt_self_component *self_comp = nullptr;
+    bt2::SelfComponent selfComp;
 
     struct
     {
@@ -306,8 +335,10 @@ struct lttng_live_msg_iter
 {
     using UP = std::unique_ptr<lttng_live_msg_iter>;
 
-    explicit lttng_live_msg_iter(const bt2c::Logger& parentLogger) :
-        logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/MSG-ITER"}
+    explicit lttng_live_msg_iter(const bt2c::Logger& parentLogger,
+                                 const bt2::SelfComponent selfCompParam) :
+        logger {parentLogger, "PLUGIN/SRC.CTF.LTTNG-LIVE/MSG-ITER"},
+        selfComp {selfCompParam}
     {
     }
 
@@ -315,7 +346,7 @@ struct lttng_live_msg_iter
 
     bt2c::Logger logger;
 
-    bt_self_component *self_comp = nullptr;
+    bt2::SelfComponent selfComp;
 
     /* Weak reference. */
     struct lttng_live_component *lttng_live_comp = nullptr;
@@ -411,14 +442,12 @@ lttng_live_msg_iter_init(bt_self_message_iterator *self_msg_it,
 
 void lttng_live_msg_iter_finalize(bt_self_message_iterator *it);
 
-enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
-                                                        bt_self_message_iterator *self_msg_iter);
+enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session);
 
 enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session);
 
 enum lttng_live_iterator_status
-lttng_live_session_get_new_streams(struct lttng_live_session *session,
-                                   bt_self_message_iterator *self_msg_iter);
+lttng_live_session_get_new_streams(struct lttng_live_session *session);
 
 struct lttng_live_trace *
 lttng_live_session_borrow_or_create_trace_by_id(struct lttng_live_session *session,
@@ -435,20 +464,18 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint
  * written to the file.
  */
 enum lttng_live_get_one_metadata_status
-lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<char>& buf);
+lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<uint8_t>& buf);
 
 enum lttng_live_iterator_status
 lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
                           struct lttng_live_stream_iterator *stream, struct packet_index *index);
 
-enum ctf_msg_iter_medium_status
-lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
-                            struct lttng_live_stream_iterator *stream, uint8_t *buf,
-                            uint64_t offset, uint64_t req_len, uint64_t *recv_len);
-
 bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter);
 
 void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
                                           enum lttng_live_stream_state new_state);
 
+void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter,
+                                                 uint64_t ctfStreamClsId);
+
 #endif /* BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_LTTNG_LIVE_HPP */
index 693042d7bf228be5acae70c22b82f85887c238f6..660843fb15ac398bcd22e5ed990584a0dca3067b 100644 (file)
@@ -6,12 +6,9 @@
  * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
  */
 
-#include "compat/memstream.h"
-#include "cpp-common/bt2c/libc-up.hpp"
 #include "cpp-common/bt2s/make-unique.hpp"
 
 #include "../common/src/metadata/tsdl/ctf-meta-configure-ir-trace.hpp"
-#include "../common/src/metadata/tsdl/decoder.hpp"
 #include "lttng-live.hpp"
 #include "metadata.hpp"
 
@@ -31,25 +28,18 @@ struct packet_header
     uint8_t minor;
 } __attribute__((__packed__));
 
-static bool stream_classes_all_have_default_clock_class(bt_trace_class *tc,
+static bool stream_classes_all_have_default_clock_class(bt2::ConstTraceClass tc,
                                                         const bt2c::Logger& logger)
 {
-    uint64_t i, sc_count;
-    const bt_clock_class *cc = NULL;
-    const bt_stream_class *sc;
+    for (std::uint64_t i = 0; i < tc.length(); ++i) {
+        auto sc = tc[i];
+        auto cc = sc.defaultClockClass();
 
-    sc_count = bt_trace_class_get_stream_class_count(tc);
-    for (i = 0; i < sc_count; i++) {
-        sc = bt_trace_class_borrow_stream_class_by_index_const(tc, i);
-
-        BT_ASSERT(sc);
-
-        cc = bt_stream_class_borrow_default_clock_class_const(sc);
         if (!cc) {
             BT_CPPLOGE_APPEND_CAUSE_SPEC(logger,
                                          "Stream class doesn't have a default clock class: "
                                          "sc-id={}, sc-name=\"{}\"",
-                                         bt_stream_class_get_id(sc), bt_stream_class_get_name(sc));
+                                         sc.id(), sc.name());
             return false;
         }
     }
@@ -61,34 +51,16 @@ static bool stream_classes_all_have_default_clock_class(bt_trace_class *tc,
  * encountered. This is useful to create message iterator inactivity message as
  * we don't need a particular clock class.
  */
-static const bt_clock_class *borrow_any_clock_class(bt_trace_class *tc)
+static bt2::ConstClockClass borrow_any_clock_class(bt2::ConstTraceClass tc)
 {
-    uint64_t i, sc_count;
-    const bt_clock_class *cc = NULL;
-    const bt_stream_class *sc;
-
-    sc_count = bt_trace_class_get_stream_class_count(tc);
-    for (i = 0; i < sc_count; i++) {
-        sc = bt_trace_class_borrow_stream_class_by_index_const(tc, i);
-        BT_ASSERT_DBG(sc);
-
-        cc = bt_stream_class_borrow_default_clock_class_const(sc);
-        if (cc) {
-            return cc;
-        }
-    }
-
-    bt_common_abort();
+    return *tc[0].defaultClockClass();
 }
 
 enum lttng_live_iterator_status lttng_live_metadata_update(struct lttng_live_trace *trace)
 {
     struct lttng_live_session *session = trace->session;
     struct lttng_live_metadata *metadata = trace->metadata.get();
-    std::vector<char> metadataBuf;
     bool keep_receiving;
-    bt2c::FileUP fp;
-    enum ctf_metadata_decoder_status decoder_status;
     enum lttng_live_get_one_metadata_status metadata_status;
 
     BT_CPPLOGD_SPEC(metadata->logger, "Updating metadata for trace: session-id={}, trace-id={}",
@@ -117,6 +89,7 @@ enum lttng_live_iterator_status lttng_live_metadata_update(struct lttng_live_tra
 
     keep_receiving = true;
     /* Grab all available metadata. */
+    std::vector<uint8_t> metadataBuf;
     while (keep_receiving) {
         /*
          * lttng_live_get_one_metadata_packet() asks the Relay Daemon
@@ -173,79 +146,49 @@ enum lttng_live_iterator_status lttng_live_metadata_update(struct lttng_live_tra
         return LTTNG_LIVE_ITERATOR_STATUS_OK;
     }
 
-    /*
-     * Open a new reading file handle on the `metadata_buf` and pass it to
-     * the metadata decoder.
-     */
-    fp.reset(bt_fmemopen(metadataBuf.data(), metadataBuf.size(), "rb"));
-    if (!fp) {
-        if (errno == EINTR && lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
-            session->lttng_live_msg_iter->was_interrupted = true;
-            return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
-        } else {
-            BT_CPPLOGE_ERRNO_APPEND_CAUSE_SPEC(metadata->logger,
-                                               "Cannot memory-open metadata buffer", ".");
-            return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-        }
-    }
-
     /*
      * The call to ctf_metadata_decoder_append_content() will append
      * new metadata to our current trace class.
      */
     BT_CPPLOGD_SPEC(metadata->logger, "Appending new metadata to the ctf_trace class");
-    decoder_status = ctf_metadata_decoder_append_content(metadata->decoder.get(), fp.get());
-    switch (decoder_status) {
-    case CTF_METADATA_DECODER_STATUS_OK:
-        if (!trace->trace_class) {
-            struct ctf_trace_class *tc =
-                ctf_metadata_decoder_borrow_ctf_trace_class(metadata->decoder.get());
+    metadata->parseSection(metadataBuf);
+    if (!trace->trace) {
+        const ctf::src::TraceCls *ctfTraceCls = metadata->traceCls();
+        BT_ASSERT(ctfTraceCls);
+        bt2::OptionalBorrowedObject<bt2::TraceClass> irTraceCls = ctfTraceCls->libCls();
 
-            trace->trace_class = ctf_metadata_decoder_get_ir_trace_class(metadata->decoder.get());
-            trace->trace = trace->trace_class->instantiate();
-            if (!trace->trace) {
-                BT_CPPLOGE_APPEND_CAUSE_SPEC(metadata->logger, "Failed to create bt_trace");
-                return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-            }
+        if (irTraceCls) {
+            trace->trace = irTraceCls->instantiate();
 
-            ctf_trace_class_configure_ir_trace(tc, *trace->trace);
+            ctf_trace_class_configure_ir_trace(*ctfTraceCls, *trace->trace,
+                                               metadata->selfComp().graphMipVersion(),
+                                               metadata->logger);
 
-            if (!stream_classes_all_have_default_clock_class(trace->trace_class->libObjPtr(),
+            if (!stream_classes_all_have_default_clock_class(trace->trace->cls(),
                                                              metadata->logger)) {
                 /* Error logged in function. */
                 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
             }
-            trace->clock_class = borrow_any_clock_class(trace->trace_class->libObjPtr());
+
+            trace->clock_class = borrow_any_clock_class(trace->trace->cls());
         }
+    }
 
-        /* The metadata was updated successfully. */
-        trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NOT_NEEDED;
+    /* The metadata was updated successfully. */
+    trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NOT_NEEDED;
 
-        return LTTNG_LIVE_ITERATOR_STATUS_OK;
-    default:
-        return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-    }
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 int lttng_live_metadata_create_stream(struct lttng_live_session *session, uint64_t ctf_trace_id,
                                       uint64_t stream_id)
 {
-    struct lttng_live_trace *trace;
+    auto metadata = bt2s::make_unique<lttng_live_metadata>(session->selfComp, session->logger);
 
-    ctf_metadata_decoder_config cfg {session->logger};
-    cfg.self_comp = session->self_comp;
-    cfg.create_trace_class = true;
-
-    auto metadata = bt2s::make_unique<lttng_live_metadata>(session->logger);
     metadata->stream_id = stream_id;
 
-    metadata->decoder = ctf_metadata_decoder_create(&cfg);
-    if (!metadata->decoder) {
-        BT_CPPLOGE_APPEND_CAUSE_SPEC(session->logger, "Failed to create CTF metadata decoder");
-        return -1;
-    }
+    const auto trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
 
-    trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
     if (!trace) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(session->logger, "Failed to borrow trace");
         return -1;
index b639b6edf576904e646e8100bdfd3f3f19b8a390..cf69f1bb60c3de74525cf9fa9eb2c2338eed41d7 100644 (file)
@@ -62,16 +62,16 @@ viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_statu
     bt_common_abort();
 }
 
-static inline enum ctf_msg_iter_medium_status
-viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
+static inline enum lttng_live_get_stream_bytes_status
+viewer_status_to_lttng_live_get_stream_bytes_status(enum lttng_live_viewer_status viewer_status)
 {
     switch (viewer_status) {
     case LTTNG_LIVE_VIEWER_STATUS_OK:
-        return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK;
     case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
-        return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN;
     case LTTNG_LIVE_VIEWER_STATUS_ERROR:
-        return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
     }
 
     bt_common_abort();
@@ -730,8 +730,7 @@ lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter
 }
 
 static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
-                                                     uint32_t stream_count,
-                                                     bt_self_message_iterator *self_msg_iter)
+                                                     uint32_t stream_count)
 {
     uint32_t i;
     struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
@@ -767,8 +766,7 @@ static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *
         } else {
             BT_CPPLOGI_SPEC(viewer_connection->logger, "    stream {} : {}/{}", stream_id,
                             stream.path_name, stream.channel_name);
-            live_stream =
-                lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter);
+            live_stream = lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id);
             if (!live_stream) {
                 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error creating stream");
                 return LTTNG_LIVE_VIEWER_STATUS_ERROR;
@@ -779,8 +777,7 @@ static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *
     return LTTNG_LIVE_VIEWER_STATUS_OK;
 }
 
-enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
-                                                        bt_self_message_iterator *self_msg_iter)
+enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session)
 {
     struct lttng_viewer_cmd cmd;
     enum lttng_live_viewer_status status;
@@ -851,7 +848,7 @@ enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_sessio
     }
 
     /* We receive the initial list of streams. */
-    status = receive_streams(session, streams_count, self_msg_iter);
+    status = receive_streams(session, streams_count);
     switch (status) {
     case LTTNG_LIVE_VIEWER_STATUS_OK:
         break;
@@ -941,14 +938,13 @@ enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_sessio
 }
 
 enum lttng_live_get_one_metadata_status
-lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<char>& buf)
+lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<uint8_t>& buf)
 {
     uint64_t len = 0;
     enum lttng_live_viewer_status viewer_status;
     struct lttng_viewer_cmd cmd;
     struct lttng_viewer_get_metadata rq;
     struct lttng_viewer_metadata_packet rp;
-    std::vector<char> data;
     struct lttng_live_session *session = trace->session;
     struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
     struct lttng_live_metadata *metadata = trace->metadata.get();
@@ -1025,21 +1021,15 @@ lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<c
         return LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
     }
 
-    BT_DIAG_PUSH
-    BT_DIAG_IGNORE_NULL_DEREFERENCE
-    data.resize(len);
-    BT_DIAG_POP
+    std::vector<char> localBuf(len);
 
-    viewer_status = lttng_live_recv(viewer_connection, data.data(), len);
+    viewer_status = lttng_live_recv(viewer_connection, localBuf.data(), len);
     if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
         viewer_handle_recv_status(viewer_status, "get metadata packet");
         return (lttng_live_get_one_metadata_status) viewer_status;
     }
 
-    /*
-     * Write the metadata to the file handle.
-     */
-    buf.insert(buf.end(), data.begin(), data.end());
+    buf.insert(buf.end(), localBuf.begin(), localBuf.end());
 
     return LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
 }
@@ -1197,7 +1187,7 @@ lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
     }
 }
 
-enum ctf_msg_iter_medium_status
+lttng_live_get_stream_bytes_status
 lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
                             struct lttng_live_stream_iterator *stream, uint8_t *buf,
                             uint64_t offset, uint64_t req_len, uint64_t *recv_len)
@@ -1237,13 +1227,13 @@ lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
     viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
     if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
         viewer_handle_send_status(viewer_status, "get data packet command");
-        return viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
+        return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status);
     }
 
     viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
     if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
         viewer_handle_recv_status(viewer_status, "get data packet reply");
-        return viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
+        return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status);
     }
 
     flags = be32toh(rp.flags);
@@ -1261,7 +1251,7 @@ lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
         break;
     case LTTNG_VIEWER_GET_PACKET_RETRY:
         /* Unimplemented by relay daemon */
-        return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN;
     case LTTNG_VIEWER_GET_PACKET_ERR:
         if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
             BT_CPPLOGD_SPEC(viewer_connection->logger,
@@ -1281,39 +1271,38 @@ lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
             BT_CPPLOGD_SPEC(viewer_connection->logger,
                             "Reply with any one flags set means we should retry: response={}",
                             static_cast<lttng_viewer_get_packet_return_code>(rp_status));
-            return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+            return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN;
         }
         BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                      "Received get_data_packet response: error");
-        return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
     case LTTNG_VIEWER_GET_PACKET_EOF:
-        return CTF_MSG_ITER_MEDIUM_STATUS_EOF;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF;
     default:
         BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                      "Received get_data_packet response: unknown ({})", rp_status);
-        return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
     }
 
     if (req_len == 0) {
-        return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
     }
 
     viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
     if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
         viewer_handle_recv_status(viewer_status, "get data packet");
-        return viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
+        return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status);
     }
     *recv_len = req_len;
 
-    return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+    return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK;
 }
 
 /*
  * Request new streams for a session.
  */
 enum lttng_live_iterator_status
-lttng_live_session_get_new_streams(struct lttng_live_session *session,
-                                   bt_self_message_iterator *self_msg_iter)
+lttng_live_session_get_new_streams(struct lttng_live_session *session)
 {
     struct lttng_viewer_cmd cmd;
     struct lttng_viewer_new_streams_request rq;
@@ -1383,7 +1372,7 @@ lttng_live_session_get_new_streams(struct lttng_live_session *session,
         return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
     }
 
-    viewer_status = receive_streams(session, streams_count, self_msg_iter);
+    viewer_status = receive_streams(session, streams_count);
     if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
         viewer_handle_recv_status(viewer_status, "new streams");
         return viewer_status_to_live_iterator_status(viewer_status);
index 9610a15edcba85d1422bded2507e7f5dce673316..ebbb7467e105f37a13886a547f7c56cdcef214fc 100644 (file)
@@ -7,6 +7,7 @@
 #ifndef BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_VIEWER_CONNECTION_HPP
 #define BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_VIEWER_CONNECTION_HPP
 
+#include <memory>
 #include <string>
 
 #include <glib.h>
@@ -111,4 +112,17 @@ lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter
 bt2::Value::Shared
 live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection);
 
+enum lttng_live_get_stream_bytes_status
+{
+    LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK = __BT_FUNC_STATUS_OK,
+    LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN = __BT_FUNC_STATUS_AGAIN,
+    LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR = __BT_FUNC_STATUS_ERROR,
+    LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF = __BT_FUNC_STATUS_END,
+};
+
+lttng_live_get_stream_bytes_status
+lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
+                            struct lttng_live_stream_iterator *stream, uint8_t *buf,
+                            uint64_t offset, uint64_t req_len, uint64_t *recv_len);
+
 #endif /* BABELTRACE_PLUGINS_CTF_LTTNG_LIVE_VIEWER_CONNECTION_HPP */
index e77f2711fc59bdbf645fa2b30d01b4903146a85a..274302fbe39e364b01f7faeab505fda690d3cb3f 100755 (executable)
@@ -307,14 +307,9 @@ test_compare_to_ctf_fs() {
        bt_remove_cr "${expected_stdout}"
        bt_remove_cr "${expected_stderr}"
 
-       # Hack. To be removed when src.ctf.lttng-live is updated to use the new
-       # IR generator.
-       "$BT_TESTS_SED_BIN" -i '/User attributes:/d' "${expected_stdout}"
-       "$BT_TESTS_SED_BIN" -i '/babeltrace.org,2020:/d' "${expected_stdout}"
-       "$BT_TESTS_SED_BIN" -i '/log-level: warning/d' "${expected_stdout}"
-
        run_test "$test_text" "$cli_args_template" "$expected_stdout" \
                "$expected_stderr" "$trace_dir_native" "${server_args[@]}"
+
        diag "Inverse session order from lttng-relayd"
        run_test "$test_text" "$cli_args_template" "$expected_stdout" \
                "$expected_stderr" "$trace_dir_native" "${server_args_inverse[@]}"
This page took 0.045169 seconds and 4 git commands to generate.