src.ctf.lttng-live: use the new metadata stream parser and message iterator
authorSimon Marchi <simon.marchi@efficios.com>
Thu, 4 Aug 2022 20:30:53 +0000 (16:30 -0400)
committerSimon Marchi <simon.marchi@efficios.com>
Tue, 23 Aug 2022 16:06:16 +0000 (12:06 -0400)
Signed-off-by: Simon Marchi <simon.marchi@efficios.com>
Change-Id: Id2e11205bed54654942077c5336495b7bdd3f38d
Reviewed-on: https://review.lttng.org/c/babeltrace/+/8617
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
src/plugins/ctf/lttng-live/data-stream.cpp
src/plugins/ctf/lttng-live/data-stream.hpp
src/plugins/ctf/lttng-live/lttng-live.cpp
src/plugins/ctf/lttng-live/lttng-live.hpp
src/plugins/ctf/lttng-live/metadata.cpp
src/plugins/ctf/lttng-live/viewer-connection.cpp
src/plugins/ctf/lttng-live/viewer-connection.hpp
tests/data/plugins/src.ctf.lttng-live/cli-base.expect
tests/data/plugins/src.ctf.lttng-live/cli-multi-domains.expect
tests/data/plugins/src.ctf.lttng-live/split_metadata.expect
tests/plugins/src.ctf.lttng-live/test_live

index 8f9f2c76f8ee2c9701ca0cd49011ddc5cf797b00..21e75f190babbb905160c87e5c29c10f9b263aed 100644 (file)
 
 #include <babeltrace2/babeltrace.h>
 
-#include "cpp-common/cfg-logging.hpp"
-#include "../common/src/msg-iter/msg-iter.hpp"
 #include "cpp-common/make-unique.hpp"
 #include "common/assert.h"
 #include "compat/mman.h"
+#include "../common/src/pkt-props.hpp"
 #include "cpp-common/make-unique.hpp"
 #include "data-stream.hpp"
+#include "cpp-common/exc.hpp"
 #include "cpp-common/cfg-logging-error-reporting.hpp"
+#include "cpp-common/cfg-logging-error-reporting-throw.hpp"
 
 #define STREAM_NAME_PREFIX "stream-"
 
-static enum ctf_msg_iter_medium_status medop_request_bytes(size_t request_sz, uint8_t **buffer_addr,
-                                                           size_t *buffer_sz, void *data)
+namespace ctf {
+namespace src {
+namespace live {
+
+Buf CtfLiveMedium::buf(bt2_common::DataLen requestedOffsetInStream, bt2_common::DataLen minSize)
 {
-    lttng_live_stream_iterator *stream = (lttng_live_stream_iterator *) data;
-    struct lttng_live_trace *trace = stream->trace;
-    struct lttng_live_session *session = trace->session;
-    struct lttng_live_msg_iter *live_msg_iter = session->lttng_live_msg_iter;
-    uint64_t recv_len = 0;
-    uint64_t len_left;
-    uint64_t read_len;
-
-    BT_ASSERT(request_sz);
-
-    if (stream->has_stream_hung_up) {
-        return CTF_MSG_ITER_MEDIUM_STATUS_EOF;
+    const bt2_common::LogCfg& logCfg = _mLiveStreamIter.logCfg;
+    BT_CLOGD("CtfLiveMedium::buf called: stream-id=%" PRId64
+             ", offset-bytes=%llu, min-size-bytes=%llu",
+             _mLiveStreamIter.stream ? (*_mLiveStreamIter.stream)->id() : -1,
+             requestedOffsetInStream.bytes(), minSize.bytes());
+
+    if (_mLiveStreamIter.has_stream_hung_up)
+        throw NoData {};
+
+    BT_ASSERT(requestedOffsetInStream >= _mCurPktBegOffsetInStream);
+    bt2_common::DataLen requestedOffsetInPacket =
+        requestedOffsetInStream - _mCurPktBegOffsetInStream;
+
+    BT_ASSERT(_mLiveStreamIter.curPktInfo);
+
+    if (requestedOffsetInPacket == _mLiveStreamIter.curPktInfo->len) {
+        _mCurPktBegOffsetInStream += _mLiveStreamIter.curPktInfo->len;
+        _mLiveStreamIter.curPktInfo.reset();
+        lttng_live_stream_iterator_set_state(&_mLiveStreamIter, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
+        throw bt2_common::TryAgain {};
     }
 
-    len_left = stream->base_offset + stream->len - stream->offset;
-    if (!len_left) {
-        lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
-        return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+    bt2_common::DataLen requestedOffsetInRelay =
+        _mLiveStreamIter.curPktInfo->offsetInRelay + requestedOffsetInPacket;
+    bt2_common::DataLen lenUntilEndOfPacket =
+        _mLiveStreamIter.curPktInfo->len - requestedOffsetInPacket;
+
+    bt2_common::DataLen maxReqLen = bt2_common::DataLen::fromBytes(
+        _mLiveStreamIter.trace->session->lttng_live_msg_iter->lttng_live_comp->max_query_size);
+    bt2_common::DataLen reqLen = std::min(lenUntilEndOfPacket, maxReqLen);
+    uint64_t recvLen;
+
+    _mBuf.resize(reqLen.bytes());
+
+    lttng_live_get_stream_bytes_status status = lttng_live_get_stream_bytes(
+        _mLiveStreamIter.trace->session->lttng_live_msg_iter, &_mLiveStreamIter, _mBuf.data(),
+        requestedOffsetInRelay.bytes(), reqLen.bytes(), &recvLen);
+    switch (status) {
+    case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK:
+        _mBuf.resize(recvLen);
+        break;
+
+    case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN:
+        BT_CLOGD("CtfLiveMedium::buf try again");
+        throw bt2_common::TryAgain();
+
+    case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF:
+        BT_CLOGD("CtfLiveMedium::buf eof");
+        throw NoData();
+
+    case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR:
+        BT_CLOGD("CtfLiveMedium::buf error");
+        throw bt2_common::Error();
     }
 
-    read_len = MIN(request_sz, stream->buf.size());
-    read_len = MIN(read_len, len_left);
-    ctf_msg_iter_medium_status status = lttng_live_get_stream_bytes(
-        live_msg_iter, stream, stream->buf.data(), stream->offset, read_len, &recv_len);
-    if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
-        return status;
-    }
+    Buf buf {_mBuf.data(), bt2_common::DataLen::fromBytes(_mBuf.size())};
 
-    *buffer_addr = stream->buf.data();
-    *buffer_sz = recv_len;
-    stream->offset += recv_len;
+    BT_CLOGD("CtfLiveMedium::buf returns: stream-id=%" PRId64 ", buf-addr=%p, buf-size-bytes=%llu",
+             _mLiveStreamIter.stream ? (*_mLiveStreamIter.stream)->id() : -1, buf.addr(),
+             buf.size().bytes());
 
-    return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+    return buf;
 }
 
-static bt_stream *medop_borrow_stream(bt_stream_class *stream_class, int64_t stream_id, void *data)
-{
-    lttng_live_stream_iterator *lttng_live_stream = (lttng_live_stream_iterator *) data;
-    const bt2_common::LogCfg& logCfg = lttng_live_stream->logCfg;
-
-    if (!lttng_live_stream->stream) {
-        uint64_t stream_class_id = bt_stream_class_get_id(stream_class);
-
-        BT_CLOGI("Creating stream %s (ID: %" PRIu64 ") out of stream "
-                 "class %" PRId64,
-                 lttng_live_stream->name.c_str(), stream_id, stream_class_id);
-
-        bt_stream *stream;
-
-        if (stream_id < 0) {
-            /*
-             * No stream instance ID in the stream. It's possible
-             * to encounter this situation with older version of
-             * LTTng. In these cases, use the viewer_stream_id that
-             * is unique for a live viewer session.
-             */
-            stream = bt_stream_create_with_id(stream_class,
-                                              (*lttng_live_stream->trace->trace)->libObjPtr(),
-                                              lttng_live_stream->viewer_stream_id);
-        } else {
-            stream = bt_stream_create_with_id(stream_class,
-                                              (*lttng_live_stream->trace->trace)->libObjPtr(),
-                                              (uint64_t) stream_id);
-        }
-
-        if (!stream) {
-            BT_CLOGE_APPEND_CAUSE("Cannot create stream %s (stream class ID "
-                                  "%" PRId64 ", stream ID %" PRIu64 ")",
-                                  lttng_live_stream->name.c_str(), stream_class_id, stream_id);
-            return nullptr;
-        }
-
-        lttng_live_stream->stream = bt2::Stream::Shared::createWithoutRef(stream);
+} /* namespace live */
+} /* namespace src */
+} /* namespace ctf */
 
-        (*lttng_live_stream->stream)->name(lttng_live_stream->name);
+lttng_live_iterator_status
+lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter)
+{
+    BT_ASSERT(!liveStreamIter->msg_iter);
+    BT_ASSERT(!liveStreamIter->stream);
+    lttng_live_trace *trace = liveStreamIter->trace;
+    lttng_live_msg_iter *liveMsgIter = trace->session->lttng_live_msg_iter;
+
+    ctf::src::Medium::UP tempMedium =
+        bt2_common::makeUnique<ctf::src::live::CtfLiveMedium>(*liveStreamIter);
+    const ctf::src::TraceCls *ctfTc = liveStreamIter->trace->metadata->traceCls();
+    BT_ASSERT(ctfTc);
+    ctf::src::PktProps pktProps = ctf::src::readPktProps(
+        *ctfTc, std::move(tempMedium), bt2_common::DataLen::fromBytes(0), liveStreamIter->logCfg);
+
+    nonstd::optional<bt2::TraceClass> tc = ctfTc->libCls();
+    BT_ASSERT(tc);
+    BT_ASSERT(liveStreamIter->ctf_stream_class_id.is_set);
+    BT_ASSERT(trace->trace);
+
+    const bt2_common::LogCfg& logCfg = liveStreamIter->logCfg;
+    nonstd::optional<bt2::StreamClass> sc =
+        tc->streamClassById(liveStreamIter->ctf_stream_class_id.value);
+    if (!sc) {
+        BT_CLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "No stream class with id %" PRId64,
+                                        liveStreamIter->ctf_stream_class_id.value);
     }
 
-    return (*lttng_live_stream->stream)->libObjPtr();
+    // FIXME: in the original, there is a fall back if the data stream id is not available.
+    bt_stream *streamPtr = bt_stream_create_with_id(sc->libObjPtr(), (*trace->trace)->libObjPtr(),
+                                                    *pktProps.dataStreamId);
+    BT_ASSERT(streamPtr);
+    liveStreamIter->stream = bt2::Stream::Shared::createWithoutRef(streamPtr);
+    (*liveStreamIter->stream)->name(liveStreamIter->name);
+
+    ctf::src::Medium::UP medium =
+        bt2_common::makeUnique<ctf::src::live::CtfLiveMedium>(*liveStreamIter);
+    liveStreamIter->msg_iter.emplace(
+        liveMsgIter->self_msg_iter, *ctfTc, liveStreamIter->trace->metadata->metadataStreamUuid(),
+        **liveStreamIter->stream, std::move(medium), ctf::src::MsgIterQuirks {}, logCfg);
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
-static struct ctf_msg_iter_medium_ops medops = {
-    medop_request_bytes,
-    nullptr,
-    nullptr,
-    medop_borrow_stream,
-};
-
 BT_HIDDEN
 enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_session *session,
                                                          bt_self_message_iterator *self_msg_iter)
 {
-    struct lttng_live_component *lttng_live = session->lttng_live_msg_iter->lttng_live_comp;
     const bt2_common::LogCfg& logCfg = session->logCfg;
 
     if (!session->lazy_stream_msg_init) {
@@ -137,23 +164,15 @@ enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_sessi
 
     for (lttng_live_trace::UP& trace : session->traces) {
         for (lttng_live_stream_iterator::UP& stream_iter : trace->stream_iterators) {
-            struct ctf_trace_class *ctf_tc;
-
             if (stream_iter->msg_iter) {
                 continue;
             }
 
-            ctf_tc = ctf_metadata_decoder_borrow_ctf_trace_class(trace->metadata->decoder.get());
+            const ctf::src::TraceCls *ctfTraceCls = trace->metadata->traceCls();
             BT_CLOGD("Creating CTF message iterator: "
                      "session-id=%" PRIu64 ", ctf-tc-addr=%p, "
                      "stream-iter-name=%s, self-msg-iter-addr=%p",
-                     session->id, ctf_tc, stream_iter->name.c_str(), self_msg_iter);
-            stream_iter->msg_iter = ctf_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops,
-                                                        stream_iter.get(), self_msg_iter, logCfg);
-            if (!stream_iter->msg_iter) {
-                BT_CLOGE_APPEND_CAUSE("Failed to create CTF message iterator");
-                return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-            }
+                     session->id, ctfTraceCls, stream_iter->name.c_str(), self_msg_iter);
         }
     }
 
@@ -167,7 +186,6 @@ struct lttng_live_stream_iterator *
 lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id,
                                   uint64_t stream_id, bt_self_message_iterator *self_msg_iter)
 {
-    struct lttng_live_component *lttng_live;
     struct lttng_live_trace *trace;
     std::stringstream nameSs;
 
@@ -177,8 +195,6 @@ lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t c
 
     const bt2_common::LogCfg& logCfg = session->logCfg;
 
-    lttng_live = session->lttng_live_msg_iter->lttng_live_comp;
-
     lttng_live_stream_iterator::UP stream_iter =
         bt2_common::makeUnique<lttng_live_stream_iterator>(logCfg);
     trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
@@ -197,19 +213,6 @@ lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t c
     stream_iter->last_inactivity_ts.is_set = false;
     stream_iter->last_inactivity_ts.value = 0;
 
-    if (trace->trace) {
-        struct ctf_trace_class *ctf_tc =
-            ctf_metadata_decoder_borrow_ctf_trace_class(trace->metadata->decoder.get());
-        BT_ASSERT(!stream_iter->msg_iter);
-        stream_iter->msg_iter = ctf_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops,
-                                                    stream_iter.get(), self_msg_iter, logCfg);
-        if (!stream_iter->msg_iter) {
-            BT_CLOGE_APPEND_CAUSE("Failed to create CTF message iterator");
-            return nullptr;
-        }
-    }
-    stream_iter->buf.resize(lttng_live->max_query_size);
-
     nameSs << STREAM_NAME_PREFIX << stream_iter->viewer_stream_id;
     stream_iter->name = nameSs.str();
 
@@ -222,6 +225,18 @@ lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t c
     return ret;
 }
 
+void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter,
+                                                 uint64_t ctfStreamClsId)
+{
+    if (streamIter->ctf_stream_class_id.is_set) {
+        BT_ASSERT(streamIter->ctf_stream_class_id.value == ctfStreamClsId);
+        return;
+    } else {
+        streamIter->ctf_stream_class_id.value = ctfStreamClsId;
+        streamIter->ctf_stream_class_id.is_set = true;
+    }
+}
+
 lttng_live_stream_iterator::~lttng_live_stream_iterator()
 {
     /* Track the number of active stream iterator. */
index 5078dfd0f01658f115d7f15c65f189be3862aa79..c551c14fd63059b6ba66f8ff8b80b8e82d73ce11 100644 (file)
@@ -12,7 +12,6 @@
 
 #include <glib.h>
 
-#include "../common/src/msg-iter/msg-iter.hpp"
 #include "lttng-live.hpp"
 
 enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_session *session,
@@ -22,4 +21,30 @@ struct lttng_live_stream_iterator *
 lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id,
                                   uint64_t stream_id, bt_self_message_iterator *self_msg_iter);
 
+namespace ctf {
+namespace src {
+namespace live {
+
+struct CtfLiveMedium : Medium
+{
+    CtfLiveMedium(lttng_live_stream_iterator& liveStreamIter) : _mLiveStreamIter(liveStreamIter)
+    {
+    }
+
+    Buf buf(bt2_common::DataLen offset, bt2_common::DataLen minSize) override;
+
+private:
+    lttng_live_stream_iterator& _mLiveStreamIter;
+
+    bt2_common::DataLen _mCurPktBegOffsetInStream = bt2_common::DataLen::fromBits(0);
+    std::vector<uint8_t> _mBuf;
+};
+
+} /* namespace live */
+} /* namespace src */
+} /* namespace ctf */
+
+lttng_live_iterator_status
+lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter);
+
 #endif /* LTTNG_LIVE_DATA_STREAM_H */
index 2b20af805f2877bd3ccff661590010d48b9d53a9..774a115ac374010bc4444ded94aba13bb3d2c0a4 100644 (file)
@@ -23,6 +23,7 @@
 #include <babeltrace2/types.h>
 #include "cpp-common/exc.hpp"
 #include "cpp-common/glib-up.hpp"
+#include "cpp-common/lib-str.hpp"
 #include "cpp-common/make-unique.hpp"
 #include "cpp-common/vector.hpp"
 
@@ -31,6 +32,8 @@
 #include "cpp-common/cfg-logging-error-reporting.hpp"
 #include "cpp-common/cfg-logging-error-reporting-throw.hpp"
 
+#include "../common/src/pkt-props.hpp"
+
 #include "data-stream.hpp"
 #include "metadata.hpp"
 #include "lttng-live.hpp"
@@ -311,15 +314,17 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da
         goto end;
     }
 
-    lttng_live_stream->base_offset = index.offset;
-    lttng_live_stream->offset = index.offset;
-    lttng_live_stream->len = index.packet_size / CHAR_BIT;
+    lttng_live_stream->curPktInfo.emplace(lttng_live_stream_iterator::CurPktInfo {
+        bt2_common::DataLen::fromBytes(index.offset),
+        bt2_common::DataLen::fromBits(index.packet_size),
+    });
 
     BT_CLOGD("Setting live stream reading info: stream-name=\"%s\", "
-             "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64 ", stream-offset=%" PRIu64
-             ", stream-len=%" PRIu64,
+             "viewer-stream-id=%" PRIu64 ", stream-base-offset=%llu"
+             ", stream-len-bytes=%llu",
              lttng_live_stream->name.c_str(), lttng_live_stream->viewer_stream_id,
-             lttng_live_stream->base_offset, lttng_live_stream->offset, lttng_live_stream->len);
+             lttng_live_stream->curPktInfo->offsetInRelay.bytes(),
+             lttng_live_stream->curPktInfo->len.bytes());
 
 end:
     if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
@@ -373,18 +378,18 @@ lttng_live_get_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
         break;
     case LTTNG_LIVE_ITERATOR_STATUS_END:
         /*
-                * We received a `_END` from the `_get_new_streams()` function,
-                * which means no more data will ever be received from the data
-                * streams of this session. But it's possible that the metadata
-                * is incomplete.
-                * The live protocol guarantees that we receive all the
-                * metadata needed before we receive data streams needing it.
-                * But it's possible to receive metadata NOT needed by
-                * data streams after the session was closed. For example, this
-                * could happen if a new event is registered and the session is
-                * stopped before any tracepoint for that event is actually
-                * fired.
-                */
+         * We received a `_END` from the `_get_new_streams()` function,
+         * which means no more data will ever be received from the data
+         * streams of this session. But it's possible that the metadata
+         * is incomplete.
+         * The live protocol guarantees that we receive all the
+         * metadata needed before we receive data streams needing it.
+         * But it's possible to receive metadata NOT needed by
+         * data streams after the session was closed. For example, this
+         * could happen if a new event is registered and the session is
+         * stopped before any tracepoint for that event is actually
+         * fired.
+         */
         BT_CLOGD(
             "Updating streams returned _END status. Override status to _OK in order fetch any remaining metadata:"
             "session-id=%" PRIu64 ", session-name=\"%s\"",
@@ -529,8 +534,9 @@ emit_inactivity_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
              ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64,
              stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp);
 
-    msg = bt_message_message_iterator_inactivity_create(lttng_live_msg_iter->self_msg_iter,
-                                                        stream_iter->trace->clock_class, timestamp);
+    msg = bt_message_message_iterator_inactivity_create(
+        lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class->libObjPtr(),
+        timestamp);
     if (!msg) {
         BT_CLOGE_APPEND_CAUSE("Error emitting message iterator inactivity message");
         return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
@@ -576,62 +582,53 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quies
 
 static int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
                               struct lttng_live_msg_iter *lttng_live_msg_iter,
-                              const bt_message *msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
+                              bt2::ConstMessage msg, int64_t last_msg_ts_ns, int64_t *ts_ns)
 {
-    const bt_clock_snapshot *clock_snapshot = NULL;
-    int ret = 0;
+    nonstd::optional<bt2::ConstClockSnapshot> clockSnapshot;
     const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
 
-    BT_ASSERT_DBG(msg);
     BT_ASSERT_DBG(ts_ns);
 
     BT_CLOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
              "last-msg-ts=%" PRId64,
-             lttng_live_msg_iter, msg, last_msg_ts_ns);
+             lttng_live_msg_iter, msg.libObjPtr(), last_msg_ts_ns);
 
-    switch (bt_message_get_type(msg)) {
-    case BT_MESSAGE_TYPE_EVENT:
-        clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(msg);
+    switch (msg.type()) {
+    case bt2::MessageType::EVENT:
+        clockSnapshot = msg.asEvent().defaultClockSnapshot();
         break;
-    case BT_MESSAGE_TYPE_PACKET_BEGINNING:
-        clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(msg);
+    case bt2::MessageType::PACKET_BEGINNING:
+
+        clockSnapshot = msg.asPacketBeginning().defaultClockSnapshot();
         break;
-    case BT_MESSAGE_TYPE_PACKET_END:
-        clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(msg);
+    case bt2::MessageType::PACKET_END:
+        clockSnapshot = msg.asPacketEnd().defaultClockSnapshot();
         break;
-    case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
-        clock_snapshot =
-            bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(msg);
+    case bt2::MessageType::DISCARDED_EVENTS:
+        clockSnapshot = msg.asDiscardedEvents().beginningDefaultClockSnapshot();
         break;
-    case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
-        clock_snapshot =
-            bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(msg);
+    case bt2::MessageType::DISCARDED_PACKETS:
+        clockSnapshot = msg.asDiscardedPackets().beginningDefaultClockSnapshot();
         break;
-    case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
-        clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(msg);
+    case bt2::MessageType::MESSAGE_ITERATOR_INACTIVITY:
+        clockSnapshot = msg.asMessageIteratorInactivity().clockSnapshot();
         break;
     default:
         /* All the other messages have a higher priority */
         BT_CLOGD(
             "Message has no timestamp, using the last message timestamp: iter-data-addr=%p, msg-addr=%p, "
             "last-msg-ts=%" PRId64 ", ts=%" PRId64,
-            lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
+            lttng_live_msg_iter, msg.libObjPtr(), last_msg_ts_ns, *ts_ns);
         *ts_ns = last_msg_ts_ns;
         return 0;
     }
 
-    ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
-    if (ret) {
-        BT_CLOGE_APPEND_CAUSE("Cannot get nanoseconds from Epoch of clock snapshot: "
-                              "clock-snapshot-addr=%p",
-                              clock_snapshot);
-        return -1;
-    }
+    *ts_ns = clockSnapshot->nsFromOrigin();
 
     BT_CLOGD("Found message's timestamp: "
              "iter-data-addr=%p, msg-addr=%p, "
              "last-msg-ts=%" PRId64 ", ts=%" PRId64,
-             lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
+             lttng_live_msg_iter, msg.libObjPtr(), last_msg_ts_ns, *ts_ns);
 
     return 0;
 }
@@ -642,7 +639,6 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ
     nonstd::optional<bt2::ConstMessage::Shared>& message)
 {
     const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
-    enum ctf_msg_iter_status status;
 
     for (lttng_live_session::UP& session : lttng_live_msg_iter->sessions) {
         if (session->new_streams_needed) {
@@ -668,27 +664,28 @@ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ
         return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
     }
 
-    const bt_message *msg;
-    status = ctf_msg_iter_get_next_message(lttng_live_stream->msg_iter.get(), &msg);
-    switch (status) {
-    case CTF_MSG_ITER_STATUS_EOF:
-        return LTTNG_LIVE_ITERATOR_STATUS_END;
-    case CTF_MSG_ITER_STATUS_OK:
-        message = bt2::ConstMessage::Shared::createWithoutRef(msg);
-        return LTTNG_LIVE_ITERATOR_STATUS_OK;
-    case CTF_MSG_ITER_STATUS_AGAIN:
-        /*
-         * Continue immediately (end of packet). The next
-         * get_index may return AGAIN to delay the following
-         * attempt.
-         */
-        return LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
-    case CTF_MSG_ITER_STATUS_ERROR:
-    default:
+    if (!lttng_live_stream->msg_iter) {
+        /* The first time we're called for this stream, the MsgIter is not instantiated.  */
+        enum lttng_live_iterator_status ret =
+            lttng_live_stream_iterator_create_msg_iter(lttng_live_stream);
+        if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
+            return ret;
+        }
+    }
+
+    try {
+        message = lttng_live_stream->msg_iter->next();
+        if (message) {
+            return LTTNG_LIVE_ITERATOR_STATUS_OK;
+        } else {
+            return LTTNG_LIVE_ITERATOR_STATUS_END;
+        }
+    } catch (const bt2_common::TryAgain&) {
+        return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+    } catch (const bt2::Error&) {
         BT_CLOGE_APPEND_CAUSE("CTF message iterator failed to get next message: "
-                              "msg-iter=%p, msg-iter-status=%s",
-                              lttng_live_stream->msg_iter.get(),
-                              ctf_msg_iter_status_string(status));
+                              "msg-iter=%p",
+                              &*lttng_live_stream->msg_iter);
         return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
     }
 }
@@ -709,23 +706,23 @@ lttng_live_iterator_close_stream(struct lttng_live_msg_iter *lttng_live_msg_iter
      * `ctf_msg_iter` should simply realize that it needs to close the
      * stream properly by emitting the necessary stream end message.
      */
-    const bt_message *msg;
-    enum ctf_msg_iter_status status =
-        ctf_msg_iter_get_next_message(stream_iter->msg_iter.get(), &msg);
+    try {
+        if (!stream_iter->msg_iter) {
+            BT_CLOGI("Reached the end of the live stream iterator.");
+            return LTTNG_LIVE_ITERATOR_STATUS_END;
+        }
+
+        curr_msg = stream_iter->msg_iter->next();
+        if (!curr_msg) {
+            BT_CLOGI("Reached the end of the live stream iterator.");
+            return LTTNG_LIVE_ITERATOR_STATUS_END;
+        }
 
-    if (status == CTF_MSG_ITER_STATUS_ERROR) {
+        return LTTNG_LIVE_ITERATOR_STATUS_OK;
+    } catch (const bt2::Error&) {
         BT_CLOGE_APPEND_CAUSE("Error getting the next message from CTF message iterator");
         return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-    } else if (status == CTF_MSG_ITER_STATUS_EOF) {
-        BT_CLOGI("Reached the end of the live stream iterator.");
-        return LTTNG_LIVE_ITERATOR_STATUS_END;
     }
-
-    BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
-
-    curr_msg = bt2::ConstMessage::Shared::createWithoutRef(msg);
-
-    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 /*
@@ -863,58 +860,43 @@ static bool is_discarded_packet_or_event_message(bt2::ConstMessage msg)
 }
 
 static enum lttng_live_iterator_status adjust_discarded_packets_message(
-    bt_self_message_iterator *iter, const bt_stream *stream, const bt_message *msg_in,
-    nonstd::optional<bt2::ConstMessage::Shared>& msg_out, uint64_t new_begin_ts)
+    bt_self_message_iterator *iter, bt2::Stream stream, bt2::ConstDiscardedPacketsMessage msgIn,
+    nonstd::optional<bt2::Message::Shared>& msgOut, uint64_t new_begin_ts)
 {
-    enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
-    enum bt_property_availability availability;
-    const bt_clock_snapshot *clock_snapshot;
-    uint64_t end_ts;
-    uint64_t count;
-
-    clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in);
-    end_ts = bt_clock_snapshot_get_value(clock_snapshot);
-
-    availability = bt_message_discarded_packets_get_count(msg_in, &count);
-    BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
+    uint64_t end_ts = msgIn.endDefaultClockSnapshot().value();
+    nonstd::optional<uint64_t> count = msgIn.count();
+    BT_ASSERT_DBG(count);
 
     bt_message *msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
-        iter, stream, new_begin_ts, end_ts);
+        iter, stream.libObjPtr(), new_begin_ts, end_ts);
     if (!msg) {
-        status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
-        goto end;
+        return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
     }
 
-    bt_message_discarded_packets_set_count(msg, count);
-    msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+    bt_message_discarded_packets_set_count(msg, *count);
+    msgOut = bt2::Message::Shared::createWithoutRef(msg);
 
-end:
-    return status;
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
-static enum lttng_live_iterator_status adjust_discarded_events_message(
-    bt_self_message_iterator *iter, const bt_stream *stream, const bt_message *msg_in,
-    nonstd::optional<bt2::ConstMessage::Shared>& msg_out, uint64_t new_begin_ts)
+static enum lttng_live_iterator_status
+adjust_discarded_events_message(bt_self_message_iterator *iter, const bt2::Stream stream,
+                                bt2::ConstDiscardedEventsMessage msgIn,
+                                nonstd::optional<bt2::Message::Shared>& msgOut,
+                                uint64_t new_begin_ts)
 {
-    enum bt_property_availability availability;
-    const bt_clock_snapshot *clock_snapshot;
-    uint64_t end_ts;
-    uint64_t count;
-
-    clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in);
-    end_ts = bt_clock_snapshot_get_value(clock_snapshot);
-
-    availability = bt_message_discarded_events_get_count(msg_in, &count);
-    BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE);
+    uint64_t end_ts = msgIn.endDefaultClockSnapshot().value();
+    nonstd::optional<uint64_t> count = msgIn.count();
+    BT_ASSERT_DBG(count);
 
     bt_message *msg = bt_message_discarded_events_create_with_default_clock_snapshots(
-        iter, stream, new_begin_ts, end_ts);
+        iter, stream.libObjPtr(), new_begin_ts, end_ts);
     if (!msg) {
         return LTTNG_LIVE_ITERATOR_STATUS_NOMEM;
     }
 
-    bt_message_discarded_events_set_count(msg, count);
-    msg_out = bt2::ConstMessage::Shared::createWithoutRef(msg);
+    bt_message_discarded_events_set_count(msg, *count);
+    msgOut = bt2::Message::Shared::createWithoutRef(msg);
 
     return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
@@ -925,12 +907,6 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
                     bt2::ConstMessage::Shared late_msg)
 {
     const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
-    const bt_clock_class *clock_class;
-    const bt_stream_class *stream_class;
-    enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status;
-    int64_t last_inactivity_ts_ns;
-    enum lttng_live_iterator_status adjust_status;
-    nonstd::optional<bt2::ConstMessage::Shared> adjusted_message;
 
     /*
      * The timestamp of the current message is before the last message sent
@@ -973,25 +949,18 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
     }
 
     if (!is_discarded_packet_or_event_message(*late_msg)) {
-        BT_CLOGE_APPEND_CAUSE(
-            "Invalid live stream state: "
-            "have a late message that is not a packet discarded or "
-            "event discarded message: late-msg-type=%s",
-            bt_common_message_type_string(bt_message_get_type(late_msg->libObjPtr())));
+        BT_CLOGE_APPEND_CAUSE("Invalid live stream state: "
+                              "have a late message that is not a packet discarded or "
+                              "event discarded message: late-msg-type=%s",
+                              bt2_common::messageTypeStr(late_msg->type()));
         return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
     }
 
-    stream_class = bt_stream_borrow_class_const((*stream_iter->stream)->libObjPtr());
-    clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class);
-
-    ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(
-        clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns);
-    if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) {
-        BT_CLOGE_APPEND_CAUSE("Error converting last "
-                              "inactivity message timestamp to nanoseconds");
-        return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-    }
+    bt2::StreamClass streamClass = (*stream_iter->stream)->cls();
+    nonstd::optional<bt2::ClockClass> clockClass = streamClass.defaultClockClass();
 
+    int64_t last_inactivity_ts_ns =
+        clockClass->cyclesToNsFromOrigin(stream_iter->last_inactivity_ts.value);
     if (last_inactivity_ts_ns <= late_msg_ts_ns) {
         BT_CLOGE_APPEND_CAUSE("Invalid live stream state: "
                               "have a late message that is none included in a stream "
@@ -1007,18 +976,19 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
      */
     BT_CLOGD("Adjusting the timestamp of late message: late-msg-type=%s, "
              "msg-new-ts-ns=%" PRIu64,
-             bt_common_message_type_string(bt_message_get_type(late_msg->libObjPtr())),
-             stream_iter->last_inactivity_ts.value);
+             bt2_common::messageTypeStr(late_msg->type()), stream_iter->last_inactivity_ts.value);
+    nonstd::optional<bt2::Message::Shared> adjustedMessage;
+    lttng_live_iterator_status adjust_status;
     switch (late_msg->type()) {
     case bt2::MessageType::DISCARDED_EVENTS:
         adjust_status = adjust_discarded_events_message(
-            lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(),
-            late_msg->libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
+            lttng_live_msg_iter->self_msg_iter, **stream_iter->stream,
+            late_msg->asDiscardedEvents(), adjustedMessage, stream_iter->last_inactivity_ts.value);
         break;
     case bt2::MessageType::DISCARDED_PACKETS:
         adjust_status = adjust_discarded_packets_message(
-            lttng_live_msg_iter->self_msg_iter, (*stream_iter->stream)->libObjPtr(),
-            late_msg->libObjPtr(), adjusted_message, stream_iter->last_inactivity_ts.value);
+            lttng_live_msg_iter->self_msg_iter, **stream_iter->stream,
+            late_msg->asDiscardedPackets(), adjustedMessage, stream_iter->last_inactivity_ts.value);
         break;
     default:
         bt_common_abort();
@@ -1028,8 +998,8 @@ handle_late_message(struct lttng_live_msg_iter *lttng_live_msg_iter,
         return adjust_status;
     }
 
-    BT_ASSERT_DBG(adjusted_message);
-    stream_iter->current_msg = adjusted_message;
+    BT_ASSERT_DBG(adjustedMessage);
+    stream_iter->current_msg = std::move(adjustedMessage);
     stream_iter->current_msg_ts_ns = last_inactivity_ts_ns;
 
     return LTTNG_LIVE_ITERATOR_STATUS_OK;
@@ -1086,14 +1056,14 @@ next_stream_iterator_for_trace(struct lttng_live_msg_iter *lttng_live_msg_iter,
 
             BT_CLOGD("Live stream iterator returned message: msg-type=%s, "
                      "stream-name=\"%s\", viewer-stream-id=%" PRIu64,
-                     bt_common_message_type_string(bt_message_get_type((*msg)->libObjPtr())),
-                     stream_iter->name.c_str(), stream_iter->viewer_stream_id);
+                     bt2_common::messageTypeStr((*msg)->type()), stream_iter->name.c_str(),
+                     stream_iter->viewer_stream_id);
 
             /*
              * Get the timestamp in nanoseconds from origin of this
              * messsage.
              */
-            live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, (*msg)->libObjPtr(),
+            live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter, **msg,
                                lttng_live_msg_iter->last_msg_ts_ns, &curr_msg_ts_ns);
 
             /*
index e550813e71c313fb205ebc57209cd419d96b6662..249bbb77d6bf53e37828e12f685de2010e840891 100644 (file)
@@ -19,9 +19,9 @@
 #include <babeltrace2/babeltrace.h>
 
 #include "common/macros.h"
-#include "cpp-common/bt2/message.hpp"
-#include "../common/src/metadata/tsdl/decoder.hpp"
-#include "../common/src/msg-iter/msg-iter.hpp"
+#include "../common/src/metadata/tsdl/ctf-1-metadata-stream-parser.hpp"
+#include "../common/src/msg-iter.hpp"
+#include "../common/src/metadata/metadata-stream-parser-utils.hpp"
 #include "viewer-connection.hpp"
 
 struct lttng_live_component;
@@ -92,7 +92,7 @@ struct lttng_live_stream_iterator
      * Since only a single iterator per viewer connection, we have
      * only a single message iterator per stream.
      */
-    ctf_msg_iter_up msg_iter;
+    nonstd::optional<ctf::src::MsgIter> msg_iter;
 
     uint64_t viewer_stream_id = 0;
 
@@ -102,13 +102,6 @@ struct lttng_live_stream_iterator
         uint64_t value = 0;
     } ctf_stream_class_id;
 
-    /* base offset in current index. */
-    uint64_t base_offset = 0;
-    /* len to read in current index. */
-    uint64_t len = 0;
-    /* offset in current index. */
-    uint64_t offset = 0;
-
     /*
      * Clock Snapshot value of the last message iterator inactivity message
      * sent downstream.
@@ -136,28 +129,58 @@ struct lttng_live_stream_iterator
     /* Timestamp in nanoseconds of the current message (current_msg). */
     int64_t current_msg_ts_ns = 0;
 
-    std::vector<uint8_t> buf;
-
     std::string name;
 
     bool has_stream_hung_up = false;
+
+    struct CurPktInfo
+    {
+        bt2_common::DataLen offsetInRelay;
+        bt2_common::DataLen len;
+    };
+
+    nonstd::optional<CurPktInfo> curPktInfo;
 };
 
 struct lttng_live_metadata
 {
     using UP = std::unique_ptr<lttng_live_metadata>;
 
-    explicit lttng_live_metadata(const bt2_common::LogCfg& logCfgParam) noexcept :
-        logCfg {logCfgParam}
+    explicit lttng_live_metadata(bt_self_component *selfComp,
+                                 const bt2_common::LogCfg& logCfgParam) noexcept :
+        logCfg {logCfgParam},
+        _mSelfComp {selfComp}
+
+    {
+    }
+
+    const ctf::src::TraceCls *traceCls() const
     {
+        return _mMetadataStreamParser->traceCls();
+    }
+
+    const nonstd::optional<bt2_common::Uuid>& metadataStreamUuid() const noexcept
+    {
+        return _mMetadataStreamParser->metadataStreamUuid();
+    }
+
+    void parseSection(const uint8_t *begin, const uint8_t *end)
+    {
+        if (!_mMetadataStreamParser) {
+            _mMetadataStreamParser =
+                ctf::src::createMetadataStreamParser(begin, {}, _mSelfComp, logCfg);
+        }
+
+        _mMetadataStreamParser->parseSection(begin, end);
     }
 
     const bt2_common::LogCfg logCfg;
 
     uint64_t stream_id = 0;
 
-    /* Weak reference. */
-    ctf_metadata_decoder_up decoder;
+private:
+    bt_self_component *_mSelfComp;
+    ctf::src::MetadataStreamParser::UP _mMetadataStreamParser;
 };
 
 enum lttng_live_metadata_stream_state
@@ -201,11 +224,9 @@ struct lttng_live_trace
     /* Owned by this. */
     nonstd::optional<bt2::Trace::Shared> trace;
 
-    nonstd::optional<bt2::TraceClass::Shared> trace_class;
-
     lttng_live_metadata::UP metadata;
 
-    const bt_clock_class *clock_class = nullptr;
+    nonstd::optional<bt2::ConstClockClass> clock_class;
 
     std::vector<lttng_live_stream_iterator::UP> stream_iterators;
 
@@ -386,21 +407,20 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter, uint
  * written to the file.
  */
 enum lttng_live_get_one_metadata_status
-lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<char>& buf);
+lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<uint8_t>& buf);
 
 enum lttng_live_iterator_status
 lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
                           struct lttng_live_stream_iterator *stream, struct packet_index *index);
 
-enum ctf_msg_iter_medium_status
-lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
-                            struct lttng_live_stream_iterator *stream, uint8_t *buf,
-                            uint64_t offset, uint64_t req_len, uint64_t *recv_len);
-
 bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter);
 
 BT_HIDDEN
 void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter,
                                           enum lttng_live_stream_state new_state);
 
+BT_HIDDEN
+void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter,
+                                                 uint64_t ctfStreamClsId);
+
 #endif /* BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_H */
index 865268ccabe1311dbe4457e8edd18deea29f1ef8..75da520fc08d442001b7bd8dcfb481c65b76e751 100644 (file)
@@ -20,7 +20,6 @@
 #include "cpp-common/libc-up.hpp"
 
 #include "metadata.hpp"
-#include "../common/src/metadata/tsdl/decoder.hpp"
 #include "../common/src/metadata/tsdl/ctf-meta-configure-ir-trace.hpp"
 #include "cpp-common/cfg-logging.hpp"
 #include "cpp-common/cfg-logging-error-reporting.hpp"
@@ -41,24 +40,17 @@ struct packet_header
     uint8_t minor;
 } __attribute__((__packed__));
 
-static bool stream_classes_all_have_default_clock_class(bt_trace_class *tc,
+static bool stream_classes_all_have_default_clock_class(bt2::ConstTraceClass tc,
                                                         const bt2_common::LogCfg& logCfg)
 {
-    uint64_t i, sc_count;
-    const bt_clock_class *cc = NULL;
-    const bt_stream_class *sc;
+    for (std::uint64_t i = 0; i < tc.size(); ++i) {
+        bt2::ConstStreamClass sc = tc[i];
+        nonstd::optional<bt2::ConstClockClass> cc = sc.defaultClockClass();
 
-    sc_count = bt_trace_class_get_stream_class_count(tc);
-    for (i = 0; i < sc_count; i++) {
-        sc = bt_trace_class_borrow_stream_class_by_index_const(tc, i);
-
-        BT_ASSERT(sc);
-
-        cc = bt_stream_class_borrow_default_clock_class_const(sc);
         if (!cc) {
             BT_CLOGE_APPEND_CAUSE("Stream class doesn't have a default clock class: "
                                   "sc-id=%" PRIu64 ", sc-name=\"%s\"",
-                                  bt_stream_class_get_id(sc), bt_stream_class_get_name(sc));
+                                  sc.id(), sc.name()->c_str());
             return false;
         }
     }
@@ -70,24 +62,9 @@ static bool stream_classes_all_have_default_clock_class(bt_trace_class *tc,
  * encountered. This is useful to create message iterator inactivity message as
  * we don't need a particular clock class.
  */
-static const bt_clock_class *borrow_any_clock_class(bt_trace_class *tc)
+static bt2::ConstClockClass borrow_any_clock_class(bt2::ConstTraceClass tc)
 {
-    uint64_t i, sc_count;
-    const bt_clock_class *cc = NULL;
-    const bt_stream_class *sc;
-
-    sc_count = bt_trace_class_get_stream_class_count(tc);
-    for (i = 0; i < sc_count; i++) {
-        sc = bt_trace_class_borrow_stream_class_by_index_const(tc, i);
-        BT_ASSERT_DBG(sc);
-
-        cc = bt_stream_class_borrow_default_clock_class_const(sc);
-        if (cc) {
-            return cc;
-        }
-    }
-
-    bt_common_abort();
+    return *tc[0].defaultClockClass();
 }
 
 BT_HIDDEN
@@ -95,10 +72,7 @@ enum lttng_live_iterator_status lttng_live_metadata_update(struct lttng_live_tra
 {
     struct lttng_live_session *session = trace->session;
     struct lttng_live_metadata *metadata = trace->metadata.get();
-    std::vector<char> metadataBuf;
     bool keep_receiving;
-    bt2_common::FileUP fp;
-    enum ctf_metadata_decoder_status decoder_status;
     const bt2_common::LogCfg& logCfg = trace->logCfg;
     enum lttng_live_get_one_metadata_status metadata_status;
 
@@ -128,6 +102,7 @@ enum lttng_live_iterator_status lttng_live_metadata_update(struct lttng_live_tra
 
     keep_receiving = true;
     /* Grab all available metadata. */
+    std::vector<uint8_t> metadataBuf;
     while (keep_receiving) {
         /*
          * lttng_live_get_one_metadata_packet() asks the Relay Daemon
@@ -182,57 +157,35 @@ enum lttng_live_iterator_status lttng_live_metadata_update(struct lttng_live_tra
         return LTTNG_LIVE_ITERATOR_STATUS_OK;
     }
 
-    /*
-     * Open a new reading file handle on the `metadata_buf` and pass it to
-     * the metadata decoder.
-     */
-    fp.reset(bt_fmemopen(metadataBuf.data(), metadataBuf.size(), "rb"));
-    if (!fp) {
-        if (errno == EINTR && lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
-            session->lttng_live_msg_iter->was_interrupted = true;
-            return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
-        } else {
-            BT_CLOGE_ERRNO_APPEND_CAUSE("Cannot memory-open metadata buffer", ".");
-            return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-        }
-    }
-
     /*
      * The call to ctf_metadata_decoder_append_content() will append
      * new metadata to our current trace class.
      */
     BT_CLOGD("Appending new metadata to the ctf_trace class");
-    decoder_status = ctf_metadata_decoder_append_content(metadata->decoder.get(), fp.get());
-    switch (decoder_status) {
-    case CTF_METADATA_DECODER_STATUS_OK:
-        if (!trace->trace_class) {
-            struct ctf_trace_class *tc =
-                ctf_metadata_decoder_borrow_ctf_trace_class(metadata->decoder.get());
+    metadata->parseSection(metadataBuf.data(), metadataBuf.data() + metadataBuf.size());
+    if (!trace->trace) {
+        const ctf::src::TraceCls *ctfTraceCls = metadata->traceCls();
+        BT_ASSERT(ctfTraceCls);
+        nonstd::optional<bt2::TraceClass> irTraceCls = ctfTraceCls->libCls();
 
-            trace->trace_class = ctf_metadata_decoder_get_ir_trace_class(metadata->decoder.get());
-            trace->trace = (*trace->trace_class)->instantiate();
-            if (!trace->trace) {
-                BT_CLOGE_APPEND_CAUSE("Failed to create bt_trace");
-                return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-            }
+        if (irTraceCls) {
+            trace->trace = irTraceCls->instantiate();
 
-            ctf_trace_class_configure_ir_trace(tc, **trace->trace);
+            ctf_trace_class_configure_ir_trace(*ctfTraceCls, **trace->trace, logCfg);
 
-            if (!stream_classes_all_have_default_clock_class((*trace->trace_class)->libObjPtr(),
-                                                             logCfg)) {
+            if (!stream_classes_all_have_default_clock_class((*trace->trace)->cls(), logCfg)) {
                 /* Error logged in function. */
                 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
             }
-            trace->clock_class = borrow_any_clock_class((*trace->trace_class)->libObjPtr());
+
+            trace->clock_class = borrow_any_clock_class((*trace->trace)->cls());
         }
+    }
 
-        /* The metadata was updated succesfully. */
-        trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NOT_NEEDED;
+    /* The metadata was updated succesfully. */
+    trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NOT_NEEDED;
 
-        return LTTNG_LIVE_ITERATOR_STATUS_OK;
-    default:
-        return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-    }
+    return LTTNG_LIVE_ITERATOR_STATUS_OK;
 }
 
 BT_HIDDEN
@@ -241,19 +194,11 @@ int lttng_live_metadata_create_stream(struct lttng_live_session *session, uint64
 {
     const bt2_common::LogCfg& logCfg = session->logCfg;
     struct lttng_live_trace *trace;
+    lttng_live_metadata::UP metadata =
+        bt2_common::makeUnique<lttng_live_metadata>(session->self_comp, logCfg);
 
-    ctf_metadata_decoder_config cfg(logCfg);
-    cfg.self_comp = session->self_comp;
-    cfg.create_trace_class = true;
-
-    lttng_live_metadata::UP metadata = bt2_common::makeUnique<lttng_live_metadata>(logCfg);
     metadata->stream_id = stream_id;
 
-    metadata->decoder = ctf_metadata_decoder_create(&cfg);
-    if (!metadata->decoder) {
-        BT_CLOGE_APPEND_CAUSE("Failed to create CTF metadata decoder");
-        return -1;
-    }
     trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
     if (!trace) {
         BT_CLOGE_APPEND_CAUSE("Failed to borrow trace");
index e504304ca604c1f4bc1d9fcff58dc6cf38d7a2aa..053910cf6dec92bdf09980c403b3f4c3e3779855 100644 (file)
@@ -22,6 +22,7 @@
 #include "compat/endian.h"
 #include "compat/compiler.h"
 #include "common/common.h"
+#include "cpp-common/exc.hpp"
 #include <babeltrace2/babeltrace.h>
 #include "cpp-common/make-unique.hpp"
 
@@ -158,16 +159,16 @@ viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_statu
     bt_common_abort();
 }
 
-static inline enum ctf_msg_iter_medium_status
-viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
+static inline enum lttng_live_get_stream_bytes_status
+viewer_status_to_lttng_live_get_stream_bytes_status(enum lttng_live_viewer_status viewer_status)
 {
     switch (viewer_status) {
     case LTTNG_LIVE_VIEWER_STATUS_OK:
-        return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK;
     case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
-        return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN;
     case LTTNG_LIVE_VIEWER_STATUS_ERROR:
-        return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
     }
 
     bt_common_abort();
@@ -1031,14 +1032,13 @@ enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_sessio
 
 BT_HIDDEN
 enum lttng_live_get_one_metadata_status
-lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<char>& buf)
+lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<uint8_t>& buf)
 {
     uint64_t len = 0;
     enum lttng_live_viewer_status viewer_status;
     struct lttng_viewer_cmd cmd;
     struct lttng_viewer_get_metadata rq;
     struct lttng_viewer_metadata_packet rp;
-    std::vector<char> data;
     struct lttng_live_session *session = trace->session;
     struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
     struct lttng_live_metadata *metadata = trace->metadata.get();
@@ -1113,18 +1113,15 @@ lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<c
         return LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
     }
 
-    data.resize(len);
+    std::vector<char> localBuf(len);
 
-    viewer_status = lttng_live_recv(viewer_connection, data.data(), len);
+    viewer_status = lttng_live_recv(viewer_connection, localBuf.data(), len);
     if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
         viewer_handle_recv_status(logCfg, viewer_status, "get metadata packet");
         return (lttng_live_get_one_metadata_status) viewer_status;
     }
 
-    /*
-     * Write the metadata to the file handle.
-     */
-    buf.insert(buf.end(), data.begin(), data.end());
+    buf.insert(buf.end(), localBuf.begin(), localBuf.end());
 
     return LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
 }
@@ -1280,7 +1277,7 @@ lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
 }
 
 BT_HIDDEN
-enum ctf_msg_iter_medium_status
+lttng_live_get_stream_bytes_status
 lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
                             struct lttng_live_stream_iterator *stream, uint8_t *buf,
                             uint64_t offset, uint64_t req_len, uint64_t *recv_len)
@@ -1320,13 +1317,13 @@ lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
     viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
     if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
         viewer_handle_send_status(logCfg, viewer_status, "get data packet command");
-        return viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
+        return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status);
     }
 
     viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
     if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
         viewer_handle_recv_status(logCfg, viewer_status, "get data packet reply");
-        return viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
+        return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status);
     }
 
     flags = be32toh(rp.flags);
@@ -1343,7 +1340,7 @@ lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
         break;
     case LTTNG_VIEWER_GET_PACKET_RETRY:
         /* Unimplemented by relay daemon */
-        return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN;
     case LTTNG_VIEWER_GET_PACKET_ERR:
         if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
             BT_CLOGD("Marking trace as needing new metadata: "
@@ -1360,29 +1357,29 @@ lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
         if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
             BT_CLOGD("Reply with any one flags set means we should retry: response=%s",
                      lttng_viewer_get_packet_return_code_string(rp_status));
-            return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+            return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN;
         }
         BT_CLOGE_APPEND_CAUSE("Received get_data_packet response: error");
-        return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
     case LTTNG_VIEWER_GET_PACKET_EOF:
-        return CTF_MSG_ITER_MEDIUM_STATUS_EOF;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF;
     default:
         BT_CLOGE_APPEND_CAUSE("Received get_data_packet response: unknown (%d)", rp_status);
-        return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
     }
 
     if (req_len == 0) {
-        return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+        return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR;
     }
 
     viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
     if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
         viewer_handle_recv_status(logCfg, viewer_status, "get data packet");
-        return viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
+        return viewer_status_to_lttng_live_get_stream_bytes_status(viewer_status);
     }
     *recv_len = req_len;
 
-    return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+    return LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK;
 }
 
 /*
index 3abba176a8ad61daa47df943f2b0f9e73c8ab7c2..a23c8fb90d934d6f55e109b0ddeae89099f25b5c 100644 (file)
@@ -10,6 +10,7 @@
 #include <stdbool.h>
 #include <stdint.h>
 #include <stdio.h>
+#include <memory>
 #include <string>
 
 #include <glib.h>
@@ -117,4 +118,18 @@ lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter
 bt2::Value::Shared
 live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection);
 
+enum lttng_live_get_stream_bytes_status
+{
+    LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK = __BT_FUNC_STATUS_OK,
+    LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN = __BT_FUNC_STATUS_AGAIN,
+    LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR = __BT_FUNC_STATUS_ERROR,
+    LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF = __BT_FUNC_STATUS_END,
+};
+
+BT_HIDDEN
+lttng_live_get_stream_bytes_status
+lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
+                            struct lttng_live_stream_iterator *stream, uint8_t *buf,
+                            uint64_t offset, uint64_t req_len, uint64_t *recv_len);
+
 #endif /* LTTNG_LIVE_VIEWER_CONNECTION_H */
index 4a06297694a7dcc49ac54eb408a30958b0a4f45d..9df75b633b33fc2224a76deab16643410cb04ce8 100644 (file)
@@ -19,6 +19,9 @@ Trace class:
     Packet context field class: Structure (1 member):
       cpu_id: Unsigned integer (32-bit, Base 10)
     Event class `sample_component:message` (ID 0):
+      User attributes:
+        babeltrace.org,2020:
+          log-level: warning
       Log level: Warning
       Payload field class: Structure (1 member):
         message: String
index 9c7db7cf55128335e99d264e4bf8d0736006c07c..b2711cb98b4271fe4b661dc6e758d217e032cfc4 100644 (file)
@@ -57,6 +57,9 @@ Trace class:
     Packet context field class: Structure (1 member):
       cpu_id: Unsigned integer (32-bit, Base 10)
     Event class `sample_component:message` (ID 0):
+      User attributes:
+        babeltrace.org,2020:
+          log-level: warning
       Log level: Warning
       Payload field class: Structure (1 member):
         message: String
index 17b6ccb4e9dcf4039dbaf38d94cdbfab51fccf9f..40a46ee4419d068760a41e9510b22690f7078fcc 100644 (file)
@@ -19,9 +19,15 @@ Trace class:
     Packet context field class: Structure (1 member):
       cpu_id: Unsigned integer (32-bit, Base 10)
     Event class `my_app:signe_de_pia$$e` (ID 0):
+      User attributes:
+        babeltrace.org,2020:
+          log-level: debug:line
       Log level: Debug (line)
       Payload field class: Structure (0 members)
     Event class `my_app:signe_de_pia$$e_2` (ID 1):
+      User attributes:
+        babeltrace.org,2020:
+          log-level: debug:line
       Log level: Debug (line)
       Payload field class: Structure (0 members)
 
index e3c2a35a23321522e05582cd1d6c133d20396bf4..7bc811eff81e5bebb9b3fa3f95700d6e17c264ba 100755 (executable)
@@ -320,13 +320,6 @@ test_compare_to_ctf_fs() {
        bt_cli "$expected_stdout" "$expected_stderr" "${trace_dir}/1/succeed/multi-domains" -c sink.text.details --params "with-trace-name=false,with-stream-name=false"
        bt_remove_cr "${expected_stdout}"
        bt_remove_cr "${expected_stderr}"
-
-       # Hack. To be removed when src.ctf.lttng-live is updated to use the new
-       # IR generator.
-       "$BT_TESTS_SED_BIN" -i '/User attributes:/d' "${expected_stdout}"
-       "$BT_TESTS_SED_BIN" -i '/babeltrace.org,2020:/d' "${expected_stdout}"
-       "$BT_TESTS_SED_BIN" -i '/log-level: warning/d' "${expected_stdout}"
-
        run_test "$test_text" "$cli_args_template" "$server_args" "$expected_stdout" "$expected_stderr"
        diag "Inverse session order from lttng-relayd"
        run_test "$test_text" "$cli_args_template" "$server_args_inverse" "$expected_stdout" "$expected_stderr"
This page took 0.0467 seconds and 5 git commands to generate.