src.ctf.fs: use the new metadata stream parser and message iterator
authorSimon Marchi <simon.marchi@efficios.com>
Thu, 30 May 2024 18:26:09 +0000 (14:26 -0400)
committerSimon Marchi <simon.marchi@efficios.com>
Wed, 4 Sep 2024 19:05:14 +0000 (15:05 -0400)
Update the src.ctf.fs component class to use:

  - the new metadata stream parser / IR generator (ctf::src::CtfIrGenerator)
  - the new message iterator (ctf::src::MsgIter)

The new IR generator produces a trace class instance using the new CTF
IR type, ctf::src::TraceCls.  This instance is then fed to the message
iterator.  These two changes must therefore be done in lock step.

Add ctf::src::fs::Medium, a Medium for use with ctf::src::ItemSeqIter
that reads trace data from the filesystem.  It receives an index, which
can be thought of as the "playlist" of packets that form the logical
data stream.  Its buf method needs to be able to find a packet given a
requested offset in that logical data stream.  Therefore, add a new
`offsetInStream` field in the ctf_fs_ds_index_entry structure, and the
`updateOffsetsInStream` method to compute them.  Rename
ctf_fs_ds_index_entry::offset to offsetInFile, for clarity.

Once the buf method selected the right index entry (packet), it maps
that portion of the file, and see if the returned buffer can include
some of the following packets in the file, while at it.

Other details about the changes in this patch, in no particular order:

 - Make a copy of ctf_trace_class_configure_ir_trace that works on a
   ctf::src::TraceCls instead of a struct ctf_trace_class.  The old one
   will be removed once src.ctf.lttng-live no longer uses it.

 - Remove ctf_fs_metadata, the required fields (the decoder / IR
   generator) are now directly in ctf_fs_trace.

 - The quirks to apply are saved in the ctf_fs_component object
   directly.  They were previously saved in the old IR object (struct
   ctf_trace_class), but they are not in the new IR object.  They now
   need to be passed to all instantiated message iterators.

 - Remove ctf_fs_msg_iter_data::ds_file_group, add
   ctf_fs_msg_iter_data::port_data.  This allows message iterators to
   access the ctf_fs_component and therefore the quirks.  They can still
   access the ds_file_group through the port_data.

 - Replace ctf_fs_msg_iter_data::msg_iter (the old message iterator)
   with ctf_fs_msg_iter_data::msgIter (the new message iterator).

 - Remove ctf_fs_msg_iter_data::msg_iter_medops_data, this is the data
   for the medium (medops) for the old message iterator.

 - Adjust ctf_fs_iterator_init, ctf_fs_iterator_next,
   ctf_fs_iterator_finalize to the new message iterator.

 - Adjust ctf_fs_iterator_seek_beginning to the new message iterator.
   The simplest way I found to reset the state to the beginning state is
   to delete the message iterator and instantiate a new one.

 - Update add_ds_file_to_ds_file_group to use readPktProps to read the
   stream class and id.

 - Change decode_clock_snapshot_after_event, used for working around
   known tracer bugs, to use an ItemSeqIter.  Define two item visitors,
   ClockSnapshotAfterFirstEventItemVisitor and
   ClockSnapshotAfterLastEventItemVisitor, for the two cases.

Change-Id: I8e3ce344c940da2106bdf8320c28724ca360b48a
Signed-off-by: Simon Marchi <simon.marchi@efficios.com>
Reviewed-on: https://review.lttng.org/c/babeltrace/+/8349
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
Reviewed-on: https://review.lttng.org/c/babeltrace/+/12744

17 files changed:
src/Makefile.am
src/plugins/ctf/common/src/metadata/metadata-stream-parser-utils.cpp
src/plugins/ctf/common/src/metadata/tsdl/ast.hpp
src/plugins/ctf/common/src/metadata/tsdl/ctf-1-metadata-stream-parser.cpp
src/plugins/ctf/common/src/metadata/tsdl/ctf-meta-configure-ir-trace.cpp
src/plugins/ctf/common/src/metadata/tsdl/ctf-meta-configure-ir-trace.hpp
src/plugins/ctf/common/src/metadata/tsdl/decoder.cpp
src/plugins/ctf/common/src/metadata/tsdl/decoder.hpp
src/plugins/ctf/common/src/metadata/tsdl/visitor-generate-ir.cpp
src/plugins/ctf/fs-src/data-stream-file.cpp
src/plugins/ctf/fs-src/data-stream-file.hpp
src/plugins/ctf/fs-src/fs.cpp
src/plugins/ctf/fs-src/fs.hpp
src/plugins/ctf/fs-src/metadata.cpp [deleted file]
tests/plugins/src.ctf.fs/fail/test-fail.sh
tests/plugins/src.ctf.fs/test-deterministic-ordering.sh
tests/plugins/src.ctf.lttng-live/test-live.sh

index b785e3531faa4254522a29b06c1b74b95952d4cf..48d240955fdc0fe52ae243f76068cbf4d04fb667 100644 (file)
@@ -758,7 +758,6 @@ plugins_ctf_babeltrace_plugin_ctf_la_SOURCES = \
        plugins/ctf/fs-src/fs.cpp \
        plugins/ctf/fs-src/fs.hpp \
        plugins/ctf/fs-src/lttng-index.hpp \
-       plugins/ctf/fs-src/metadata.cpp \
        plugins/ctf/fs-src/metadata.hpp \
        plugins/ctf/fs-src/query.cpp \
        plugins/ctf/fs-src/query.hpp \
index 780fd0c5d8a073ede73e3803bfad4886d50fdd26..e72e89f65b062197eb3dbb7f68fb5e7e43e92c1e 100644 (file)
@@ -16,10 +16,12 @@ namespace src {
 
 MetadataStreamMajorVersion getMetadataStreamMajorVersion(const bt2c::ConstBytes buffer) noexcept
 {
-    BT_ASSERT(buffer.data());
+    {
+        BT_ASSERT(buffer.data());
 
-    /* CTF 2 if it starts with an RS byte, otherwise CTF 1 */
-    return (buffer[0] == 30) ? MetadataStreamMajorVersion::V2 : MetadataStreamMajorVersion::V1;
+        /* CTF 2 if it starts with an RS byte, otherwise CTF 1 */
+        return (buffer[0] == 30) ? MetadataStreamMajorVersion::V2 : MetadataStreamMajorVersion::V1;
+    }
 }
 
 std::unique_ptr<MetadataStreamParser>
index 6e9ab32f36619690c61f8f217418b3a968c81378..356d97192495a109734e293f218c8bfd4ce69613 100644 (file)
@@ -22,7 +22,6 @@
 #include "cpp-common/bt2c/logging.hpp"
 #include "cpp-common/vendor/fmt/format.h" /* IWYU pragma: keep */
 
-#include "../../clk-cls-cfg.hpp"
 #include "ctf-meta.hpp"
 
 // the parameter name (of the reentrant 'yyparse' function)
@@ -485,12 +484,10 @@ struct ctf_visitor_generate_ir
     using UP = std::unique_ptr<ctf_visitor_generate_ir>;
 
     explicit ctf_visitor_generate_ir(
-        const ctf::src::ClkClsCfg& clkClsCfgParam,
         const bt2::OptionalBorrowedObject<bt2::SelfComponent> selfCompParam,
         bt2c::Logger loggerParam) noexcept :
-
         logger {std::move(loggerParam)},
-        selfComp {selfCompParam}, clkClsCfg {clkClsCfgParam}
+        selfComp {selfCompParam}
     {
     }
 
@@ -514,13 +511,10 @@ struct ctf_visitor_generate_ir
 
     /* True if this is an LTTng trace */
     bool is_lttng = false;
-
-    ctf::src::ClkClsCfg clkClsCfg;
 };
 
 ctf_visitor_generate_ir::UP
-ctf_visitor_generate_ir_create(const ctf::src::ClkClsCfg& clkClsCfg,
-                               bt2::OptionalBorrowedObject<bt2::SelfComponent> selfComp,
+ctf_visitor_generate_ir_create(const bt2::OptionalBorrowedObject<bt2::SelfComponent> selfComp,
                                const bt2c::Logger& parentLogger);
 
 bt2::TraceClass::Shared
index 163c050afe1d23510e9341d5dcc6fd9ab8823747..84cd326fbb6173283eb3e38c9f87fd88d4789803 100644 (file)
@@ -694,7 +694,7 @@ Ctf1MetadataStreamParser::Ctf1MetadataStreamParser(
     const bt2c::Logger& parentLogger) :
     MetadataStreamParser {selfComp, clkClsCfg},
     _mLogger {parentLogger, "PLUGIN/CTF/CTF-1-META-STREAM-PARSER"},
-    _mOrigCtfIrGenerator {ctf_visitor_generate_ir_create(clkClsCfg, selfComp, _mLogger)},
+    _mOrigCtfIrGenerator {ctf_visitor_generate_ir_create({}, _mLogger)},
     _mScanner {ctf_scanner_alloc(_mLogger)}, _mStreamDecoder {_mLogger}
 {
 }
index dbcbbbd8b160f56915dc8b9f988bc8bc5735800a..fb4a1b41176d0fd2202ef8191240ec0487d3a107 100644 (file)
@@ -4,8 +4,11 @@
  * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
  */
 
+#include <cstdint>
+
 #include <babeltrace2/babeltrace.h>
 
+#include "cpp-common/bt2c/logging.hpp"
 #include "cpp-common/bt2c/uuid.hpp"
 
 #include "ctf-meta-configure-ir-trace.hpp"
@@ -37,3 +40,57 @@ void ctf_trace_class_configure_ir_trace(struct ctf_trace_class *tc, const bt2::T
         }
     }
 }
+
+void ctf_trace_class_configure_ir_trace(const ctf::src::TraceCls& tc, bt2::Trace irTrace,
+                                        const std::uint64_t mipVersion,
+                                        const bt2c::Logger& parentLogger)
+{
+    bt2c::Logger logger {parentLogger, "PLUGIN/CTF/META/CONFIG-IR-TRACE"};
+
+    if (tc.uid()) {
+        if (mipVersion == 0) {
+            /*
+             * CTF 2 isn't supported under MIP 0, therefore we expect
+             * `tc.uid()` to be a UUID string.
+             */
+            irTrace.uuid(bt2c::Uuid {*tc.uid()});
+        } else {
+            /* MIP ≥ 1: always a UID */
+            irTrace.uid(*tc.uid());
+        }
+    }
+
+    if (tc.env()) {
+        tc.env()->forEach([&irTrace, &logger](const char *name, bt2::ConstValue val) {
+            switch (val.type()) {
+            case bt2::ValueType::SignedInteger:
+                irTrace.environmentEntry(name, val.asSignedInteger().value());
+                break;
+
+            case bt2::ValueType::UnsignedInteger:
+            {
+                auto uval = val.asUnsignedInteger().value();
+
+                if (uval > std::numeric_limits<std::int64_t>::max()) {
+                    BT_CPPLOGW_SPEC(
+                        logger,
+                        "Cannot convert unsigned integer environment entry value to signed integer without overflowing. Skipping environment entry: "
+                        "entry-name=\"{}\", entry-value={}",
+                        name, uval);
+                    break;
+                }
+
+                irTrace.environmentEntry(name, static_cast<std::int64_t>(uval));
+                break;
+            }
+
+            case bt2::ValueType::String:
+                irTrace.environmentEntry(name, val.asString().value().data());
+                break;
+
+            default:
+                bt_common_abort();
+            }
+        });
+    }
+}
index 3ce3fa964679e9f64571f2936745c90122ce60ec..4befc071991ebb8edc1b8729f1324301b53161fb 100644 (file)
@@ -7,8 +7,19 @@
 #ifndef BABELTRACE_PLUGINS_CTF_COMMON_SRC_METADATA_TSDL_CTF_META_CONFIGURE_IR_TRACE_HPP
 #define BABELTRACE_PLUGINS_CTF_COMMON_SRC_METADATA_TSDL_CTF_META_CONFIGURE_IR_TRACE_HPP
 
-#include "cpp-common/bt2/trace-ir.hpp"
+#include <cstdint>
+
+#include "../ctf-ir.hpp"
+
+namespace bt2c {
+
+class Logger;
+
+} /* namespace bt2c */
 
 void ctf_trace_class_configure_ir_trace(struct ctf_trace_class *tc, bt2::Trace ir_trace);
 
+void ctf_trace_class_configure_ir_trace(const ctf::src::TraceCls& tc, bt2::Trace irTrace,
+                                        std::uint64_t mipVersion, const bt2c::Logger& parentLogger);
+
 #endif /* BABELTRACE_PLUGINS_CTF_COMMON_SRC_METADATA_TSDL_CTF_META_CONFIGURE_IR_TRACE_HPP */
index 54121dd23b197e4accb3d72f88b52745af2c4862..2aa7b24bdf779765e5475a6ee90fe111100ce60c 100644 (file)
@@ -90,11 +90,7 @@ ctf_metadata_decoder_up
 ctf_metadata_decoder_create(const struct ctf_metadata_decoder_config *config)
 {
     BT_ASSERT(config);
-    BT_CPPLOGD_SPEC(config->logger,
-                    "Creating CTF metadata decoder: "
-                    "clock-class-offset-s={}, "
-                    "clock-class-offset-ns={}",
-                    config->clkClsCfg.offsetSec, config->clkClsCfg.offsetNanoSec);
+    BT_CPPLOGD_SPEC(config->logger, "Creating CTF metadata decoder");
 
     ctf_metadata_decoder *mdec = new ctf_metadata_decoder {config->logger};
     mdec->scanner = ctf_scanner_alloc(mdec->logger);
@@ -117,8 +113,7 @@ ctf_metadata_decoder_create(const struct ctf_metadata_decoder_config *config)
 
     mdec->bo = -1;
     mdec->config = *config;
-    mdec->visitor =
-        ctf_visitor_generate_ir_create(config->clkClsCfg, config->self_comp, config->logger);
+    mdec->visitor = ctf_visitor_generate_ir_create(config->self_comp, config->logger);
     if (!mdec->visitor) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(mdec->logger,
                                      "Failed to create a CTF IR metadata AST visitor: "
@@ -127,11 +122,7 @@ ctf_metadata_decoder_create(const struct ctf_metadata_decoder_config *config)
         goto error;
     }
 
-    BT_CPPLOGD_SPEC(mdec->logger,
-                    "Creating CTF metadata decoder: "
-                    "clock-class-offset-s={}, "
-                    "clock-class-offset-ns={}, addr={}",
-                    config->clkClsCfg.offsetSec, config->clkClsCfg.offsetNanoSec, fmt::ptr(mdec));
+    BT_CPPLOGD_SPEC(mdec->logger, "Created CTF metadata decoder: addr={}", fmt::ptr(mdec));
     goto end;
 
 error:
index 503ff5db5f8ae57617808fe2cf4bcdb7fa24016f..339cc253e9a4d1a327e6a5d812a146137cf72d9b 100644 (file)
@@ -70,8 +70,6 @@ struct ctf_metadata_decoder_config
     /* Weak, used to create a bt_trace_class, if not nullptr. */
     bt_self_component *self_comp = nullptr;
 
-    ctf::src::ClkClsCfg clkClsCfg;
-
     /* True to create trace class objects */
     bool create_trace_class = false;
 
index 8e5cb296f8f03b46b61468a4919864806e2a74d0..7000e6345a097e5b8741ed003e8c81c3b7849277 100644 (file)
@@ -465,12 +465,10 @@ ctf_visitor_generate_ir::~ctf_visitor_generate_ir()
 }
 
 ctf_visitor_generate_ir::UP
-ctf_visitor_generate_ir_create(const ctf::src::ClkClsCfg& clkClsCfg,
-                               const bt2::OptionalBorrowedObject<bt2::SelfComponent> selfComp,
+ctf_visitor_generate_ir_create(const bt2::OptionalBorrowedObject<bt2::SelfComponent> selfComp,
                                const bt2c::Logger& logger)
 {
-    ctf_visitor_generate_ir::UP ctx =
-        bt2s::make_unique<ctf_visitor_generate_ir>(clkClsCfg, selfComp, logger);
+    ctf_visitor_generate_ir::UP ctx = bt2s::make_unique<ctf_visitor_generate_ir>(selfComp, logger);
 
     if (selfComp) {
         bt_trace_class *trace_class = bt_trace_class_create(selfComp->libObjPtr());
@@ -4164,91 +4162,6 @@ error:
     return ret;
 }
 
-static inline uint64_t cycles_from_ns(uint64_t frequency, uint64_t ns)
-{
-    uint64_t cycles;
-
-    /* 1GHz */
-    if (frequency == UINT64_C(1000000000)) {
-        cycles = ns;
-    } else {
-        cycles = (uint64_t) (((double) ns * (double) frequency) / 1e9);
-    }
-
-    return cycles;
-}
-
-static void apply_clock_class_is_absolute(struct ctf_visitor_generate_ir *ctx,
-                                          struct ctf_clock_class *clock)
-{
-    if (ctx->clkClsCfg.forceOriginIsUnixEpoch) {
-        clock->is_absolute = true;
-    }
-
-    return;
-}
-
-static void apply_clock_class_offset(struct ctf_visitor_generate_ir *ctx,
-                                     struct ctf_clock_class *clock)
-{
-    uint64_t freq;
-    int64_t offset_s_to_apply = ctx->clkClsCfg.offsetSec;
-    uint64_t offset_ns_to_apply;
-    int64_t cur_offset_s;
-    uint64_t cur_offset_cycles;
-    long long offsetSecLL;
-    unsigned long long offsetCyclesULL;
-
-    if (ctx->clkClsCfg.offsetSec == 0 && ctx->clkClsCfg.offsetNanoSec == 0) {
-        goto end;
-    }
-
-    /* Transfer nanoseconds to seconds as much as possible */
-    if (ctx->clkClsCfg.offsetNanoSec < 0) {
-        const int64_t abs_ns = -ctx->clkClsCfg.offsetNanoSec;
-        const int64_t abs_extra_s = abs_ns / INT64_C(1000000000) + 1;
-        const int64_t extra_s = -abs_extra_s;
-        const int64_t offset_ns = ctx->clkClsCfg.offsetNanoSec - (extra_s * INT64_C(1000000000));
-
-        BT_ASSERT(offset_ns > 0);
-        offset_ns_to_apply = (uint64_t) offset_ns;
-        offset_s_to_apply += extra_s;
-    } else {
-        const int64_t extra_s = ctx->clkClsCfg.offsetNanoSec / INT64_C(1000000000);
-        const int64_t offset_ns = ctx->clkClsCfg.offsetNanoSec - (extra_s * INT64_C(1000000000));
-
-        BT_ASSERT(offset_ns >= 0);
-        offset_ns_to_apply = (uint64_t) offset_ns;
-        offset_s_to_apply += extra_s;
-    }
-
-    freq = clock->frequency;
-    cur_offset_s = clock->offset_seconds;
-    cur_offset_cycles = clock->offset_cycles;
-
-    /* Apply offsets */
-    cur_offset_s += offset_s_to_apply;
-    cur_offset_cycles += cycles_from_ns(freq, offset_ns_to_apply);
-
-    /*
-     * Recalibrate offsets because the part in cycles can be greater
-     * than the frequency at this point.
-     */
-    {
-        offsetSecLL = cur_offset_s;
-        offsetCyclesULL = cur_offset_cycles;
-
-        const auto offsetParts = ctf::src::normalizeClkOffset(offsetSecLL, offsetCyclesULL, freq);
-
-        /* Set final offsets */
-        clock->offset_seconds = offsetParts.first;
-        clock->offset_cycles = offsetParts.second;
-    }
-
-end:
-    return;
-}
-
 static int visit_clock_decl(struct ctf_visitor_generate_ir *ctx, struct ctf_node *clock_node)
 {
     int ret = 0;
@@ -4319,8 +4232,6 @@ static int visit_clock_decl(struct ctf_visitor_generate_ir *ctx, struct ctf_node
         clock->offset_cycles = offsetCyclesULL;
     }
 
-    apply_clock_class_offset(ctx, clock);
-    apply_clock_class_is_absolute(ctx, clock);
     g_ptr_array_add(ctx->ctf_tc->clock_classes, clock);
     clock = NULL;
 
index d2f7e102eb85734881aa90dbc4ca9e899632e835..15820c1da800a93ba7c6f5bbe2ad1876dbc7bb9f 100644 (file)
@@ -9,6 +9,7 @@
 #include <glib.h>
 #include <stdint.h>
 #include <stdio.h>
+#include <sys/stat.h>
 
 #include "compat/endian.h" /* IWYU pragma: keep  */
 #include "compat/mman.h"   /* IWYU: pragma keep  */
 #include "cpp-common/bt2s/make-unique.hpp"
 #include "cpp-common/vendor/fmt/format.h"
 
-#include "../common/src/msg-iter/msg-iter.hpp"
+#include "../common/src/pkt-props.hpp"
 #include "data-stream-file.hpp"
 #include "file.hpp"
-#include "fs.hpp"
 #include "lttng-index.hpp"
 
-static inline size_t remaining_mmap_bytes(struct ctf_fs_ds_file *ds_file)
+using namespace bt2c::literals::datalen;
+
+static bt2c::DataLen getFileSize(const char * const path, const bt2c::Logger& logger)
+{
+    struct stat st;
+    if (stat(path, &st) != 0) {
+        BT_CPPLOGE_ERRNO_APPEND_CAUSE_AND_THROW_SPEC(logger, bt2::Error,
+                                                     "Failed to stat stream file", "path={}", path);
+    }
+
+    return bt2c::DataLen::fromBytes(st.st_size);
+}
+
+ctf_fs_ds_file_info::ctf_fs_ds_file_info(std::string pathParam, const bt2c::Logger& parentLogger) :
+    logger {parentLogger, "PLUGIN/SRC.CTF.FS/DS-FILE-INFO"}, path(std::move(pathParam)),
+    size(getFileSize(path.c_str(), logger))
 {
-    BT_ASSERT_DBG(ds_file->mmap_len >= ds_file->request_offset_in_mapping);
-    return ds_file->mmap_len - ds_file->request_offset_in_mapping;
 }
 
 /*
@@ -34,16 +47,26 @@ static inline size_t remaining_mmap_bytes(struct ctf_fs_ds_file *ds_file)
 
 static bool offset_ist_mapped(struct ctf_fs_ds_file *ds_file, off_t offset_in_file)
 {
+    if (!ds_file->mmap_addr)
+        return false;
+
     return offset_in_file >= ds_file->mmap_offset_in_file &&
            offset_in_file < (ds_file->mmap_offset_in_file + ds_file->mmap_len);
 }
 
-static enum ctf_msg_iter_medium_status ds_file_munmap(struct ctf_fs_ds_file *ds_file)
+enum ds_file_status
+{
+    DS_FILE_STATUS_OK = 0,
+    DS_FILE_STATUS_ERROR = -1,
+    DS_FILE_STATUS_EOF = 1,
+};
+
+static ds_file_status ds_file_munmap(struct ctf_fs_ds_file *ds_file)
 {
     BT_ASSERT(ds_file);
 
     if (!ds_file->mmap_addr) {
-        return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+        return DS_FILE_STATUS_OK;
     }
 
     if (bt_munmap(ds_file->mmap_addr, ds_file->mmap_len)) {
@@ -51,13 +74,13 @@ static enum ctf_msg_iter_medium_status ds_file_munmap(struct ctf_fs_ds_file *ds_
                               ": address={}, size={}, file_path=\"{}\", file={}",
                               fmt::ptr(ds_file->mmap_addr), ds_file->mmap_len,
                               ds_file->file ? ds_file->file->path : "NULL",
-                              ds_file->file ? fmt::ptr(ds_file->file->fp) : NULL);
-        return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+                              ds_file->file ? fmt::ptr(ds_file->file->fp) : nullptr);
+        return DS_FILE_STATUS_ERROR;
     }
 
     ds_file->mmap_addr = NULL;
 
-    return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+    return DS_FILE_STATUS_OK;
 }
 
 /*
@@ -65,32 +88,25 @@ static enum ctf_msg_iter_medium_status ds_file_munmap(struct ctf_fs_ds_file *ds_
  * mapping.  If the currently mmap-ed region already contains
  * `requested_offset_in_file`, the mapping is kept.
  *
- * Set `ds_file->requested_offset_in_mapping` based on `request_offset_in_file`,
- * such that the next call to `request_bytes` will return bytes starting at that
- * position.
- *
  * `requested_offset_in_file` must be a valid offset in the file.
  */
-static enum ctf_msg_iter_medium_status ds_file_mmap(struct ctf_fs_ds_file *ds_file,
-                                                    off_t requested_offset_in_file)
+static ds_file_status ds_file_mmap(struct ctf_fs_ds_file *ds_file, off_t requested_offset_in_file)
 {
     /* Ensure the requested offset is in the file range. */
     BT_ASSERT(requested_offset_in_file >= 0);
     BT_ASSERT(requested_offset_in_file < ds_file->file->size);
 
     /*
-     * If the mapping already contains the requested offset, just adjust
-     * requested_offset_in_mapping.
+     * If the mapping already contains the requested range, we have nothing to
+     * do.
      */
     if (offset_ist_mapped(ds_file, requested_offset_in_file)) {
-        ds_file->request_offset_in_mapping =
-            requested_offset_in_file - ds_file->mmap_offset_in_file;
-        return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+        return DS_FILE_STATUS_OK;
     }
 
     /* Unmap old region */
-    ctf_msg_iter_medium_status status = ds_file_munmap(ds_file);
-    if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
+    ds_file_status status = ds_file_munmap(ds_file);
+    if (status != DS_FILE_STATUS_OK) {
         return status;
     }
 
@@ -98,14 +114,15 @@ static enum ctf_msg_iter_medium_status ds_file_mmap(struct ctf_fs_ds_file *ds_fi
      * Compute a mapping that has the required alignment properties and
      * contains `requested_offset_in_file`.
      */
-    ds_file->request_offset_in_mapping =
-        requested_offset_in_file %
-        bt_mmap_get_offset_align_size(static_cast<int>(ds_file->logger.level()));
-    ds_file->mmap_offset_in_file = requested_offset_in_file - ds_file->request_offset_in_mapping;
+    size_t alignment = bt_mmap_get_offset_align_size(static_cast<int>(ds_file->logger.level()));
+    ds_file->mmap_offset_in_file =
+        requested_offset_in_file - (requested_offset_in_file % alignment);
     ds_file->mmap_len =
         MIN(ds_file->file->size - ds_file->mmap_offset_in_file, ds_file->mmap_max_len);
 
     BT_ASSERT(ds_file->mmap_len > 0);
+    BT_ASSERT(requested_offset_in_file >= ds_file->mmap_offset_in_file);
+    BT_ASSERT(requested_offset_in_file < (ds_file->mmap_offset_in_file + ds_file->mmap_len));
 
     ds_file->mmap_addr =
         bt_mmap(ds_file->mmap_len, PROT_READ, MAP_PRIVATE, fileno(ds_file->file->fp.get()),
@@ -115,307 +132,45 @@ static enum ctf_msg_iter_medium_status ds_file_mmap(struct ctf_fs_ds_file *ds_fi
                         "Cannot memory-map address (size {}) of file \"{}\" ({}) at offset {}: {}",
                         ds_file->mmap_len, ds_file->file->path, fmt::ptr(ds_file->file->fp),
                         (intmax_t) ds_file->mmap_offset_in_file, strerror(errno));
-        return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
-    }
-
-    return CTF_MSG_ITER_MEDIUM_STATUS_OK;
-}
-
-/*
- * Change the mapping of the file to read the region that follows the current
- * mapping.
- *
- * If the file hasn't been mapped yet, then everything (mmap_offset_in_file,
- * mmap_len, request_offset_in_mapping) should have the value 0, which will
- * result in the beginning of the file getting mapped.
- *
- * return _EOF if the current mapping is the end of the file.
- */
-
-static enum ctf_msg_iter_medium_status ds_file_mmap_next(struct ctf_fs_ds_file *ds_file)
-{
-    /*
-     * If we're called, it's because more bytes are requested but we have
-     * given all the bytes of the current mapping.
-     */
-    BT_ASSERT(ds_file->request_offset_in_mapping == ds_file->mmap_len);
-
-    /*
-     * If the current mapping coincides with the end of the file, there is
-     * no next mapping.
-     */
-    if (ds_file->mmap_offset_in_file + ds_file->mmap_len == ds_file->file->size) {
-        return CTF_MSG_ITER_MEDIUM_STATUS_EOF;
+        return DS_FILE_STATUS_ERROR;
     }
 
-    return ds_file_mmap(ds_file, ds_file->mmap_offset_in_file + ds_file->mmap_len);
+    return DS_FILE_STATUS_OK;
 }
 
-static enum ctf_msg_iter_medium_status medop_request_bytes(size_t request_sz, uint8_t **buffer_addr,
-                                                           size_t *buffer_sz, void *data)
+void ctf_fs_ds_index::updateOffsetsInStream()
 {
-    struct ctf_fs_ds_file *ds_file = (struct ctf_fs_ds_file *) data;
-
-    BT_ASSERT(request_sz > 0);
+    auto offsetInStream = 0_bytes;
 
-    /*
-     * Check if we have at least one memory-mapped byte left. If we don't,
-     * mmap the next file.
-     */
-    if (remaining_mmap_bytes(ds_file) == 0) {
-        /* Are we at the end of the file? */
-        if (ds_file->mmap_offset_in_file >= ds_file->file->size) {
-            BT_CPPLOGD_SPEC(ds_file->logger, "Reached end of file \"{}\" ({})", ds_file->file->path,
-                            fmt::ptr(ds_file->file->fp));
-            return CTF_MSG_ITER_MEDIUM_STATUS_EOF;
-        }
-
-        ctf_msg_iter_medium_status status = ds_file_mmap_next(ds_file);
-        switch (status) {
-        case CTF_MSG_ITER_MEDIUM_STATUS_OK:
-            break;
-        case CTF_MSG_ITER_MEDIUM_STATUS_EOF:
-            return CTF_MSG_ITER_MEDIUM_STATUS_EOF;
-        default:
-            BT_CPPLOGE_SPEC(ds_file->logger, "Cannot memory-map next region of file \"{}\" ({})",
-                            ds_file->file->path, fmt::ptr(ds_file->file->fp));
-            return status;
-        }
+    for (ctf_fs_ds_index_entry& entry : this->entries) {
+        entry.offsetInStream = offsetInStream;
+        offsetInStream += entry.packetSize;
     }
-
-    BT_ASSERT(remaining_mmap_bytes(ds_file) > 0);
-    *buffer_sz = MIN(remaining_mmap_bytes(ds_file), request_sz);
-
-    BT_ASSERT(ds_file->mmap_addr);
-    *buffer_addr = ((uint8_t *) ds_file->mmap_addr) + ds_file->request_offset_in_mapping;
-
-    ds_file->request_offset_in_mapping += *buffer_sz;
-
-    return CTF_MSG_ITER_MEDIUM_STATUS_OK;
-}
-
-static bt_stream *medop_borrow_stream(bt_stream_class *stream_class, int64_t, void *data)
-{
-    struct ctf_fs_ds_file *ds_file = (struct ctf_fs_ds_file *) data;
-    bt_stream_class *ds_file_stream_class;
-
-    ds_file_stream_class = ds_file->stream->cls().libObjPtr();
-
-    if (stream_class != ds_file_stream_class) {
-        /*
-         * Not supported: two packets described by two different
-         * stream classes within the same data stream file.
-         */
-        return nullptr;
-    }
-
-    return ds_file->stream->libObjPtr();
-}
-
-static enum ctf_msg_iter_medium_status medop_seek(off_t offset, void *data)
-{
-    struct ctf_fs_ds_file *ds_file = (struct ctf_fs_ds_file *) data;
-
-    BT_ASSERT(offset >= 0);
-    BT_ASSERT(offset < ds_file->file->size);
-
-    return ds_file_mmap(ds_file, offset);
-}
-
-struct ctf_msg_iter_medium_ops ctf_fs_ds_file_medops = {
-    medop_request_bytes,
-    medop_seek,
-    nullptr,
-    medop_borrow_stream,
-};
-
-struct ctf_fs_ds_group_medops_data
-{
-    explicit ctf_fs_ds_group_medops_data(const bt2c::Logger& parentLogger) :
-        logger {parentLogger, "PLUGIN/SRC.CTF.FS/DS-GROUP-MEDOPS"}
-    {
-    }
-
-    bt2c::Logger logger;
-
-    /* Weak, set once at creation time. */
-    struct ctf_fs_ds_file_group *ds_file_group = nullptr;
-
-    /*
-     * Index (as in element rank) of the index entry of ds_file_groups'
-     * index we will read next (so, the one after the one we are reading
-     * right now).
-     */
-    guint next_index_entry_index = 0;
-
-    /*
-     * File we are currently reading.  Changes whenever we switch to
-     * reading another data file.
-     */
-    ctf_fs_ds_file::UP file;
-
-    /* Weak, for context / logging / appending causes. */
-    bt_self_message_iterator *self_msg_iter = nullptr;
-};
-
-static enum ctf_msg_iter_medium_status medop_group_request_bytes(size_t request_sz,
-                                                                 uint8_t **buffer_addr,
-                                                                 size_t *buffer_sz, void *void_data)
-{
-    struct ctf_fs_ds_group_medops_data *data = (struct ctf_fs_ds_group_medops_data *) void_data;
-
-    /* Return bytes from the current file. */
-    return medop_request_bytes(request_sz, buffer_addr, buffer_sz, data->file.get());
-}
-
-static bt_stream *medop_group_borrow_stream(bt_stream_class *stream_class, int64_t stream_id,
-                                            void *void_data)
-{
-    struct ctf_fs_ds_group_medops_data *data = (struct ctf_fs_ds_group_medops_data *) void_data;
-
-    return medop_borrow_stream(stream_class, stream_id, data->file.get());
-}
-
-/*
- * Set `data->file` to prepare it to read the packet described
- * by `index_entry`.
- */
-
-static enum ctf_msg_iter_medium_status
-ctf_fs_ds_group_medops_set_file(struct ctf_fs_ds_group_medops_data *data,
-                                struct ctf_fs_ds_index_entry *index_entry)
-{
-    BT_ASSERT(data);
-    BT_ASSERT(index_entry);
-
-    /* Check if that file is already the one mapped. */
-    if (!data->file || data->file->file->path != index_entry->path) {
-        /* Create the new file. */
-        data->file =
-            ctf_fs_ds_file_create(data->ds_file_group->ctf_fs_trace, data->ds_file_group->stream,
-                                  index_entry->path, data->logger);
-        if (!data->file) {
-            BT_CPPLOGE_APPEND_CAUSE_SPEC(data->logger, "failed to create ctf_fs_ds_file.");
-            return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
-        }
-    }
-
-    /*
-     * Ensure the right portion of the file will be returned on the next
-     * request_bytes call.
-     */
-    return ds_file_mmap(data->file.get(), index_entry->offset.bytes());
-}
-
-static enum ctf_msg_iter_medium_status medop_group_switch_packet(void *void_data)
-{
-    struct ctf_fs_ds_group_medops_data *data = (struct ctf_fs_ds_group_medops_data *) void_data;
-
-    /* If we have gone through all index entries, we are done. */
-    if (data->next_index_entry_index >= data->ds_file_group->index.entries.size()) {
-        return CTF_MSG_ITER_MEDIUM_STATUS_EOF;
-    }
-
-    /*
-     * Otherwise, look up the next index entry / packet and prepare it
-     *  for reading.
-     */
-    ctf_msg_iter_medium_status status = ctf_fs_ds_group_medops_set_file(
-        data, &data->ds_file_group->index.entries[data->next_index_entry_index]);
-    if (status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
-        return status;
-    }
-
-    data->next_index_entry_index++;
-
-    return CTF_MSG_ITER_MEDIUM_STATUS_OK;
-}
-
-void ctf_fs_ds_group_medops_data_deleter::operator()(ctf_fs_ds_group_medops_data *data) noexcept
-{
-    delete data;
 }
 
-enum ctf_msg_iter_medium_status ctf_fs_ds_group_medops_data_create(
-    struct ctf_fs_ds_file_group *ds_file_group, bt_self_message_iterator *self_msg_iter,
-    const bt2c::Logger& parentLogger, ctf_fs_ds_group_medops_data_up& out)
+static int convert_cycles_to_ns(const ctf::src::ClkCls& clockClass, uint64_t cycles, int64_t *ns)
 {
-    BT_ASSERT(self_msg_iter);
-    BT_ASSERT(ds_file_group);
-    BT_ASSERT(!ds_file_group->index.entries.empty());
-
-    out.reset(new ctf_fs_ds_group_medops_data {parentLogger});
-
-    out->ds_file_group = ds_file_group;
-    out->self_msg_iter = self_msg_iter;
-
-    /*
-     * No need to prepare the first file.  ctf_msg_iter will call
-     * switch_packet before reading the first packet, it will be
-     * done then.
-     */
-
-    return CTF_MSG_ITER_MEDIUM_STATUS_OK;
-}
-
-void ctf_fs_ds_group_medops_data_reset(struct ctf_fs_ds_group_medops_data *data)
-{
-    data->next_index_entry_index = 0;
-}
-
-struct ctf_msg_iter_medium_ops ctf_fs_ds_group_medops = {
-    .request_bytes = medop_group_request_bytes,
-
-    /*
-     * We don't support seeking using this medops.  It would probably be
-     * possible, but it's not needed at the moment.
-     */
-    .seek = NULL,
-
-    .switch_packet = medop_group_switch_packet,
-    .borrow_stream = medop_group_borrow_stream,
-};
-
-static int convert_cycles_to_ns(struct ctf_clock_class *clock_class, uint64_t cycles, int64_t *ns)
-{
-    return bt_util_clock_cycles_to_ns_from_origin(cycles, clock_class->frequency,
-                                                  clock_class->offset_seconds,
-                                                  clock_class->offset_cycles, ns);
+    return bt_util_clock_cycles_to_ns_from_origin(cycles, clockClass.freq(),
+                                                  clockClass.offsetFromOrigin().seconds(),
+                                                  clockClass.offsetFromOrigin().cycles(), ns);
 }
 
 static bt2s::optional<ctf_fs_ds_index>
-build_index_from_idx_file(struct ctf_fs_ds_file *ds_file, struct ctf_fs_ds_file_info *file_info,
-                          struct ctf_msg_iter *msg_iter)
+build_index_from_idx_file(const ctf_fs_ds_file_info& fileInfo, const ctf::src::TraceCls& traceCls)
 {
-    BT_CPPLOGI_SPEC(ds_file->logger, "Building index from .idx file of stream file {}",
-                    ds_file->file->path);
-    ctf_msg_iter_packet_properties props;
-    int ret = ctf_msg_iter_get_packet_properties(msg_iter, &props);
-    if (ret) {
-        BT_CPPLOGI_SPEC(ds_file->logger, "Cannot read first packet's header and context fields.");
-        return bt2s::nullopt;
-    }
-
-    ctf_stream_class *sc =
-        ctf_trace_class_borrow_stream_class_by_id(ds_file->metadata->tc, props.stream_class_id);
-    BT_ASSERT(sc);
-    if (!sc->default_clock_class) {
-        BT_CPPLOGI_SPEC(ds_file->logger, "Cannot find stream class's default clock class.");
-        return bt2s::nullopt;
-    }
+    const char *path = fileInfo.path.c_str();
+    BT_CPPLOGI_SPEC(fileInfo.logger, "Building index from .idx file of stream file {}", path);
 
     /* Look for index file in relative path index/name.idx. */
-    bt2c::GCharUP basename {g_path_get_basename(ds_file->file->path.c_str())};
+    bt2c::GCharUP basename {g_path_get_basename(path)};
     if (!basename) {
-        BT_CPPLOGE_SPEC(ds_file->logger, "Cannot get the basename of datastream file {}",
-                        ds_file->file->path);
+        BT_CPPLOGE_SPEC(fileInfo.logger, "Cannot get the basename of datastream file {}", path);
         return bt2s::nullopt;
     }
 
-    bt2c::GCharUP directory {g_path_get_dirname(ds_file->file->path.c_str())};
+    bt2c::GCharUP directory {g_path_get_dirname(path)};
     if (!directory) {
-        BT_CPPLOGE_SPEC(ds_file->logger, "Cannot get dirname of datastream file {}",
-                        ds_file->file->path);
+        BT_CPPLOGE_SPEC(fileInfo.logger, "Cannot get dirname of datastream file {}", path);
         return bt2s::nullopt;
     }
 
@@ -424,7 +179,7 @@ build_index_from_idx_file(struct ctf_fs_ds_file *ds_file, struct ctf_fs_ds_file_
         g_build_filename(directory.get(), "index", index_basename.c_str(), NULL)};
     bt2c::GMappedFileUP mapped_file {g_mapped_file_new(index_file_path.get(), FALSE, NULL)};
     if (!mapped_file) {
-        BT_CPPLOGD_SPEC(ds_file->logger, "Cannot create new mapped file {}", index_file_path.get());
+        BT_CPPLOGD_SPEC(fileInfo.logger, "Cannot create new mapped file {}", index_file_path.get());
         return bt2s::nullopt;
     }
 
@@ -435,7 +190,7 @@ build_index_from_idx_file(struct ctf_fs_ds_file *ds_file, struct ctf_fs_ds_file_
      */
     gsize filesize = g_mapped_file_get_length(mapped_file.get());
     if (filesize < sizeof(ctf_packet_index_file_hdr)) {
-        BT_CPPLOGW_SPEC(ds_file->logger,
+        BT_CPPLOGW_SPEC(fileInfo.logger,
                         "Invalid LTTng trace index file: "
                         "file size ({} bytes) < header size ({} bytes)",
                         filesize, sizeof(ctf_packet_index_file_hdr));
@@ -447,7 +202,7 @@ build_index_from_idx_file(struct ctf_fs_ds_file *ds_file, struct ctf_fs_ds_file_
 
     const char *file_pos = g_mapped_file_get_contents(mapped_file.get()) + sizeof(*header);
     if (be32toh(header->magic) != CTF_INDEX_MAGIC) {
-        BT_CPPLOGW_SPEC(ds_file->logger,
+        BT_CPPLOGW_SPEC(fileInfo.logger,
                         "Invalid LTTng trace index: \"magic\" field validation failed");
         return bt2s::nullopt;
     }
@@ -455,7 +210,7 @@ build_index_from_idx_file(struct ctf_fs_ds_file *ds_file, struct ctf_fs_ds_file_
     uint32_t version_major = be32toh(header->index_major);
     uint32_t version_minor = be32toh(header->index_minor);
     if (version_major != 1) {
-        BT_CPPLOGW_SPEC(ds_file->logger, "Unknown LTTng trace index version: major={}, minor={}",
+        BT_CPPLOGW_SPEC(fileInfo.logger, "Unknown LTTng trace index version: major={}, minor={}",
                         version_major, version_minor);
         return bt2s::nullopt;
     }
@@ -463,7 +218,7 @@ build_index_from_idx_file(struct ctf_fs_ds_file *ds_file, struct ctf_fs_ds_file_
     size_t file_index_entry_size = be32toh(header->packet_index_len);
     if (file_index_entry_size < CTF_INDEX_1_0_SIZE) {
         BT_CPPLOGW_SPEC(
-            ds_file->logger,
+            fileInfo.logger,
             "Invalid `packet_index_len` in LTTng trace index file (`packet_index_len` < CTF index 1.0 index entry size): "
             "packet_index_len={}, CTF_INDEX_1_0_SIZE={}",
             file_index_entry_size, CTF_INDEX_1_0_SIZE);
@@ -472,7 +227,7 @@ build_index_from_idx_file(struct ctf_fs_ds_file *ds_file, struct ctf_fs_ds_file_
 
     size_t file_entry_count = (filesize - sizeof(*header)) / file_index_entry_size;
     if ((filesize - sizeof(*header)) % file_index_entry_size) {
-        BT_CPPLOGW_SPEC(ds_file->logger,
+        BT_CPPLOGW_SPEC(fileInfo.logger,
                         "Invalid LTTng trace index: the index's size after the header "
                         "({} bytes) is not a multiple of the index entry size "
                         "({} bytes)",
@@ -480,37 +235,59 @@ build_index_from_idx_file(struct ctf_fs_ds_file *ds_file, struct ctf_fs_ds_file_
         return bt2s::nullopt;
     }
 
+    /*
+     * We need the clock class to convert cycles to ns.  For that, we need the
+     * stream class.  Read the stream class id from the first packet's header.
+     * We don't know the size of that packet yet, so pretend that it spans the
+     * whole file (the reader will only read the header anyway).
+     */
+    ctf_fs_ds_index_entry tempIndexEntry {path, 0_bits, fileInfo.size};
+    ctf_fs_ds_index tempIndex;
+    tempIndex.entries.emplace_back(tempIndexEntry);
+
+    ctf::src::fs::Medium::UP medium =
+        bt2s::make_unique<ctf::src::fs::Medium>(tempIndex, fileInfo.logger);
+    ctf::src::PktProps props =
+        ctf::src::readPktProps(traceCls, std::move(medium), 0_bytes, fileInfo.logger);
+
+    const ctf::src::DataStreamCls *sc = props.dataStreamCls;
+    BT_ASSERT(sc);
+    if (!sc->defClkCls()) {
+        BT_CPPLOGI_SPEC(fileInfo.logger, "Cannot find stream class's default clock class.");
+        return bt2s::nullopt;
+    }
+
     ctf_fs_ds_index index;
     ctf_fs_ds_index_entry *prev_index_entry = nullptr;
-    auto totalPacketsSize = bt2c::DataLen::fromBytes(0);
+    auto totalPacketsSize = 0_bytes;
 
     for (size_t i = 0; i < file_entry_count; i++) {
         struct ctf_packet_index *file_index = (struct ctf_packet_index *) file_pos;
         const auto packetSize = bt2c::DataLen::fromBits(be64toh(file_index->packet_size));
 
         if (packetSize.hasExtraBits()) {
-            BT_CPPLOGW_SPEC(ds_file->logger,
+            BT_CPPLOGW_SPEC(fileInfo.logger,
                             "Invalid packet size encountered in LTTng trace index file");
             return bt2s::nullopt;
         }
 
         const auto offset = bt2c::DataLen::fromBytes(be64toh(file_index->offset));
 
-        if (i != 0 && offset < prev_index_entry->offset) {
+        if (i != 0 && offset < prev_index_entry->offsetInFile) {
             BT_CPPLOGW_SPEC(
-                ds_file->logger,
+                fileInfo.logger,
                 "Invalid, non-monotonic, packet offset encountered in LTTng trace index file: "
                 "previous offset={} bytes, current offset={} bytes",
-                prev_index_entry->offset.bytes(), offset.bytes());
+                prev_index_entry->offsetInFile.bytes(), offset.bytes());
             return bt2s::nullopt;
         }
 
-        ctf_fs_ds_index_entry index_entry {file_info->path.c_str(), offset, packetSize};
+        ctf_fs_ds_index_entry index_entry {path, offset, packetSize};
         index_entry.timestamp_begin = be64toh(file_index->timestamp_begin);
         index_entry.timestamp_end = be64toh(file_index->timestamp_end);
         if (index_entry.timestamp_end < index_entry.timestamp_begin) {
             BT_CPPLOGW_SPEC(
-                ds_file->logger,
+                fileInfo.logger,
                 "Invalid packet time bounds encountered in LTTng trace index file (begin > end): "
                 "timestamp_begin={}, timestamp_end={}",
                 index_entry.timestamp_begin, index_entry.timestamp_end);
@@ -518,19 +295,19 @@ build_index_from_idx_file(struct ctf_fs_ds_file *ds_file, struct ctf_fs_ds_file_
         }
 
         /* Convert the packet's bound to nanoseconds since Epoch. */
-        ret = convert_cycles_to_ns(sc->default_clock_class, index_entry.timestamp_begin,
-                                   &index_entry.timestamp_begin_ns);
+        int ret = convert_cycles_to_ns(*sc->defClkCls(), index_entry.timestamp_begin,
+                                       &index_entry.timestamp_begin_ns);
         if (ret) {
             BT_CPPLOGI_SPEC(
-                ds_file->logger,
+                fileInfo.logger,
                 "Failed to convert raw timestamp to nanoseconds since Epoch during index parsing");
             return bt2s::nullopt;
         }
-        ret = convert_cycles_to_ns(sc->default_clock_class, index_entry.timestamp_end,
+        ret = convert_cycles_to_ns(*sc->defClkCls(), index_entry.timestamp_end,
                                    &index_entry.timestamp_end_ns);
         if (ret) {
             BT_CPPLOGI_SPEC(
-                ds_file->logger,
+                fileInfo.logger,
                 "Failed to convert raw timestamp to nanoseconds since Epoch during LTTng trace index parsing");
             return bt2s::nullopt;
         }
@@ -548,33 +325,29 @@ build_index_from_idx_file(struct ctf_fs_ds_file *ds_file, struct ctf_fs_ds_file_
     }
 
     /* Validate that the index addresses the complete stream. */
-    if (ds_file->file->size != totalPacketsSize.bytes()) {
-        BT_CPPLOGW_SPEC(ds_file->logger,
+    if (fileInfo.size != totalPacketsSize) {
+        BT_CPPLOGW_SPEC(fileInfo.logger,
                         "Invalid LTTng trace index file; indexed size != stream file size: "
-                        "file-size={} bytes, total-packets-size={} bytes",
-                        ds_file->file->size, totalPacketsSize.bytes());
+                        "stream-file-size-bytes={}, total-packets-size-bytes={}",
+                        fileInfo.size.bytes(), totalPacketsSize.bytes());
         return bt2s::nullopt;
     }
 
     return index;
 }
 
-static int init_index_entry(ctf_fs_ds_index_entry& entry, struct ctf_fs_ds_file *ds_file,
-                            struct ctf_msg_iter_packet_properties *props)
+static int init_index_entry(ctf_fs_ds_index_entry& entry, ctf::src::PktProps *props,
+                            const ctf::src::DataStreamCls& dataStreamCls,
+                            const bt2c::Logger& logger)
 {
-    ctf_stream_class *sc =
-        ctf_trace_class_borrow_stream_class_by_id(ds_file->metadata->tc, props->stream_class_id);
-    BT_ASSERT(sc);
-
-    if (props->snapshots.beginning_clock != UINT64_C(-1)) {
-        entry.timestamp_begin = props->snapshots.beginning_clock;
+    if (props->snapshots.beginDefClk) {
+        entry.timestamp_begin = *props->snapshots.beginDefClk;
 
         /* Convert the packet's bound to nanoseconds since Epoch. */
-        int ret = convert_cycles_to_ns(sc->default_clock_class, props->snapshots.beginning_clock,
+        int ret = convert_cycles_to_ns(*dataStreamCls.defClkCls(), *props->snapshots.beginDefClk,
                                        &entry.timestamp_begin_ns);
         if (ret) {
-            BT_CPPLOGI_SPEC(ds_file->logger,
-                            "Failed to convert raw timestamp to nanoseconds since Epoch.");
+            BT_CPPLOGI_SPEC(logger, "Failed to convert raw timestamp to nanoseconds since Epoch.");
             return ret;
         }
     } else {
@@ -582,15 +355,14 @@ static int init_index_entry(ctf_fs_ds_index_entry& entry, struct ctf_fs_ds_file
         entry.timestamp_begin_ns = UINT64_C(-1);
     }
 
-    if (props->snapshots.end_clock != UINT64_C(-1)) {
-        entry.timestamp_end = props->snapshots.end_clock;
+    if (props->snapshots.endDefClk) {
+        entry.timestamp_end = *props->snapshots.endDefClk;
 
         /* Convert the packet's bound to nanoseconds since Epoch. */
-        int ret = convert_cycles_to_ns(sc->default_clock_class, props->snapshots.end_clock,
+        int ret = convert_cycles_to_ns(*dataStreamCls.defClkCls(), *props->snapshots.endDefClk,
                                        &entry.timestamp_end_ns);
         if (ret) {
-            BT_CPPLOGI_SPEC(ds_file->logger,
-                            "Failed to convert raw timestamp to nanoseconds since Epoch.");
+            BT_CPPLOGI_SPEC(logger, "Failed to convert raw timestamp to nanoseconds since Epoch.");
             return ret;
         }
     } else {
@@ -602,66 +374,75 @@ static int init_index_entry(ctf_fs_ds_index_entry& entry, struct ctf_fs_ds_file
 }
 
 static bt2s::optional<ctf_fs_ds_index>
-build_index_from_stream_file(struct ctf_fs_ds_file *ds_file, struct ctf_fs_ds_file_info *file_info,
-                             struct ctf_msg_iter *msg_iter)
+build_index_from_stream_file(const ctf_fs_ds_file_info& fileInfo,
+                             const ctf::src::TraceCls& traceCls)
 {
-    BT_CPPLOGI_SPEC(ds_file->logger, "Indexing stream file {}", ds_file->file->path);
+    const char *path = fileInfo.path.c_str();
+
+    BT_CPPLOGI_SPEC(fileInfo.logger, "Indexing stream file {}", path);
 
     ctf_fs_ds_index index;
-    auto currentPacketOffset = bt2c::DataLen::fromBytes(0);
+    auto currentPacketOffset = 0_bytes;
 
     while (true) {
-        struct ctf_msg_iter_packet_properties props;
-
-        if (currentPacketOffset.bytes() > ds_file->file->size) {
-            BT_CPPLOGE_SPEC(ds_file->logger,
+        if (currentPacketOffset > fileInfo.size) {
+            BT_CPPLOGE_SPEC(fileInfo.logger,
                             "Unexpected current packet's offset (larger than file).");
             return bt2s::nullopt;
-        } else if (currentPacketOffset.bytes() == ds_file->file->size) {
+        } else if (currentPacketOffset == fileInfo.size) {
             /* No more data */
             break;
         }
 
-        ctf_msg_iter_status iter_status = ctf_msg_iter_seek(msg_iter, currentPacketOffset.bytes());
-        if (iter_status != CTF_MSG_ITER_STATUS_OK) {
-            return bt2s::nullopt;
-        }
-
-        iter_status = ctf_msg_iter_get_packet_properties(msg_iter, &props);
-        if (iter_status != CTF_MSG_ITER_STATUS_OK) {
-            return bt2s::nullopt;
-        }
+        /*
+         * Create a temporary index and medium to read the properties of the
+         * current packet.  We don't know yet the size of the packet (that's
+         * one of the things we want to find out), so pretend it spans the rest
+         * of the file.
+         */
+        ctf_fs_ds_index_entry tempIndexEntry {path, currentPacketOffset,
+                                              fileInfo.size - currentPacketOffset};
+        ctf_fs_ds_index tempIndex;
+        tempIndex.entries.emplace_back(tempIndexEntry);
+        ctf::src::fs::Medium::UP medium =
+            bt2s::make_unique<ctf::src::fs::Medium>(tempIndex, fileInfo.logger);
+        ctf::src::PktProps props = ctf::src::readPktProps(traceCls, std::move(medium),
+                                                          currentPacketOffset, fileInfo.logger);
 
         /*
          * Get the current packet size from the packet header, if set.  Else,
          * assume there is a single packet in the file, so take the file size
          * as the packet size.
          */
-        const auto currentPacketSize = props.exp_packet_total_size >= 0 ?
-                                           bt2c::DataLen::fromBits(props.exp_packet_total_size) :
-                                           bt2c::DataLen::fromBytes(ds_file->file->size);
+        const auto currentPacketSize =
+            props.expectedTotalLen ? *props.expectedTotalLen : fileInfo.size;
+
+        BT_CPPLOGI_SPEC(fileInfo.logger,
+                        "Packet: offset-bytes={}, len-bytes={}, begin-clk={}, end-clk={}",
+                        currentPacketOffset.bytes(), currentPacketSize.bytes(),
+                        props.snapshots.beginDefClk ? *props.snapshots.beginDefClk : -1,
+                        props.snapshots.endDefClk ? *props.snapshots.endDefClk : -1);
 
-        if ((currentPacketOffset + currentPacketSize).bytes() > ds_file->file->size) {
-            BT_CPPLOGW_SPEC(ds_file->logger,
+        if (currentPacketOffset + currentPacketSize > fileInfo.size) {
+            BT_CPPLOGW_SPEC(fileInfo.logger,
                             "Invalid packet size reported in file: stream=\"{}\", "
                             "packet-offset-bytes={}, packet-size-bytes={}, "
                             "file-size-bytes={}",
-                            ds_file->file->path, currentPacketOffset.bytes(),
-                            currentPacketSize.bytes(), ds_file->file->size);
+                            path, currentPacketOffset.bytes(), currentPacketSize.bytes(),
+                            fileInfo.size.bytes());
             return bt2s::nullopt;
         }
 
-        ctf_fs_ds_index_entry index_entry {file_info->path, currentPacketOffset, currentPacketSize};
+        ctf_fs_ds_index_entry index_entry {path, currentPacketOffset, currentPacketSize};
 
-        int ret = init_index_entry(index_entry, ds_file, &props);
-        if (ret) {
+        if (init_index_entry(index_entry, &props, *props.dataStreamCls, fileInfo.logger)) {
             return bt2s::nullopt;
         }
 
         index.entries.emplace_back(index_entry);
 
         currentPacketOffset += currentPacketSize;
-        BT_CPPLOGD_SPEC(ds_file->logger,
+        BT_CPPLOGD_SPEC(fileInfo.logger,
                         "Seeking to next packet: current-packet-offset-bytes={}, "
                         "next-packet-offset-bytes={}",
                         (currentPacketOffset - currentPacketSize).bytes(),
@@ -671,40 +452,167 @@ build_index_from_stream_file(struct ctf_fs_ds_file *ds_file, struct ctf_fs_ds_fi
     return index;
 }
 
-ctf_fs_ds_file::UP ctf_fs_ds_file_create(struct ctf_fs_trace *ctf_fs_trace,
-                                         bt2::Stream::Shared stream, const char *path,
-                                         const bt2c::Logger& parentLogger)
+ctf_fs_ds_file::UP ctf_fs_ds_file_create(const char *path, const bt2c::Logger& parentLogger)
 {
-    auto ds_file = bt2s::make_unique<ctf_fs_ds_file>(parentLogger);
+    const auto offset_align = bt_mmap_get_offset_align_size(static_cast<int>(parentLogger.level()));
+    auto ds_file = bt2s::make_unique<ctf_fs_ds_file>(parentLogger, offset_align * 2048);
 
-    ds_file->file = bt2s::make_unique<ctf_fs_file>(parentLogger);
-    ds_file->stream = std::move(stream);
-    ds_file->metadata = ctf_fs_trace->metadata.get();
+    ds_file->file = bt2s::make_unique<ctf_fs_file>(ds_file->logger);
     ds_file->file->path = path;
     int ret = ctf_fs_file_open(ds_file->file.get(), "rb");
     if (ret) {
         return nullptr;
     }
 
-    const size_t offset_align =
-        bt_mmap_get_offset_align_size(static_cast<int>(ds_file->logger.level()));
-    ds_file->mmap_max_len = offset_align * 2048;
-
     return ds_file;
 }
 
-bt2s::optional<ctf_fs_ds_index> ctf_fs_ds_file_build_index(struct ctf_fs_ds_file *ds_file,
-                                                           struct ctf_fs_ds_file_info *file_info,
-                                                           struct ctf_msg_iter *msg_iter)
+namespace ctf {
+namespace src {
+namespace fs {
+
+Medium::Medium(const ctf_fs_ds_index& index, const bt2c::Logger& parentLogger) :
+    _mIndex(index), _mLogger {parentLogger, "PLUGIN/SRC.CTF.FS/DS-MEDIUM"}
 {
-    auto index = build_index_from_idx_file(ds_file, file_info, msg_iter);
+    BT_ASSERT(!_mIndex.entries.empty());
+}
+
+ctf_fs_ds_index::EntriesT::const_iterator
+Medium::_mFindIndexEntryForOffset(bt2c::DataLen offsetInStream) const noexcept
+{
+    return std::lower_bound(
+        _mIndex.entries.begin(), _mIndex.entries.end(), offsetInStream,
+        [](const ctf_fs_ds_index_entry& entry, bt2c::DataLen offsetInStreamLambda) {
+            return (entry.offsetInStream + entry.packetSize - 1_bytes) < offsetInStreamLambda;
+        });
+}
+
+ctf::src::Buf Medium::buf(const bt2c::DataLen requestedOffsetInStream, const bt2c::DataLen minSize)
+{
+    BT_CPPLOGD("buf called: offset-bytes={}, min-size-bytes={}", requestedOffsetInStream.bytes(),
+               minSize.bytes());
+
+    /* The medium only gets asked about whole byte offsets and min sizes. */
+    BT_ASSERT_DBG(requestedOffsetInStream.extraBitCount() == 0);
+    BT_ASSERT_DBG(minSize.extraBitCount() == 0);
+
+    /*
+     *  +-file 1-----+  +-file 2-----+------------+------------+
+     *  |            |  |            |            |            |
+     *  | packet 1   |  | packet 2   | packet 3   | packet 4   |
+     *  |            |  |            |            |            |
+     *  +------------+  +------------+------v-----+------------+
+     *  ^-----------------------------------^               requestedOffsetInStream (passed as parameter)
+     *  ^----------------------------^                      indexEntry.offsetInStream (known)
+     *                  ^------------^                      indexEntry.offsetInFile (known)
+     *  ^---------------^                                   fileStartInStream (computed)
+     *                  ^-------------------^               requestedOffsetInFile (computed)
+     *
+     * Then, assuming mmap maps this region:
+     *                                      ^------------^
+     *
+     *                  ^-------------------^               startOfMappingInFile
+     *                  ^----------------^                  requestedOffsetInMapping
+     *                  ^--------------------------------^  endOfMappingInFile
+     */
+    const ctf_fs_ds_index::EntriesT::const_iterator indexEntryIt =
+        this->_mFindIndexEntryForOffset(requestedOffsetInStream);
+    if (indexEntryIt == _mIndex.entries.end()) {
+        BT_CPPLOGD("no index entry containing this offset");
+        throw NoData();
+    }
+
+    const ctf_fs_ds_index_entry& indexEntry = *indexEntryIt;
+
+    _mCurrentDsFile = ctf_fs_ds_file_create(indexEntry.path, _mLogger);
+    if (!_mCurrentDsFile) {
+        BT_CPPLOGE_APPEND_CAUSE_AND_THROW(bt2::Error, "Failed to create ctf_fs_ds_file");
+    }
+
+    const auto fileStartInStream = indexEntry.offsetInStream - indexEntry.offsetInFile;
+    const auto requestedOffsetInFile = requestedOffsetInStream - fileStartInStream;
+
+    ds_file_status status = ds_file_mmap(_mCurrentDsFile.get(), requestedOffsetInFile.bytes());
+    if (status != DS_FILE_STATUS_OK) {
+        throw bt2::Error("Failed to mmap file");
+    }
+
+    const auto startOfMappingInFile =
+        bt2c::DataLen::fromBytes(_mCurrentDsFile->mmap_offset_in_file);
+    const auto requestedOffsetInMapping = requestedOffsetInFile - startOfMappingInFile;
+    const auto exclEndOfMappingInFile =
+        startOfMappingInFile + bt2c::DataLen::fromBytes(_mCurrentDsFile->mmap_len);
+
+    /*
+     * Find where to end the mapping.  We can map the following entries as long as
+     *
+     *  1) there are following entries
+     *  2) they are located in the same file as our starting entry
+     *  3) they are (at least partially) within the mapping
+     */
+
+    ctf_fs_ds_index::EntriesT::const_iterator endIndexEntryIt = indexEntryIt;
+    while (true) {
+        ctf_fs_ds_index::EntriesT::const_iterator nextIndexEntryIt = endIndexEntryIt + 1;
+
+        if (nextIndexEntryIt == _mIndex.entries.end()) {
+            break;
+        }
+
+        if (nextIndexEntryIt->path != indexEntryIt->path) {
+            break;
+        }
+
+        if (nextIndexEntryIt->offsetInFile >= exclEndOfMappingInFile) {
+            break;
+        }
+
+        endIndexEntryIt = nextIndexEntryIt;
+    }
+
+    /*
+     * It's possible the mapping ends in the middle of our end entry.  Choose
+     * the end of the mapping or the end of the end entry, whichever comes
+     * first, as the end of the returned buffer.
+     */
+    const auto exclEndOfEndEntryInFile =
+        endIndexEntryIt->offsetInFile + endIndexEntryIt->packetSize;
+    const auto bufEndInFile = std::min(exclEndOfMappingInFile, exclEndOfEndEntryInFile);
+    const auto bufLen = bufEndInFile - requestedOffsetInFile;
+    const uint8_t *bufStart =
+        (const uint8_t *) _mCurrentDsFile->mmap_addr + requestedOffsetInMapping.bytes();
+
+    if (bufLen < minSize) {
+        BT_CPPLOGE_APPEND_CAUSE_AND_THROW(
+            bt2::Error,
+            "Insufficient data in file to fulfill request: path=\"{}\", requested-offset-in-file-bytes={}, "
+            "remaining-data-len-in-file-bytes={}, min-size-bytes={}",
+            indexEntry.path, requestedOffsetInFile.bytes(), bufLen.bytes(), minSize.bytes());
+    }
+
+    ctf::src::Buf buf {bufStart, bufLen};
+
+    BT_CPPLOGD("CtfFsMedium::buf returns: buf-addr={}, buf-size-bytes={}\n", fmt::ptr(buf.addr()),
+               buf.size().bytes());
+
+    return buf;
+}
+
+} /* namespace fs */
+} /* namespace src */
+} /* namespace ctf */
+
+bt2s::optional<ctf_fs_ds_index> ctf_fs_ds_file_build_index(const ctf_fs_ds_file_info& fileInfo,
+                                                           const ctf::src::TraceCls& traceCls)
+{
+    auto index = build_index_from_idx_file(fileInfo, traceCls);
     if (index) {
         return index;
     }
 
-    BT_CPPLOGI_SPEC(ds_file->logger, "Failed to build index from .index file; "
+    BT_CPPLOGI_SPEC(fileInfo.logger, "Failed to build index from .index file; "
                                      "falling back to stream indexing.");
-    return build_index_from_stream_file(ds_file, file_info, msg_iter);
+    return build_index_from_stream_file(fileInfo, traceCls);
 }
 
 ctf_fs_ds_file::~ctf_fs_ds_file()
@@ -712,15 +620,6 @@ ctf_fs_ds_file::~ctf_fs_ds_file()
     (void) ds_file_munmap(this);
 }
 
-ctf_fs_ds_file_info::UP ctf_fs_ds_file_info_create(const char *path, int64_t begin_ns)
-{
-    ctf_fs_ds_file_info::UP ds_file_info = bt2s::make_unique<ctf_fs_ds_file_info>();
-
-    ds_file_info->path = path;
-    ds_file_info->begin_ns = begin_ns;
-    return ds_file_info;
-}
-
 void ctf_fs_ds_file_group::insert_ds_file_info_sorted(ctf_fs_ds_file_info::UP ds_file_info)
 {
     /* Find the spot where to insert this ds_file_info. */
index 9a981f86acfc940ba1135a54b4204edc928a9cd1..db18184a00d175fffeab0882b770561144e7ff83 100644 (file)
 #include "cpp-common/bt2c/data-len.hpp"
 #include "cpp-common/bt2c/logging.hpp"
 
-#include "../common/src/msg-iter/msg-iter.hpp"
+#include "../common/src/item-seq/medium.hpp"
+#include "../common/src/metadata/ctf-ir.hpp"
 #include "file.hpp"
 
 struct ctf_fs_ds_file_info
 {
     using UP = std::unique_ptr<ctf_fs_ds_file_info>;
 
+    ctf_fs_ds_file_info(std::string pathParam, const bt2c::Logger& parentLogger);
+
+    bt2c::Logger logger;
     std::string path;
+    bt2c::DataLen size;
 
     /* Guaranteed to be set, as opposed to the index. */
     int64_t begin_ns = 0;
@@ -37,8 +42,8 @@ struct ctf_fs_ds_file
 {
     using UP = std::unique_ptr<ctf_fs_ds_file>;
 
-    explicit ctf_fs_ds_file(const bt2c::Logger& parentLogger) :
-        logger {parentLogger, "PLUGIN/SRC.CTF.FS/DS"}
+    explicit ctf_fs_ds_file(const bt2c::Logger& parentLogger, const size_t mmapMaxLenParam) :
+        logger {parentLogger, "PLUGIN/SRC.CTF.FS/DS"}, mmap_max_len {mmapMaxLenParam}
     {
     }
 
@@ -48,13 +53,8 @@ struct ctf_fs_ds_file
 
     bt2c::Logger logger;
 
-    /* Weak */
-    struct ctf_fs_metadata *metadata = nullptr;
-
     ctf_fs_file::UP file;
 
-    bt2::Stream::Shared stream;
-
     void *mmap_addr = nullptr;
 
     /*
@@ -68,28 +68,31 @@ struct ctf_fs_ds_file
 
     /* Offset in the file where the current mapping starts. */
     off_t mmap_offset_in_file = 0;
-
-    /*
-     * Offset, in the current mapping, of the address to return on the next
-     * request.
-     */
-    off_t request_offset_in_mapping = 0;
 };
 
 struct ctf_fs_ds_index_entry
 {
-    ctf_fs_ds_index_entry(const bt2c::CStringView pathParam, const bt2c::DataLen offsetParam,
-                          const bt2c::DataLen packetSizeParam) noexcept :
+    ctf_fs_ds_index_entry(const bt2c::CStringView pathParam, const bt2c::DataLen offsetInFileParam,
+                          const bt2c::DataLen packetSizeParam) :
         path {pathParam},
-        offset {offsetParam}, packetSize {packetSizeParam}
+        offsetInFile {offsetInFileParam}, offsetInStream {offsetInFileParam},
+        packetSize {packetSizeParam}
     {
+        BT_ASSERT(path);
     }
 
     /* Weak, belongs to ctf_fs_ds_file_info. */
     const char *path;
 
     /* Position of the packet from the beginning of the file. */
-    bt2c::DataLen offset;
+    bt2c::DataLen offsetInFile;
+
+    /*
+     * Position of the packet from the beginning of the stream.  Starts equal
+     * to `offsetInFile`, but can change when multiple data stream files
+     * belonging to the same stream are merged.
+     */
+    bt2c::DataLen offsetInStream;
 
     /* Size of the packet. */
     bt2c::DataLen packetSize;
@@ -114,22 +117,25 @@ struct ctf_fs_ds_index_entry
 
 struct ctf_fs_ds_index
 {
-    std::vector<ctf_fs_ds_index_entry> entries;
+    using EntriesT = std::vector<ctf_fs_ds_index_entry>;
+
+    EntriesT entries;
+
+    void updateOffsetsInStream();
 };
 
 struct ctf_fs_ds_file_group
 {
     using UP = std::unique_ptr<ctf_fs_ds_file_group>;
 
-    explicit ctf_fs_ds_file_group(struct ctf_fs_trace * const trace,
-                                  ctf_stream_class * const scParam, const uint64_t streamInstanceId,
+    explicit ctf_fs_ds_file_group(struct ctf_fs_trace * const ctfFsTrace,
+                                  const ctf::src::DataStreamCls& dataStreamClsParam,
+                                  const uint64_t streamInstanceId,
                                   ctf_fs_ds_index indexParam) noexcept :
-
-        sc {scParam},
-        stream_id(streamInstanceId), ctf_fs_trace {trace},
+        dataStreamCls {&dataStreamClsParam},
+        stream_id {streamInstanceId}, ctf_fs_trace {ctfFsTrace},
         /* Don't use brace initialization, because of gcc 4.8. */
         index(std::move(indexParam))
-
     {
     }
 
@@ -148,8 +154,7 @@ struct ctf_fs_ds_file_group
      */
     std::vector<ctf_fs_ds_file_info::UP> ds_file_infos;
 
-    /* Owned by this */
-    struct ctf_stream_class *sc = nullptr;
+    const ctf::src::DataStreamCls *dataStreamCls;
 
     bt2::Stream::Shared stream;
 
@@ -162,45 +167,36 @@ struct ctf_fs_ds_file_group
     ctf_fs_ds_index index;
 };
 
-ctf_fs_ds_file::UP ctf_fs_ds_file_create(ctf_fs_trace *ctf_fs_trace, bt2::Stream::Shared stream,
-                                         const char *path, const bt2c::Logger& logger);
+ctf_fs_ds_file::UP ctf_fs_ds_file_create(const char *path, const bt2c::Logger& parentLogger);
 
-bt2s::optional<ctf_fs_ds_index> ctf_fs_ds_file_build_index(struct ctf_fs_ds_file *ds_file,
-                                                           struct ctf_fs_ds_file_info *ds_file_info,
-                                                           struct ctf_msg_iter *msg_iter);
+bt2s::optional<ctf_fs_ds_index> ctf_fs_ds_file_build_index(const ctf_fs_ds_file_info& file_info,
+                                                           const ctf::src::TraceCls& traceCls);
 
-ctf_fs_ds_file_info::UP ctf_fs_ds_file_info_create(const char *path, int64_t begin_ns);
+namespace ctf {
+namespace src {
+namespace fs {
 
-/*
- * Medium operations to iterate on a single ctf_fs_ds_file.
- *
- * The data pointer when using this must be a pointer to the ctf_fs_ds_file.
- */
-extern struct ctf_msg_iter_medium_ops ctf_fs_ds_file_medops;
+struct Medium : public ctf::src::Medium
+{
+    explicit Medium(const ctf_fs_ds_index& index, const bt2c::Logger& parentLogger);
 
-/*
- * Medium operations to iterate on the packet of a ctf_fs_ds_group.
- *
- * The iteration is done based on the index of the group.
- *
- * The data pointer when using these medops must be a pointer to a ctf_fs_ds
- * group_medops_data structure.
- */
-extern struct ctf_msg_iter_medium_ops ctf_fs_ds_group_medops;
+    ~Medium() = default;
+    Medium(const Medium&) = delete;
+    Medium& operator=(const Medium&) = delete;
 
-struct ctf_fs_ds_group_medops_data_deleter
-{
-    void operator()(struct ctf_fs_ds_group_medops_data *data) noexcept;
-};
+    ctf::src::Buf buf(bt2c::DataLen offset, bt2c::DataLen minSize) override;
 
-using ctf_fs_ds_group_medops_data_up =
-    std::unique_ptr<ctf_fs_ds_group_medops_data, ctf_fs_ds_group_medops_data_deleter>;
+private:
+    ctf_fs_ds_index::EntriesT::const_iterator
+    _mFindIndexEntryForOffset(bt2c::DataLen offsetInStream) const noexcept;
 
-enum ctf_msg_iter_medium_status
-ctf_fs_ds_group_medops_data_create(struct ctf_fs_ds_file_group *ds_file_group,
-                                   bt_self_message_iterator *self_msg_iter,
-                                   const bt2c::Logger& logger, ctf_fs_ds_group_medops_data_up& out);
+    const ctf_fs_ds_index& _mIndex;
+    bt2c::Logger _mLogger;
+    ctf_fs_ds_file::UP _mCurrentDsFile;
+};
 
-void ctf_fs_ds_group_medops_data_reset(struct ctf_fs_ds_group_medops_data *data);
+} /* namespace fs */
+} /* namespace src */
+} /* namespace ctf */
 
 #endif /* BABELTRACE_PLUGINS_CTF_FS_SRC_DATA_STREAM_FILE_HPP */
index b17d2ad54d96880c883b73c64f758bfca3500f09..e0b6468d5e326cd17a44e4a1f14d71646d915555 100644 (file)
 
 #include "common/assert.h"
 #include "common/common.h"
-#include "common/uuid.h"
+#include "cpp-common/bt2/message.hpp"
+#include "cpp-common/bt2/private-query-executor.hpp"
+#include "cpp-common/bt2/wrap.hpp"
+#include "cpp-common/bt2c/file-utils.hpp"
 #include "cpp-common/bt2c/glib-up.hpp"
 #include "cpp-common/bt2s/make-unique.hpp"
 
 #include "plugins/common/param-validation/param-validation.h"
 
+#include "../common/src/metadata/ctf-ir.hpp"
 #include "../common/src/metadata/tsdl/ctf-meta-configure-ir-trace.hpp"
+#include "../common/src/msg-iter.hpp"
 #include "../common/src/msg-iter/msg-iter.hpp"
+#include "../common/src/pkt-props.hpp"
 #include "data-stream-file.hpp"
 #include "file.hpp"
 #include "fs.hpp"
 #include "metadata.hpp"
 #include "query.hpp"
 
+using namespace bt2c::literals::datalen;
+using namespace ctf::src;
+using namespace ctf;
+
 struct tracer_info
 {
     const char *name;
@@ -37,80 +47,47 @@ struct tracer_info
     int64_t patch;
 };
 
-static bt_message_iterator_class_next_method_status
-ctf_fs_iterator_next_one(struct ctf_fs_msg_iter_data *msg_iter_data, const bt_message **out_msg)
-{
-    const auto msg_iter_status =
-        ctf_msg_iter_get_next_message(msg_iter_data->msg_iter.get(), out_msg);
-    bt_message_iterator_class_next_method_status status;
-
-    switch (msg_iter_status) {
-    case CTF_MSG_ITER_STATUS_OK:
-        /* Cool, message has been written to *out_msg. */
-        status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
-        break;
-
-    case CTF_MSG_ITER_STATUS_EOF:
-        status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
-        break;
-
-    case CTF_MSG_ITER_STATUS_AGAIN:
-        /*
-         * Should not make it this far as this is
-         * medium-specific; there is nothing for the user to do
-         * and it should have been handled upstream.
-         */
-        bt_common_abort();
-
-    case CTF_MSG_ITER_STATUS_ERROR:
-        BT_CPPLOGE_APPEND_CAUSE_SPEC(msg_iter_data->logger,
-                                     "Failed to get next message from CTF message iterator.");
-        status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
-        break;
-
-    case CTF_MSG_ITER_STATUS_MEMORY_ERROR:
-        BT_CPPLOGE_APPEND_CAUSE_SPEC(msg_iter_data->logger,
-                                     "Failed to get next message from CTF message iterator.");
-        status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
-        break;
-
-    default:
-        bt_common_abort();
-    }
-
-    return status;
-}
-
 bt_message_iterator_class_next_method_status
 ctf_fs_iterator_next(bt_self_message_iterator *iterator, bt_message_array_const msgs,
                      uint64_t capacity, uint64_t *count)
 {
-    try {
-        struct ctf_fs_msg_iter_data *msg_iter_data =
-            (struct ctf_fs_msg_iter_data *) bt_self_message_iterator_get_data(iterator);
+    struct ctf_fs_msg_iter_data *msg_iter_data =
+        (struct ctf_fs_msg_iter_data *) bt_self_message_iterator_get_data(iterator);
+    uint64_t i = 0;
 
-        if (G_UNLIKELY(msg_iter_data->next_saved_error)) {
-            /*
+    if (G_UNLIKELY(msg_iter_data->next_saved_error)) {
+        /*
          * Last time we were called, we hit an error but had some
          * messages to deliver, so we stashed the error here.  Return
          * it now.
          */
-            BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(msg_iter_data->next_saved_error);
-            return msg_iter_data->next_saved_status;
-        }
+        BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(msg_iter_data->next_saved_error);
+        return msg_iter_data->next_saved_status;
+    }
 
-        bt_message_iterator_class_next_method_status status;
-        uint64_t i = 0;
+    bt_message_iterator_class_next_method_status status =
+        BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
 
-        do {
-            status = ctf_fs_iterator_next_one(msg_iter_data, &msgs[i]);
-            if (status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
-                i++;
+    do {
+        try {
+            bt2::ConstMessage::Shared msg = msg_iter_data->msgIter->next();
+            if (G_LIKELY(msg)) {
+                msgs[i] = msg.release().libObjPtr();
+                ++i;
+            } else {
+                status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
             }
-        } while (i < capacity && status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK);
+        } catch (const bt2::Error&) {
+            status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
+            break;
+        } catch (const std::bad_alloc&) {
+            status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
+            break;
+        }
+    } while (i < capacity && status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK);
 
-        if (i > 0) {
-            /*
+    if (i > 0) {
+        /*
          * Even if ctf_fs_iterator_next_one() returned something
          * else than BT_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK, we
          * accumulated message objects in the output
@@ -121,29 +98,34 @@ ctf_fs_iterator_next(bt_self_message_iterator *iterator, bt_message_array_const
          * called, possibly without any accumulated
          * message, in which case we'll return it.
          */
-            if (status < 0) {
-                /*
+        if (status < 0) {
+            /*
              * Save this error for the next _next call.  Assume that
              * this component always appends error causes when
              * returning an error status code, which will cause the
              * current thread error to be non-NULL.
              */
-                msg_iter_data->next_saved_error = bt_current_thread_take_error();
-                BT_ASSERT(msg_iter_data->next_saved_error);
-                msg_iter_data->next_saved_status = status;
-            }
-
-            *count = i;
-            status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
+            msg_iter_data->next_saved_error = bt_current_thread_take_error();
+            BT_ASSERT(msg_iter_data->next_saved_error);
+            msg_iter_data->next_saved_status = status;
         }
 
-        return status;
-        return status;
-    } catch (const std::bad_alloc&) {
-        return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
-    } catch (const bt2::Error&) {
-        return BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
+        *count = i;
+        status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
     }
+
+    return status;
+}
+
+static void instantiateMsgIter(ctf_fs_msg_iter_data *msg_iter_data)
+{
+    ctf_fs_ds_file_group *ds_file_group = msg_iter_data->port_data->ds_file_group;
+
+    Medium::UP medium = bt2s::make_unique<fs::Medium>(ds_file_group->index, msg_iter_data->logger);
+    msg_iter_data->msgIter.emplace(
+        bt2::wrap(msg_iter_data->self_msg_iter), *ds_file_group->ctf_fs_trace->cls(),
+        ds_file_group->ctf_fs_trace->metadataStreamUuid(), *ds_file_group->stream,
+        std::move(medium), msg_iter_data->port_data->ctf_fs->quirks, msg_iter_data->logger);
 }
 
 bt_message_iterator_class_seek_beginning_method_status
@@ -155,8 +137,7 @@ ctf_fs_iterator_seek_beginning(bt_self_message_iterator *it)
 
         BT_ASSERT(msg_iter_data);
 
-        ctf_msg_iter_reset(msg_iter_data->msg_iter.get());
-        ctf_fs_ds_group_medops_data_reset(msg_iter_data->msg_iter_medops_data.get());
+        instantiateMsgIter(msg_iter_data);
 
         return BT_MESSAGE_ITERATOR_CLASS_SEEK_BEGINNING_METHOD_STATUS_OK;
     } catch (const std::bad_alloc&) {
@@ -169,24 +150,7 @@ ctf_fs_iterator_seek_beginning(bt_self_message_iterator *it)
 void ctf_fs_iterator_finalize(bt_self_message_iterator *it)
 {
     ctf_fs_msg_iter_data::UP {
-        (static_cast<ctf_fs_msg_iter_data *>(bt_self_message_iterator_get_data(it)))};
-}
-
-static bt_message_iterator_class_initialize_method_status
-ctf_msg_iter_medium_status_to_msg_iter_initialize_status(enum ctf_msg_iter_medium_status status)
-{
-    switch (status) {
-    case CTF_MSG_ITER_MEDIUM_STATUS_EOF:
-    case CTF_MSG_ITER_MEDIUM_STATUS_AGAIN:
-    case CTF_MSG_ITER_MEDIUM_STATUS_ERROR:
-        return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
-    case CTF_MSG_ITER_MEDIUM_STATUS_MEMORY_ERROR:
-        return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
-    case CTF_MSG_ITER_MEDIUM_STATUS_OK:
-        return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
-    }
-
-    bt_common_abort();
+        static_cast<ctf_fs_msg_iter_data *>(bt_self_message_iterator_get_data(it))};
 }
 
 bt_message_iterator_class_initialize_method_status
@@ -200,36 +164,15 @@ ctf_fs_iterator_init(bt_self_message_iterator *self_msg_iter,
         BT_ASSERT(port_data);
 
         auto msg_iter_data = bt2s::make_unique<ctf_fs_msg_iter_data>(self_msg_iter);
-        msg_iter_data->ds_file_group = port_data->ds_file_group;
-
-        ctf_msg_iter_medium_status medium_status = ctf_fs_ds_group_medops_data_create(
-            msg_iter_data->ds_file_group, self_msg_iter, msg_iter_data->logger,
-            msg_iter_data->msg_iter_medops_data);
-        BT_ASSERT(medium_status == CTF_MSG_ITER_MEDIUM_STATUS_OK ||
-                  medium_status == CTF_MSG_ITER_MEDIUM_STATUS_ERROR ||
-                  medium_status == CTF_MSG_ITER_MEDIUM_STATUS_MEMORY_ERROR);
-        if (medium_status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
-            BT_CPPLOGE_APPEND_CAUSE_SPEC(msg_iter_data->logger,
-                                         "Failed to create ctf_fs_ds_group_medops");
-            return ctf_msg_iter_medium_status_to_msg_iter_initialize_status(medium_status);
-        }
+        msg_iter_data->port_data = port_data;
 
-        msg_iter_data->msg_iter = ctf_msg_iter_create(
-            msg_iter_data->ds_file_group->ctf_fs_trace->metadata->tc,
-            bt_common_get_page_size(static_cast<int>(msg_iter_data->logger.level())) * 8,
-            ctf_fs_ds_group_medops, msg_iter_data->msg_iter_medops_data.get(), self_msg_iter,
-            msg_iter_data->logger);
-        if (!msg_iter_data->msg_iter) {
-            BT_CPPLOGE_APPEND_CAUSE_SPEC(msg_iter_data->logger,
-                                         "Cannot create a CTF message iterator.");
-            return BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
-        }
+        instantiateMsgIter(msg_iter_data.get());
 
         /*
-     * This iterator can seek forward if its stream class has a default
-     * clock class.
-     */
-        if (msg_iter_data->ds_file_group->sc->default_clock_class) {
+         * This iterator can seek forward if its stream class has a default
+         * clock class.
+         */
+        if (msg_iter_data->port_data->ds_file_group->dataStreamCls->defClkCls()) {
             bt_self_message_iterator_configuration_set_can_seek_forward(config, true);
         }
 
@@ -262,12 +205,11 @@ std::string ctf_fs_make_port_name(ctf_fs_ds_file_group *ds_file_group)
      *   - the stream
      */
 
-    /* For the trace, use the uuid if present, else the path. */
-    if (ds_file_group->ctf_fs_trace->metadata->tc->is_uuid_set) {
-        char uuid_str[BT_UUID_STR_LEN + 1];
-
-        bt_uuid_to_str(ds_file_group->ctf_fs_trace->metadata->tc->uuid, uuid_str);
-        name << uuid_str;
+    /* For the trace, use the UID if present, else the path. */
+    /* ⚠️ TODO: also consider namespace and name? */
+    auto& uid = ds_file_group->ctf_fs_trace->cls()->uid();
+    if (uid) {
+        name << *uid;
     } else {
         name << ds_file_group->ctf_fs_trace->path;
     }
@@ -276,8 +218,8 @@ std::string ctf_fs_make_port_name(ctf_fs_ds_file_group *ds_file_group)
      * For the stream class, use the id if present.  We can omit this field
      * otherwise, as there will only be a single stream class.
      */
-    if (ds_file_group->sc->id != UINT64_C(-1)) {
-        name << " | " << ds_file_group->sc->id;
+    if (ds_file_group->dataStreamCls->id() != UINT64_C(-1)) {
+        name << " | " << ds_file_group->dataStreamCls->id();
     }
 
     /* For the stream, use the id if present, else, use the path. */
@@ -390,49 +332,29 @@ static void merge_ctf_fs_ds_indexes(ctf_fs_ds_index& dest, const ctf_fs_ds_index
 
 static int add_ds_file_to_ds_file_group(struct ctf_fs_trace *ctf_fs_trace, const char *path)
 {
-    /*
-     * Create a temporary ds_file to read some properties about the data
-     * stream file.
-     */
-    const auto ds_file =
-        ctf_fs_ds_file_create(ctf_fs_trace, bt2::Stream::Shared {}, path, ctf_fs_trace->logger);
-    if (!ds_file) {
-        return -1;
-    }
+    auto ds_file_info = bt2s::make_unique<ctf_fs_ds_file_info>(path, ctf_fs_trace->logger);
+    const auto& traceCls = *ctf_fs_trace->cls();
+    ctf_fs_ds_index tempIndex;
+    ctf_fs_ds_index_entry tempIndexEntry {path, 0_bytes, ds_file_info->size};
 
-    /* Create a temporary iterator to read the ds_file. */
-    ctf_msg_iter_up msg_iter = ctf_msg_iter_create(
-        ctf_fs_trace->metadata->tc,
-        bt_common_get_page_size(static_cast<int>(ctf_fs_trace->logger.level())) * 8,
-        ctf_fs_ds_file_medops, ds_file.get(), nullptr, ctf_fs_trace->logger);
-    if (!msg_iter) {
-        BT_CPPLOGE_SPEC(ctf_fs_trace->logger, "Cannot create a CTF message iterator.");
-        return -1;
-    }
+    tempIndex.entries.emplace_back(tempIndexEntry);
 
-    ctf_msg_iter_set_dry_run(msg_iter.get(), true);
-
-    ctf_msg_iter_packet_properties props;
-    int ret = ctf_msg_iter_get_packet_properties(msg_iter.get(), &props);
-    if (ret) {
-        BT_CPPLOGE_APPEND_CAUSE_SPEC(
-            ctf_fs_trace->logger,
-            "Cannot get stream file's first packet's header and context fields (`{}`).", path);
-        return ret;
-    }
+    const auto props =
+        readPktProps(traceCls, bt2s::make_unique<fs::Medium>(tempIndex, ctf_fs_trace->logger),
+                     0_bytes, ctf_fs_trace->logger);
+    const auto sc = props.dataStreamCls;
 
-    ctf_stream_class *sc =
-        ctf_trace_class_borrow_stream_class_by_id(ds_file->metadata->tc, props.stream_class_id);
     BT_ASSERT(sc);
-    int64_t stream_instance_id = props.data_stream_id;
-    int64_t begin_ns = -1;
 
-    if (props.snapshots.beginning_clock != UINT64_C(-1)) {
-        BT_ASSERT(sc->default_clock_class);
-        ret = bt_util_clock_cycles_to_ns_from_origin(
-            props.snapshots.beginning_clock, sc->default_clock_class->frequency,
-            sc->default_clock_class->offset_seconds, sc->default_clock_class->offset_cycles,
-            &begin_ns);
+    bt2s::optional<unsigned long long> stream_instance_id = props.dataStreamId;
+
+    int64_t begin_ns = -1;
+    if (props.snapshots.beginDefClk) {
+        BT_ASSERT(sc->defClkCls());
+        int ret = bt_util_clock_cycles_to_ns_from_origin(
+            *props.snapshots.beginDefClk, sc->defClkCls()->freq(),
+            sc->defClkCls()->offsetFromOrigin().seconds(),
+            sc->defClkCls()->offsetFromOrigin().cycles(), &begin_ns);
         if (ret) {
             BT_CPPLOGE_APPEND_CAUSE_SPEC(
                 ctf_fs_trace->logger,
@@ -441,15 +363,10 @@ static int add_ds_file_to_ds_file_group(struct ctf_fs_trace *ctf_fs_trace, const
         }
     }
 
-    ctf_fs_ds_file_info::UP ds_file_info = ctf_fs_ds_file_info_create(path, begin_ns);
-    if (!ds_file_info) {
-        return -1;
-    }
-
-    auto index = ctf_fs_ds_file_build_index(ds_file.get(), ds_file_info.get(), msg_iter.get());
+    auto index = ctf_fs_ds_file_build_index(*ds_file_info, traceCls);
     if (!index) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(ctf_fs_trace->logger, "Failed to index CTF stream file \'{}\'",
-                                     ds_file->file->path);
+                                     path);
         return -1;
     }
 
@@ -459,10 +376,10 @@ static int add_ds_file_to_ds_file_group(struct ctf_fs_trace *ctf_fs_trace, const
          * within a stream file group, so consider that this
          * file must be the only one within its group.
          */
-        stream_instance_id = -1;
+        stream_instance_id.reset();
     }
 
-    if (stream_instance_id == -1) {
+    if (!stream_instance_id) {
         /*
          * No stream instance ID or no beginning timestamp:
          * create a unique stream file group for this stream
@@ -471,18 +388,17 @@ static int add_ds_file_to_ds_file_group(struct ctf_fs_trace *ctf_fs_trace, const
          * group.
          */
         ctf_fs_trace->ds_file_groups.emplace_back(bt2s::make_unique<ctf_fs_ds_file_group>(
-            ctf_fs_trace, sc, UINT64_C(-1), std::move(*index)));
+            ctf_fs_trace, *sc, UINT64_C(-1), std::move(*index)));
         ctf_fs_trace->ds_file_groups.back()->insert_ds_file_info_sorted(std::move(ds_file_info));
         return 0;
     }
 
-    BT_ASSERT(stream_instance_id != -1);
     BT_ASSERT(begin_ns != -1);
 
     /* Find an existing stream file group with this ID */
     ctf_fs_ds_file_group *ds_file_group = NULL;
     for (const auto& candidate : ctf_fs_trace->ds_file_groups) {
-        if (candidate->sc == sc && candidate->stream_id == stream_instance_id) {
+        if (candidate->dataStreamCls == sc && candidate->stream_id == stream_instance_id) {
             ds_file_group = candidate.get();
             break;
         }
@@ -490,7 +406,7 @@ static int add_ds_file_to_ds_file_group(struct ctf_fs_trace *ctf_fs_trace, const
 
     if (!ds_file_group) {
         ctf_fs_trace->ds_file_groups.emplace_back(bt2s::make_unique<ctf_fs_ds_file_group>(
-            ctf_fs_trace, sc, static_cast<std::uint64_t>(stream_instance_id), std::move(*index)));
+            ctf_fs_trace, *sc, static_cast<std::uint64_t>(*stream_instance_id), std::move(*index)));
         ds_file_group = ctf_fs_trace->ds_file_groups.back().get();
     } else {
         merge_ctf_fs_ds_indexes(ds_file_group->index, *index);
@@ -597,31 +513,24 @@ static ctf_fs_trace::UP ctf_fs_trace_create(const char *path, const char *name,
                                             bt_self_component *selfComp,
                                             const bt2c::Logger& parentLogger)
 {
-    ctf_fs_trace::UP ctf_fs_trace = bt2s::make_unique<struct ctf_fs_trace>(parentLogger);
-    ctf_fs_trace->path = path;
-    ctf_fs_trace->metadata = bt2s::make_unique<ctf_fs_metadata>();
-
-    int ret = ctf_fs_metadata_set_trace_class(selfComp, ctf_fs_trace.get(), clkClsCfg);
-    if (ret) {
-        return nullptr;
-    }
-
-    if (ctf_fs_trace->metadata->trace_class) {
-        bt_trace *trace = bt_trace_create(ctf_fs_trace->metadata->trace_class->libObjPtr());
-        if (!trace) {
-            return nullptr;
-        }
+    auto ctf_fs_trace = bt2s::make_unique<struct ctf_fs_trace>(clkClsCfg, selfComp, parentLogger);
+    const auto metadataPath = fmt::format("{}" G_DIR_SEPARATOR_S CTF_FS_METADATA_FILENAME, path);
 
-        ctf_fs_trace->trace = bt2::Trace::Shared::createWithoutRef(trace);
-    }
+    ctf_fs_trace->path = path;
+    ctf_fs_trace->parseMetadata(bt2c::dataFromFile(metadataPath, parentLogger, true));
 
-    if (ctf_fs_trace->trace) {
-        ctf_trace_class_configure_ir_trace(ctf_fs_trace->metadata->tc, *ctf_fs_trace->trace);
+    BT_ASSERT(ctf_fs_trace->cls());
 
+    if (ctf_fs_trace->cls()->libCls()) {
+        bt2::TraceClass traceCls = *ctf_fs_trace->cls()->libCls();
+        ctf_fs_trace->trace = traceCls.instantiate();
+        ctf_trace_class_configure_ir_trace(*ctf_fs_trace->cls(), *ctf_fs_trace->trace,
+                                           bt_self_component_get_graph_mip_version(selfComp),
+                                           ctf_fs_trace->logger);
         set_trace_name(*ctf_fs_trace->trace, name);
     }
 
-    ret = create_ds_file_groups(ctf_fs_trace.get());
+    int ret = create_ds_file_groups(ctf_fs_trace.get());
     if (ret) {
         return nullptr;
     }
@@ -691,12 +600,11 @@ static int ctf_fs_component_create_ctf_fs_trace_one_path(struct ctf_fs_component
 
 static unsigned int metadata_count_stream_and_event_classes(struct ctf_fs_trace *trace)
 {
-    unsigned int num = trace->metadata->tc->stream_classes->len;
+    const TraceCls::DataStreamClsSet& dataStreamClasses = trace->cls()->dataStreamClasses();
+    unsigned int num = dataStreamClasses.size();
 
-    for (guint i = 0; i < trace->metadata->tc->stream_classes->len; i++) {
-        struct ctf_stream_class *sc =
-            (struct ctf_stream_class *) trace->metadata->tc->stream_classes->pdata[i];
-        num += sc->event_classes->len;
+    for (const DataStreamCls::UP& dsc : dataStreamClasses) {
+        num += dsc->eventRecordClasses().size();
     }
 
     return num;
@@ -753,7 +661,7 @@ static int merge_matching_ctf_fs_ds_file_groups(struct ctf_fs_trace *dest_trace,
                  * stream instance.
                  */
                 if (candidate_dest->stream_id != src_group->stream_id ||
-                    candidate_dest->sc->id != src_group->sc->id) {
+                    candidate_dest->dataStreamCls->id() != src_group->dataStreamCls->id()) {
                     continue;
                 }
 
@@ -769,12 +677,11 @@ static int merge_matching_ctf_fs_ds_file_groups(struct ctf_fs_trace *dest_trace,
          * trace chunk.
          */
         if (!dest_group) {
-            ctf_stream_class *sc = ctf_trace_class_borrow_stream_class_by_id(
-                dest_trace->metadata->tc, src_group->sc->id);
+            const DataStreamCls *sc = (*dest_trace->cls())[src_group->dataStreamCls->id()];
             BT_ASSERT(sc);
 
             dest_trace->ds_file_groups.emplace_back(bt2s::make_unique<ctf_fs_ds_file_group>(
-                dest_trace, sc, src_group->stream_id, ctf_fs_ds_index {}));
+                dest_trace, *sc, src_group->stream_id, ctf_fs_ds_index {}));
             dest_group = dest_trace->ds_file_groups.back().get();
         }
 
@@ -808,7 +715,8 @@ static int merge_ctf_fs_traces(std::vector<ctf_fs_trace::UP> traces, ctf_fs_trac
         unsigned int candidate_count;
 
         /* A bit of sanity check. */
-        BT_ASSERT(bt_uuid_compare(winner->metadata->tc->uuid, candidate->metadata->tc->uuid) == 0);
+        /* ⚠️ TODO: also consider namespace and name */
+        BT_ASSERT(winner->cls()->uid() == candidate->cls()->uid());
 
         candidate_count = metadata_count_stream_and_event_classes(candidate);
 
@@ -841,103 +749,129 @@ static int merge_ctf_fs_traces(std::vector<ctf_fs_trace::UP> traces, ctf_fs_trac
     return 0;
 }
 
-enum target_event
+struct ClockSnapshotAfterEventItemVisitor : public ItemVisitor
+{
+    bool done() const
+    {
+        return _mDone;
+    }
+
+    bt2s::optional<unsigned long long> result() const
+    {
+        return _mResult;
+    }
+
+protected:
+    bt2s::optional<unsigned long long> _mResult;
+    bool _mDone = false;
+};
+
+struct ClockSnapshotAfterFirstEventItemVisitor : public ClockSnapshotAfterEventItemVisitor
+{
+    void visit(const EventRecordInfoItem& item) override
+    {
+        _mResult = item.defClkVal();
+        _mDone = true;
+    }
+};
+
+/*
+ * Find the timestamp of the last event of the packet, if any, otherwise
+ * find the timestamp of the beginning of the packet.
+ */
+struct ClockSnapshotAfterLastEventItemVisitor : public ClockSnapshotAfterEventItemVisitor
 {
-    FIRST_EVENT,
-    LAST_EVENT,
+    void visit(const PktInfoItem& item) override
+    {
+        _mLastSeen = item.beginDefClkVal();
+    }
+
+    void visit(const EventRecordInfoItem& item) override
+    {
+        _mLastSeen = item.defClkVal();
+    }
+
+    void visit(const PktEndItem&) override
+    {
+        _mResult = _mLastSeen;
+        _mDone = true;
+    }
+
+private:
+    bt2s::optional<unsigned long long> _mLastSeen;
 };
 
 static int decode_clock_snapshot_after_event(struct ctf_fs_trace *ctf_fs_trace,
-                                             struct ctf_clock_class *default_cc,
+                                             const ClkCls& default_cc,
                                              const ctf_fs_ds_index_entry& index_entry,
-                                             enum target_event target_event, uint64_t *cs,
-                                             int64_t *ts_ns)
+                                             ClockSnapshotAfterEventItemVisitor& visitor,
+                                             const char *firstOrLast, uint64_t *cs, int64_t *ts_ns)
 {
     BT_ASSERT(ctf_fs_trace);
+    BT_ASSERT(ctf_fs_trace->cls());
     BT_ASSERT(index_entry.path);
 
-    const auto ds_file = ctf_fs_ds_file_create(ctf_fs_trace, bt2::Stream::Shared {},
-                                               index_entry.path, ctf_fs_trace->logger);
-    if (!ds_file) {
-        BT_CPPLOGE_APPEND_CAUSE_SPEC(ctf_fs_trace->logger, "Failed to create a ctf_fs_ds_file");
-        return -1;
-    }
+    ctf_fs_ds_index tempIndex;
 
-    BT_ASSERT(ctf_fs_trace->metadata);
-    BT_ASSERT(ctf_fs_trace->metadata->tc);
+    tempIndex.entries.emplace_back(index_entry);
 
-    ctf_msg_iter_up msg_iter = ctf_msg_iter_create(
-        ctf_fs_trace->metadata->tc,
-        bt_common_get_page_size(static_cast<int>(ctf_fs_trace->logger.level())) * 8,
+    ItemSeqIter itemSeqIter {bt2s::make_unique<fs::Medium>(tempIndex, ctf_fs_trace->logger),
+                             *ctf_fs_trace->cls(), index_entry.offsetInFile, ctf_fs_trace->logger};
 
-        ctf_fs_ds_file_medops, ds_file.get(), NULL, ctf_fs_trace->logger);
-    if (!msg_iter) {
-        /* ctf_msg_iter_create() logs errors. */
-        return -1;
-    }
+    LoggingItemVisitor loggingVisitor(ctf_fs_trace->logger);
 
-    /*
-     * Turn on dry run mode to prevent the creation and usage of Babeltrace
-     * library objects (bt_field, bt_message_*, etc.).
-     */
-    ctf_msg_iter_set_dry_run(msg_iter.get(), true);
+    while (!visitor.done()) {
+        const Item *item = itemSeqIter.next();
+        BT_ASSERT(item);
 
-    /* Seek to the beginning of the target packet. */
-    enum ctf_msg_iter_status iter_status =
-        ctf_msg_iter_seek(msg_iter.get(), index_entry.offset.bytes());
-    if (iter_status) {
-        /* ctf_msg_iter_seek() logs errors. */
-        return -1;
+        if (ctf_fs_trace->logger.wouldLogT()) {
+            item->accept(loggingVisitor);
+        }
+
+        item->accept(visitor);
     }
 
-    switch (target_event) {
-    case FIRST_EVENT:
-        /*
-         * Start to decode the packet until we reach the end of
-         * the first event. To extract the first event's clock
-         * snapshot.
-         */
-        iter_status = ctf_msg_iter_curr_packet_first_event_clock_snapshot(msg_iter.get(), cs);
-        break;
-    case LAST_EVENT:
-        /* Decode the packet to extract the last event's clock snapshot. */
-        iter_status = ctf_msg_iter_curr_packet_last_event_clock_snapshot(msg_iter.get(), cs);
-        break;
-    default:
-        bt_common_abort();
-    }
-    if (iter_status) {
+    if (!visitor.result()) {
+        BT_CPPLOGE_APPEND_CAUSE_SPEC(ctf_fs_trace->logger, "Failed to get {} event clock snapshot.",
+                                     firstOrLast);
         return -1;
     }
 
+    *cs = *visitor.result();
+
     /* Convert clock snapshot to timestamp. */
-    int ret = bt_util_clock_cycles_to_ns_from_origin(
-        *cs, default_cc->frequency, default_cc->offset_seconds, default_cc->offset_cycles, ts_ns);
+    int ret = bt_util_clock_cycles_to_ns_from_origin(*cs, default_cc.freq(),
+                                                     default_cc.offsetFromOrigin().seconds(),
+                                                     default_cc.offsetFromOrigin().cycles(), ts_ns);
     if (ret) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(ctf_fs_trace->logger,
                                      "Failed to convert clock snapshot to timestamp");
         return ret;
     }
 
-    return 0;
+    return ret;
 }
 
 static int decode_packet_first_event_timestamp(struct ctf_fs_trace *ctf_fs_trace,
-                                               struct ctf_clock_class *default_cc,
+                                               const ClkCls& default_cc,
                                                const ctf_fs_ds_index_entry& index_entry,
                                                uint64_t *cs, int64_t *ts_ns)
 {
-    return decode_clock_snapshot_after_event(ctf_fs_trace, default_cc, index_entry, FIRST_EVENT, cs,
-                                             ts_ns);
+    ClockSnapshotAfterFirstEventItemVisitor visitor {};
+
+    return decode_clock_snapshot_after_event(ctf_fs_trace, default_cc, index_entry, visitor,
+                                             "first", cs, ts_ns);
 }
 
 static int decode_packet_last_event_timestamp(struct ctf_fs_trace *ctf_fs_trace,
-                                              struct ctf_clock_class *default_cc,
+                                              const ClkCls& default_cc,
                                               const ctf_fs_ds_index_entry& index_entry,
                                               uint64_t *cs, int64_t *ts_ns)
 {
-    return decode_clock_snapshot_after_event(ctf_fs_trace, default_cc, index_entry, LAST_EVENT, cs,
-                                             ts_ns);
+    ClockSnapshotAfterLastEventItemVisitor visitor {};
+
+    return decode_clock_snapshot_after_event(ctf_fs_trace, default_cc, index_entry, visitor, "last",
+                                             cs, ts_ns);
 }
 
 /*
@@ -988,8 +922,8 @@ static int fix_index_lttng_event_after_packet_bug(struct ctf_fs_trace *trace)
          */
         auto& last_entry = index.entries.back();
 
-        BT_ASSERT(ds_file_group->sc->default_clock_class);
-        ctf_clock_class *default_cc = ds_file_group->sc->default_clock_class;
+        BT_ASSERT(ds_file_group->dataStreamCls->defClkCls());
+        const ClkCls& default_cc = *ds_file_group->dataStreamCls->defClkCls();
 
         /*
          * Decode packet to read the timestamp of the last event of the
@@ -1029,8 +963,8 @@ static int fix_index_barectf_event_before_packet_bug(struct ctf_fs_trace *trace)
 
         BT_ASSERT(!index.entries.empty());
 
-        BT_ASSERT(ds_file_group->sc->default_clock_class);
-        ctf_clock_class *default_cc = ds_file_group->sc->default_clock_class;
+        BT_ASSERT(ds_file_group->dataStreamCls->defClkCls());
+        const ClkCls& default_cc = *ds_file_group->dataStreamCls->defClkCls();
 
         /*
          * 1. Iterate over the index, starting from the second entry
@@ -1084,13 +1018,11 @@ static int fix_index_barectf_event_before_packet_bug(struct ctf_fs_trace *trace)
 static int fix_index_lttng_crash_quirk(struct ctf_fs_trace *trace)
 {
     for (const auto& ds_file_group : trace->ds_file_groups) {
-        struct ctf_clock_class *default_cc;
-
         BT_ASSERT(ds_file_group);
         auto& index = ds_file_group->index;
 
-        BT_ASSERT(ds_file_group->sc->default_clock_class);
-        default_cc = ds_file_group->sc->default_clock_class;
+        BT_ASSERT(ds_file_group->dataStreamCls->defClkCls());
+        const ClkCls& default_cc = *ds_file_group->dataStreamCls->defClkCls();
 
         BT_ASSERT(!index.entries.empty());
 
@@ -1138,6 +1070,12 @@ static int fix_index_lttng_crash_quirk(struct ctf_fs_trace *trace)
  */
 static int extract_tracer_info(struct ctf_fs_trace *trace, struct tracer_info *current_tracer_info)
 {
+    if (!trace->cls()->env()) {
+        return -1;
+    }
+
+    bt2::ConstMapValue env = *trace->cls()->env();
+
     /* Clear the current_tracer_info struct */
     memset(current_tracer_info, 0, sizeof(*current_tracer_info));
 
@@ -1146,47 +1084,54 @@ static int extract_tracer_info(struct ctf_fs_trace *trace, struct tracer_info *c
      * major version are needed. If one of these is missing, consider it an
      * extraction failure.
      */
-    ctf_trace_class_env_entry *entry =
-        ctf_trace_class_borrow_env_entry_by_name(trace->metadata->tc, "tracer_name");
-    if (!entry || entry->type != CTF_TRACE_CLASS_ENV_ENTRY_TYPE_STR) {
+    bt2::OptionalBorrowedObject<bt2::ConstValue> tracerName = env["tracer_name"];
+    if (!tracerName || !tracerName->isString()) {
         return -1;
     }
 
     /* Set tracer name. */
-    current_tracer_info->name = entry->value.str->str;
+    current_tracer_info->name = tracerName->asString().value();
 
-    entry = ctf_trace_class_borrow_env_entry_by_name(trace->metadata->tc, "tracer_major");
-    if (!entry || entry->type != CTF_TRACE_CLASS_ENV_ENTRY_TYPE_INT) {
+    bt2::OptionalBorrowedObject<bt2::ConstValue> tracerMajor = env["tracer_major"];
+    if (!tracerMajor || !tracerMajor->isInteger()) {
         return -1;
     }
 
     /* Set major version number. */
-    current_tracer_info->major = entry->value.i;
+    current_tracer_info->major =
+        tracerMajor->isSignedInteger() ?
+            tracerMajor->asSignedInteger().value() :
+            static_cast<std::int64_t>(tracerMajor->asUnsignedInteger().value());
 
-    entry = ctf_trace_class_borrow_env_entry_by_name(trace->metadata->tc, "tracer_minor");
-    if (!entry || entry->type != CTF_TRACE_CLASS_ENV_ENTRY_TYPE_INT) {
+    bt2::OptionalBorrowedObject<bt2::ConstValue> tracerMinor = env["tracer_minor"];
+    if (!tracerMinor || !tracerMinor->isInteger()) {
         return 0;
     }
 
     /* Set minor version number. */
-    current_tracer_info->minor = entry->value.i;
+    current_tracer_info->minor =
+        tracerMinor->isSignedInteger() ?
+            tracerMinor->asSignedInteger().value() :
+            static_cast<std::int64_t>(tracerMinor->asUnsignedInteger().value());
 
-    entry = ctf_trace_class_borrow_env_entry_by_name(trace->metadata->tc, "tracer_patch");
-    if (!entry) {
-        /*
-         * If `tracer_patch` doesn't exist `tracer_patchlevel` might.
-         * For example, `lttng-modules` uses entry name
-         * `tracer_patchlevel`.
-         */
-        entry = ctf_trace_class_borrow_env_entry_by_name(trace->metadata->tc, "tracer_patchlevel");
+    /*
+     * If `tracer_patch` doesn't exist `tracer_patchlevel` might.
+     * For example, `lttng-modules` uses entry name `tracer_patchlevel`.
+     */
+    bt2::OptionalBorrowedObject<bt2::ConstValue> tracerPatch = env["tracer_patch"];
+    if (!tracerPatch) {
+        tracerPatch = env["tracer_patchlevel"];
     }
 
-    if (!entry || entry->type != CTF_TRACE_CLASS_ENV_ENTRY_TYPE_INT) {
+    if (!tracerPatch || !tracerPatch->isInteger()) {
         return 0;
     }
 
     /* Set patch version number. */
-    current_tracer_info->patch = entry->value.i;
+    current_tracer_info->patch =
+        tracerPatch->isSignedInteger() ?
+            tracerPatch->asSignedInteger().value() :
+            static_cast<std::int64_t>(tracerPatch->asUnsignedInteger().value());
 
     return 0;
 }
@@ -1269,11 +1214,11 @@ static bool is_tracer_affected_by_lttng_crash_quirk(struct tracer_info *curr_tra
  * Looks for trace produced by known buggy tracers and fix up the index
  * produced earlier.
  */
-static int fix_packet_index_tracer_bugs(ctf_fs_trace *trace)
+static int fix_packet_index_tracer_bugs(ctf_fs_component *ctf_fs)
 {
     struct tracer_info current_tracer_info;
 
-    int ret = extract_tracer_info(trace, &current_tracer_info);
+    int ret = extract_tracer_info(ctf_fs->trace.get(), &current_tracer_info);
     if (ret) {
         /*
          * A trace may not have all the necessary environment
@@ -1283,44 +1228,44 @@ static int fix_packet_index_tracer_bugs(ctf_fs_trace *trace)
          * an error.
          */
         BT_CPPLOGI_SPEC(
-            trace->logger,
+            ctf_fs->logger,
             "Cannot extract tracer information necessary to compare with buggy versions.");
         return 0;
     }
 
     /* Check if the trace may be affected by old tracer bugs. */
     if (is_tracer_affected_by_lttng_event_after_packet_bug(&current_tracer_info)) {
-        BT_CPPLOGI_SPEC(trace->logger,
+        BT_CPPLOGI_SPEC(ctf_fs->logger,
                         "Trace may be affected by LTTng tracer packet timestamp bug. Fixing up.");
-        ret = fix_index_lttng_event_after_packet_bug(trace);
+        ret = fix_index_lttng_event_after_packet_bug(ctf_fs->trace.get());
         if (ret) {
-            BT_CPPLOGE_APPEND_CAUSE_SPEC(trace->logger,
+            BT_CPPLOGE_APPEND_CAUSE_SPEC(ctf_fs->logger,
                                          "Failed to fix LTTng event-after-packet bug.");
             return ret;
         }
-        trace->metadata->tc->quirks.lttng_event_after_packet = true;
+        ctf_fs->quirks.eventRecordDefClkValGtNextPktBeginDefClkVal = true;
     }
 
     if (is_tracer_affected_by_barectf_event_before_packet_bug(&current_tracer_info)) {
-        BT_CPPLOGI_SPEC(trace->logger,
+        BT_CPPLOGI_SPEC(ctf_fs->logger,
                         "Trace may be affected by barectf tracer packet timestamp bug. Fixing up.");
-        ret = fix_index_barectf_event_before_packet_bug(trace);
+        ret = fix_index_barectf_event_before_packet_bug(ctf_fs->trace.get());
         if (ret) {
-            BT_CPPLOGE_APPEND_CAUSE_SPEC(trace->logger,
+            BT_CPPLOGE_APPEND_CAUSE_SPEC(ctf_fs->logger,
                                          "Failed to fix barectf event-before-packet bug.");
             return ret;
         }
-        trace->metadata->tc->quirks.barectf_event_before_packet = true;
+        ctf_fs->quirks.eventRecordDefClkValLtPktBeginDefClkVal = true;
     }
 
     if (is_tracer_affected_by_lttng_crash_quirk(&current_tracer_info)) {
-        ret = fix_index_lttng_crash_quirk(trace);
+        ret = fix_index_lttng_crash_quirk(ctf_fs->trace.get());
         if (ret) {
-            BT_CPPLOGE_APPEND_CAUSE_SPEC(trace->logger,
+            BT_CPPLOGE_APPEND_CAUSE_SPEC(ctf_fs->logger,
                                          "Failed to fix lttng-crash timestamp quirks.");
             return ret;
         }
-        trace->metadata->tc->quirks.lttng_crash = true;
+        ctf_fs->quirks.pktEndDefClkValZero = true;
     }
 
     return 0;
@@ -1369,37 +1314,31 @@ int ctf_fs_component_create_ctf_fs_trace(struct ctf_fs_component *ctf_fs,
 
     if (traces.size() > 1) {
         ctf_fs_trace *first_trace = traces[0].get();
-        const uint8_t *first_trace_uuid = first_trace->metadata->tc->uuid;
 
         /*
          * We have more than one trace, they must all share the same
-         * UUID, verify that.
+         * UID, verify that.
          */
-        for (size_t i = 0; i < traces.size(); i++) {
-            ctf_fs_trace *this_trace = traces[i].get();
-            const uint8_t *this_trace_uuid = this_trace->metadata->tc->uuid;
-
-            if (!this_trace->metadata->tc->is_uuid_set) {
+        /* ⚠️ TODO: also consider namespace and name */
+        for (const ctf_fs_trace::UP& this_trace : traces) {
+            if (!this_trace->cls()->uid()) {
                 BT_CPPLOGE_APPEND_CAUSE_SPEC(
                     ctf_fs->logger,
-                    "Multiple traces given, but a trace does not have a UUID: path={}",
+                    "Multiple traces given, but a trace does not have a UID: path={}",
                     this_trace->path);
                 return -1;
             }
 
-            if (bt_uuid_compare(first_trace_uuid, this_trace_uuid) != 0) {
-                char first_trace_uuid_str[BT_UUID_STR_LEN + 1];
-                char this_trace_uuid_str[BT_UUID_STR_LEN + 1];
-
-                bt_uuid_to_str(first_trace_uuid, first_trace_uuid_str);
-                bt_uuid_to_str(this_trace_uuid, this_trace_uuid_str);
+            auto& first_trace_uid = *first_trace->cls()->uid();
+            auto& this_trace_uid = *this_trace->cls()->uid();
 
+            if (first_trace_uid != this_trace_uid) {
                 BT_CPPLOGE_APPEND_CAUSE_SPEC(ctf_fs->logger,
-                                             "Multiple traces given, but UUIDs don't match: "
-                                             "first-trace-uuid={}, first-trace-path={}, "
-                                             "trace-uuid={}, trace-path={}",
-                                             first_trace_uuid_str, first_trace->path,
-                                             this_trace_uuid_str, this_trace->path);
+                                             "Multiple traces given, but UIDs don't match: "
+                                             "first-trace-uid={}, first-trace-path={}, "
+                                             "trace-uid={}, trace-path={}",
+                                             first_trace_uid, first_trace->path, this_trace_uid,
+                                             this_trace->path);
                 return -1;
             }
         }
@@ -1418,7 +1357,7 @@ int ctf_fs_component_create_ctf_fs_trace(struct ctf_fs_component *ctf_fs,
         BT_DIAG_POP
     }
 
-    int ret = fix_packet_index_tracer_bugs(ctf_fs->trace.get());
+    int ret = fix_packet_index_tracer_bugs(ctf_fs);
     if (ret) {
         BT_CPPLOGE_APPEND_CAUSE_SPEC(ctf_fs->logger, "Failed to fix packet index tracer bugs.");
         return ret;
@@ -1439,6 +1378,14 @@ int ctf_fs_component_create_ctf_fs_trace(struct ctf_fs_component *ctf_fs,
     std::sort(ctf_fs->trace->ds_file_groups.begin(), ctf_fs->trace->ds_file_groups.end(),
               compare_ds_file_groups_by_first_path);
 
+    /*
+     * Now that indexes are not going to change anymore, compute each entry's
+     * offset in the logical data stream.
+     */
+    for (ctf_fs_ds_file_group::UP& group : ctf_fs->trace->ds_file_groups) {
+        group->index.updateOffsetsInStream();
+    }
+
     return 0;
 }
 
@@ -1455,37 +1402,29 @@ get_stream_instance_unique_name(struct ctf_fs_ds_file_group *ds_file_group)
 
 /* Create the IR stream objects for ctf_fs_trace. */
 
-static int create_streams_for_trace(struct ctf_fs_trace *ctf_fs_trace)
+static void create_streams_for_trace(struct ctf_fs_trace *ctf_fs_trace)
 {
-    for (const auto& ds_file_group : ctf_fs_trace->ds_file_groups) {
-        const std::string& name = get_stream_instance_unique_name(ds_file_group.get());
+    BT_ASSERT(ctf_fs_trace->trace);
 
-        BT_ASSERT(ds_file_group->sc->ir_sc);
-        BT_ASSERT(ctf_fs_trace->trace);
+    for (const auto& ds_file_group : ctf_fs_trace->ds_file_groups) {
+        BT_ASSERT(ds_file_group->dataStreamCls->libCls());
 
-        const bt2::StreamClass sc {ds_file_group->sc->ir_sc};
+        const std::string& name = get_stream_instance_unique_name(ds_file_group.get());
+        const auto streamCls = *ds_file_group->dataStreamCls->libCls();
 
         if (ds_file_group->stream_id == UINT64_C(-1)) {
             /* No stream ID: use 0 */
             ds_file_group->stream =
-                sc.instantiate(*ctf_fs_trace->trace, ctf_fs_trace->next_stream_id);
+                streamCls.instantiate(*ctf_fs_trace->trace, ctf_fs_trace->next_stream_id);
             ctf_fs_trace->next_stream_id++;
         } else {
             /* Specific stream ID */
-            ds_file_group->stream = sc.instantiate(*ctf_fs_trace->trace, ds_file_group->stream_id);
+            ds_file_group->stream =
+                streamCls.instantiate(*ctf_fs_trace->trace, ds_file_group->stream_id);
         }
 
-        int ret = bt_stream_set_name(ds_file_group->stream->libObjPtr(), name.c_str());
-        if (ret) {
-            BT_CPPLOGE_APPEND_CAUSE_SPEC(ctf_fs_trace->logger,
-                                         "Cannot set stream's name: "
-                                         "addr={}, stream-name=\"{}\"",
-                                         fmt::ptr(ds_file_group->stream->libObjPtr()), name);
-            return ret;
-        }
+        ds_file_group->stream->name(name);
     }
-
-    return 0;
 }
 
 static const bt_param_validation_value_descr inputs_elem_descr =
@@ -1557,9 +1496,7 @@ static ctf_fs_component::UP ctf_fs_create(const bt2::ConstMapValue params,
         return nullptr;
     }
 
-    if (create_streams_for_trace(ctf_fs->trace.get())) {
-        return nullptr;
-    }
+    create_streams_for_trace(ctf_fs->trace.get());
 
     if (create_ports_for_trace(ctf_fs.get(), ctf_fs->trace.get(), self_comp_src)) {
         return nullptr;
index 4f7837d693d1c1887224a1828a83c28391d4b624..f76b5eabe25655c135c601dfa3b0d904ecbdba67 100644 (file)
 
 #include <babeltrace2/babeltrace.h>
 
+#include "cpp-common/bt2c/aliases.hpp"
 #include "cpp-common/bt2c/logging.hpp"
 
 #include "data-stream-file.hpp"
-#include "plugins/ctf/common/src/metadata/tsdl/decoder.hpp"
+#include "plugins/ctf/common/src/metadata/metadata-stream-parser-utils.hpp"
+#include "plugins/ctf/common/src/msg-iter.hpp"
+
+#define CTF_FS_METADATA_FILENAME "metadata"
 
 extern bool ctf_fs_debug;
 
-struct ctf_fs_metadata
+struct ctf_fs_trace
 {
-    using UP = std::unique_ptr<ctf_fs_metadata>;
-
-    /* Owned by this */
-    ctf_metadata_decoder_up decoder;
-
-    bt2::TraceClass::Shared trace_class;
+    using UP = std::unique_ptr<ctf_fs_trace>;
 
-    /* Weak (owned by `decoder` above) */
-    struct ctf_trace_class *tc = nullptr;
+    explicit ctf_fs_trace(const ctf::src::ClkClsCfg& clkClsCfg,
+                          const bt2::OptionalBorrowedObject<bt2::SelfComponent> selfComp,
+                          const bt2c::Logger& parentLogger) :
+        logger {parentLogger, "PLUGIN/SRC.CTF.FS/TRACE"},
+        _mClkClsCfg {clkClsCfg}, _mSelfComp {selfComp}
+    {
+    }
 
-    int bo = 0;
-};
+    const ctf::src::TraceCls *cls() const
+    {
+        BT_ASSERT(_mParseRet);
+        BT_ASSERT(_mParseRet->traceCls);
+        return _mParseRet->traceCls.get();
+    }
 
-struct ctf_fs_trace
-{
-    using UP = std::unique_ptr<ctf_fs_trace>;
+    const bt2s::optional<bt2c::Uuid>& metadataStreamUuid() const noexcept
+    {
+        BT_ASSERT(_mParseRet);
+        return _mParseRet->uuid;
+    }
 
-    explicit ctf_fs_trace(const bt2c::Logger& parentLogger) :
-        logger {parentLogger, "PLUGIN/SRC.CTF.FS/TRACE"}
+    void parseMetadata(const bt2c::ConstBytes buffer)
     {
+        _mParseRet = ctf::src::parseMetadataStream(_mSelfComp, _mClkClsCfg, buffer, this->logger);
     }
 
     bt2c::Logger logger;
 
-    ctf_fs_metadata::UP metadata;
-
     bt2::Trace::Shared trace;
 
     std::vector<ctf_fs_ds_file_group::UP> ds_file_groups;
@@ -57,6 +65,11 @@ struct ctf_fs_trace
 
     /* Next automatic stream ID when not provided by packet header */
     uint64_t next_stream_id = 0;
+
+private:
+    ctf::src::ClkClsCfg _mClkClsCfg;
+    bt2::OptionalBorrowedObject<bt2::SelfComponent> _mSelfComp;
+    bt2s::optional<ctf::src::MetadataStreamParser::ParseRet> _mParseRet;
 };
 
 struct ctf_fs_port_data
@@ -88,6 +101,7 @@ struct ctf_fs_component
     ctf_fs_trace::UP trace;
 
     ctf::src::ClkClsCfg clkClsCfg;
+    ctf::src::MsgIterQuirks quirks;
 };
 
 struct ctf_fs_msg_iter_data
@@ -105,10 +119,10 @@ struct ctf_fs_msg_iter_data
 
     bt2c::Logger logger;
 
-    /* Weak, belongs to ctf_fs_trace */
-    struct ctf_fs_ds_file_group *ds_file_group = nullptr;
+    /* Weak, belongs to ctf_fs_component */
+    ctf_fs_port_data *port_data = nullptr;
 
-    ctf_msg_iter_up msg_iter;
+    bt2s::optional<ctf::src::MsgIter> msgIter;
 
     /*
      * Saved error.  If we hit an error in the _next method, but have some
@@ -118,8 +132,6 @@ struct ctf_fs_msg_iter_data
     bt_message_iterator_class_next_method_status next_saved_status =
         BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
     const struct bt_error *next_saved_error = nullptr;
-
-    ctf_fs_ds_group_medops_data_up msg_iter_medops_data;
 };
 
 bt_component_class_initialize_method_status
diff --git a/src/plugins/ctf/fs-src/metadata.cpp b/src/plugins/ctf/fs-src/metadata.cpp
deleted file mode 100644 (file)
index bf905d3..0000000
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * SPDX-License-Identifier: MIT
- *
- * Copyright 2016 Philippe Proulx <pproulx@efficios.com>
- * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
- */
-
-#include "common/assert.h"
-#include "cpp-common/bt2s/make-unique.hpp"
-
-#include "../common/src/metadata/tsdl/decoder.hpp"
-#include "file.hpp"
-#include "fs.hpp"
-#include "metadata.hpp"
-
-FILE *ctf_fs_metadata_open_file(const char *trace_path, const bt2c::Logger& logger)
-{
-    GString *metadata_path;
-    FILE *fp = NULL;
-
-    metadata_path = g_string_new(trace_path);
-    if (!metadata_path) {
-        goto end;
-    }
-
-    g_string_append(metadata_path, G_DIR_SEPARATOR_S CTF_FS_METADATA_FILENAME);
-    fp = fopen(metadata_path->str, "rb");
-    if (!fp) {
-        BT_CPPLOGE_ERRNO_APPEND_CAUSE_SPEC(logger, "Failed to open metadata file", ": path=\"{}\"",
-                                           metadata_path->str);
-    }
-
-    g_string_free(metadata_path, TRUE);
-
-end:
-    return fp;
-}
-
-static ctf_fs_file::UP get_file(const bt2c::CStringView trace_path, const bt2c::Logger& logger)
-{
-    auto file = bt2s::make_unique<ctf_fs_file>(logger);
-
-    if (!file) {
-        goto error;
-    }
-
-    file->path = fmt::format("{}" G_DIR_SEPARATOR_S CTF_FS_METADATA_FILENAME, trace_path);
-
-    if (ctf_fs_file_open(file.get(), "rb")) {
-        goto error;
-    }
-
-    goto end;
-
-error:
-    file.reset();
-
-end:
-    return file;
-}
-
-int ctf_fs_metadata_set_trace_class(bt_self_component *self_comp, struct ctf_fs_trace *ctf_fs_trace,
-                                    const ctf::src::ClkClsCfg& clkClsCfg)
-{
-    int ret = 0;
-    ctf_metadata_decoder_config decoder_config {ctf_fs_trace->logger};
-
-    decoder_config.self_comp = self_comp;
-    decoder_config.clkClsCfg = clkClsCfg;
-    decoder_config.create_trace_class = true;
-
-    const auto file = get_file(ctf_fs_trace->path, ctf_fs_trace->logger);
-    if (!file) {
-        BT_CPPLOGE_SPEC(ctf_fs_trace->logger, "Cannot create metadata file object.");
-        ret = -1;
-        goto end;
-    }
-
-    ctf_fs_trace->metadata->decoder = ctf_metadata_decoder_create(&decoder_config);
-    if (!ctf_fs_trace->metadata->decoder) {
-        BT_CPPLOGE_SPEC(ctf_fs_trace->logger, "Cannot create metadata decoder object.");
-        ret = -1;
-        goto end;
-    }
-
-    ret =
-        ctf_metadata_decoder_append_content(ctf_fs_trace->metadata->decoder.get(), file->fp.get());
-    if (ret) {
-        BT_CPPLOGE_SPEC(ctf_fs_trace->logger, "Cannot update metadata decoder's content.");
-        goto end;
-    }
-
-    ctf_fs_trace->metadata->trace_class =
-        ctf_metadata_decoder_get_ir_trace_class(ctf_fs_trace->metadata->decoder.get());
-    BT_ASSERT(!self_comp || ctf_fs_trace->metadata->trace_class);
-
-    ctf_fs_trace->metadata->tc =
-        ctf_metadata_decoder_borrow_ctf_trace_class(ctf_fs_trace->metadata->decoder.get());
-    BT_ASSERT(ctf_fs_trace->metadata->tc);
-
-end:
-    return ret;
-}
index 8e01469bf30e52ff771dc045722526acc4a2e286..ee0f82f7890638cc46b1914884cc0a3e62eb63c6 100755 (executable)
@@ -63,7 +63,7 @@ test_fail \
 test_fail \
        "valid-events-then-invalid-events" \
        "${data_dir}/valid-events-then-invalid-events.expect" \
-       "No event class with ID of event class ID to use in stream class: .*stream-class-id=0, event-class-id=255"
+       "At 24 bits: no event record class exists with ID 255 within the data stream class with ID 0."
 
 test_fail \
        "metadata-syntax-error" \
index f66247b2cf3ef13e2d67f5c7aaa956362502185c..e2cb9cd11f9a4dad542bb0a5c1f5505a60786083 100755 (executable)
@@ -72,7 +72,7 @@ expect_failure() {
                "${test_name}: error stack is produced"
 
        bt_grep_ok \
-               "No event class with ID of event class ID to use in stream class" \
+               "At 48 bits: no event record class exists with ID 255 within the data stream class with ID 0." \
                "$stderr_file" \
                "$test_name: expected error message is present"
 }
index 6516120e74f3c8c51999a2e0c4a1519a0edfffe9..c53132e7ed90e2d128d445d428caf2062e771153 100755 (executable)
@@ -306,6 +306,13 @@ test_compare_to_ctf_fs() {
        bt_cli "$expected_stdout" "$expected_stderr" "${trace_dir}/succeed/multi-domains" -c sink.text.details --params "with-trace-name=false,with-stream-name=false"
        bt_remove_cr "${expected_stdout}"
        bt_remove_cr "${expected_stderr}"
+
+       # Hack. To be removed when src.ctf.lttng-live is updated to use the new
+       # IR generator.
+       "$BT_TESTS_SED_BIN" -i '/User attributes:/d' "${expected_stdout}"
+       "$BT_TESTS_SED_BIN" -i '/babeltrace.org,2020:/d' "${expected_stdout}"
+       "$BT_TESTS_SED_BIN" -i '/log-level: warning/d' "${expected_stdout}"
+
        run_test "$test_text" "$cli_args_template" "$expected_stdout" \
                "$expected_stderr" "$trace_dir_native" "${server_args[@]}"
        diag "Inverse session order from lttng-relayd"
This page took 0.062006 seconds and 4 git commands to generate.