2 * SPDX-License-Identifier: MIT
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Philippe Proulx <pproulx@efficios.com>
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
12 #include <babeltrace2/babeltrace.h>
14 #include "common/assert.h"
15 #include "compat/mman.h" /* IWYU pragma: keep */
16 #include "cpp-common/bt2s/make-unique.hpp"
17 #include "cpp-common/vendor/fmt/format.h"
19 #include "../common/src/pkt-props.hpp"
20 #include "data-stream.hpp"
/*
 * Prefix of the names given to stream iterators: the viewer stream ID
 * is appended to it when a stream iterator is created in this file.
 */
22 #define STREAM_NAME_PREFIX "stream-"
/*
 * NOTE(review): presumably what makes the `0_bytes` literal used in
 * this file available without qualification; confirm against the
 * declaring header.
 */
24 using namespace bt2c::literals::datalen
;
/*
 * Provides a buffer of data of the live stream for the stream offset
 * `requestedOffsetInStream`.
 *
 * The requested stream offset is first made relative to the beginning
 * of the current packet (`_mCurPktBegOffsetInStream`), and then added
 * to `curPktInfo->offsetInRelay` to obtain the offset passed to
 * lttng_live_get_stream_bytes(). The requested length is the smallest
 * of what remains of the current packet and the `max_query_size` value
 * of the component.
 *
 * In the visible code, `minSize` is only used for logging.
 *
 * Throws `bt2c::TryAgain` when the requested offset is exactly the end
 * of the current packet, or when lttng_live_get_stream_bytes() reports
 * `LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN`.
 *
 * NOTE(review): several statements of this function are not visible in
 * this view of the file (some branch bodies, the declaration of
 * `recvLen`, the statement selecting on `status`, and the final
 * return). The comments below only describe what is visible.
 */
30 Buf
CtfLiveMedium::buf(bt2c::DataLen requestedOffsetInStream
, bt2c::DataLen minSize
)
/* The stream ID is logged as -1 while the stream iterator has no stream. */
32 BT_CPPLOGD("CtfLiveMedium::buf called: stream-id={}, offset-bytes={}, min-size-bytes={}",
33 _mLiveStreamIter
.stream
? _mLiveStreamIter
.stream
->id() : -1,
34 requestedOffsetInStream
.bytes(), minSize
.bytes());
/* NOTE(review): the statement guarded by this condition is not visible here. */
36 if (_mLiveStreamIter
.has_stream_hung_up
)
/* Offset of the request relative to the beginning of the current packet. */
39 BT_ASSERT(requestedOffsetInStream
>= _mCurPktBegOffsetInStream
);
40 auto requestedOffsetInPacket
= requestedOffsetInStream
- _mCurPktBegOffsetInStream
;
/* The information about the current packet must be available from here on. */
42 BT_ASSERT(_mLiveStreamIter
.curPktInfo
);
/*
 * The request starts exactly at the end of the current packet: move
 * the packet beginning offset past this packet, drop the packet
 * information, set the stream iterator state to
 * `LTTNG_LIVE_STREAM_ACTIVE_NO_DATA`, and make the caller try again.
 */
44 if (requestedOffsetInPacket
== _mLiveStreamIter
.curPktInfo
->len
) {
45 _mCurPktBegOffsetInStream
+= _mLiveStreamIter
.curPktInfo
->len
;
46 _mLiveStreamIter
.curPktInfo
.reset();
47 lttng_live_stream_iterator_set_state(&_mLiveStreamIter
, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
);
48 throw bt2c::TryAgain
{};
/*
 * Offset to request, relative to `curPktInfo->offsetInRelay`, and
 * length left between the requested offset and the end of the packet.
 */
51 auto requestedOffsetInRelay
=
52 _mLiveStreamIter
.curPktInfo
->offsetInRelay
+ requestedOffsetInPacket
;
53 auto lenUntilEndOfPacket
= _mLiveStreamIter
.curPktInfo
->len
- requestedOffsetInPacket
;
/*
 * Never request more than `max_query_size` bytes, nor more than what
 * remains of the current packet.
 */
55 auto maxReqLen
= bt2c::DataLen::fromBytes(
56 _mLiveStreamIter
.trace
->session
->lttng_live_msg_iter
->lttng_live_comp
->max_query_size
);
57 auto reqLen
= std::min(lenUntilEndOfPacket
, maxReqLen
);
/* Size the internal buffer for the full requested length. */
60 _mBuf
.resize(reqLen
.bytes());
/*
 * NOTE(review): `recvLen` is declared on a line which is not visible
 * here; presumably it receives the number of bytes actually received.
 */
62 lttng_live_get_stream_bytes_status status
= lttng_live_get_stream_bytes(
63 _mLiveStreamIter
.trace
->session
->lttng_live_msg_iter
, &_mLiveStreamIter
, _mBuf
.data(),
64 requestedOffsetInRelay
.bytes(), reqLen
.bytes(), &recvLen
);
/* Success: resize the internal buffer to `recvLen`. */
66 case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK
:
67 _mBuf
.resize(recvLen
);
/* No data for now: make the caller try again. */
70 case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN
:
71 BT_CPPLOGD("CtfLiveMedium::buf try again");
72 throw bt2c::TryAgain();
/*
 * NOTE(review): for the two statuses below, only the debug log
 * statements are visible here; what follows each of them is not.
 */
74 case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF
:
75 BT_CPPLOGD("CtfLiveMedium::buf eof");
78 case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR
:
79 BT_CPPLOGD("CtfLiveMedium::buf error");
/* Describe the internal buffer: its address and its size in bytes. */
83 const Buf buf
{_mBuf
.data(), bt2c::DataLen::fromBytes(_mBuf
.size())};
85 BT_CPPLOGD("CtfLiveMedium::buf returns: stream-id={}, buf-addr={}, buf-size-bytes={}",
86 _mLiveStreamIter
.stream
? _mLiveStreamIter
.stream
->id() : -1, fmt::ptr(buf
.addr()),
92 } /* namespace live */
/*
 * Creates the stream and the message iterator of `liveStreamIter`.
 *
 * On entry, both `liveStreamIter->msg_iter` and
 * `liveStreamIter->stream` must be unset, and
 * `liveStreamIter->ctf_stream_class_id` must be set (all asserted).
 *
 * Visible steps:
 *
 * 1. Read packet properties with ctf::src::readPktProps(), giving it a
 *    temporary `CtfLiveMedium` and `0_bytes`.
 *
 * 2. Look up the stream class having the ID
 *    `liveStreamIter->ctf_stream_class_id.value` in the library trace
 *    class of the CTF trace class.
 *
 * 3. Create the stream with bt_stream_create_with_id(), using the data
 *    stream ID of the packet properties when there is one, or
 *    `liveStreamIter->viewer_stream_id` (see the existing comment
 *    before that call).
 *
 * 4. Store the created stream in `liveStreamIter->stream` (without
 *    acquiring a new reference) and name it `liveStreamIter->name`.
 *
 * 5. Construct `liveStreamIter->msg_iter` in place, handing it a second
 *    `CtfLiveMedium` and default-constructed `MsgIterQuirks`.
 *
 * Returns `LTTNG_LIVE_ITERATOR_STATUS_OK`.
 *
 * NOTE(review): some lines of this function are not visible in this
 * view of the file, notably the condition guarding the "No stream
 * class" error and the `else` around the second stream creation.
 */
96 lttng_live_iterator_status
97 lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator
*liveStreamIter
)
99 BT_ASSERT(!liveStreamIter
->msg_iter
);
100 BT_ASSERT(!liveStreamIter
->stream
);
101 lttng_live_trace
*trace
= liveStreamIter
->trace
;
102 lttng_live_msg_iter
*liveMsgIter
= trace
->session
->lttng_live_msg_iter
;
/* Temporary medium: its ownership is moved to readPktProps() below. */
104 auto tempMedium
= bt2s::make_unique
<ctf::src::live::CtfLiveMedium
>(*liveStreamIter
);
105 const ctf::src::TraceCls
*ctfTc
= liveStreamIter
->trace
->metadata
->traceCls();
107 ctf::src::PktProps pktProps
=
108 ctf::src::readPktProps(*ctfTc
, std::move(tempMedium
), 0_bytes
, liveStreamIter
->logger
);
110 bt2::OptionalBorrowedObject
<bt2::TraceClass
> tc
= ctfTc
->libCls();
/* The stream class ID and the library trace must be known at this point. */
112 BT_ASSERT(liveStreamIter
->ctf_stream_class_id
.is_set
);
113 BT_ASSERT(trace
->trace
);
115 auto sc
= tc
->streamClassById(liveStreamIter
->ctf_stream_class_id
.value
);
/*
 * NOTE(review): the condition guarding this error is not visible here;
 * presumably it is a failed stream class lookup.
 */
117 BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(liveStreamIter
->logger
, bt2::Error
,
118 "No stream class with id {}",
119 liveStreamIter
->ctf_stream_class_id
.value
);
/* Use the data stream ID of the packet properties when there is one. */
122 bt_stream
*streamPtr
;
123 if (pktProps
.dataStreamId
) {
124 streamPtr
= bt_stream_create_with_id(sc
->libObjPtr(), trace
->trace
->libObjPtr(),
125 *pktProps
.dataStreamId
);
128 * No stream instance ID in the stream. It's possible
129 * to encounter this situation with older version of
130 * LTTng. In these cases, use the viewer_stream_id that
131 * is unique for a live viewer session.
133 streamPtr
= bt_stream_create_with_id(sc
->libObjPtr(), trace
->trace
->libObjPtr(),
134 liveStreamIter
->viewer_stream_id
);
136 BT_ASSERT(streamPtr
);
137 liveStreamIter
->stream
= bt2::Stream::Shared::createWithoutRef(streamPtr
);
138 liveStreamIter
->stream
->name(liveStreamIter
->name
);
140 auto medium
= bt2s::make_unique
<ctf::src::live::CtfLiveMedium
>(*liveStreamIter
);
141 liveStreamIter
->msg_iter
.emplace(liveMsgIter
->selfMsgIter
, *ctfTc
,
142 liveStreamIter
->trace
->metadata
->metadataStreamUuid(),
143 *liveStreamIter
->stream
, std::move(medium
),
144 ctf::src::MsgIterQuirks
{}, liveStreamIter
->logger
);
145 return LTTNG_LIVE_ITERATOR_STATUS_OK
;
/*
 * Performs the pending lazy initialization of the stream iterators of
 * `session`, if any.
 *
 * Returns `LTTNG_LIVE_ITERATOR_STATUS_OK` immediately when
 * `session->lazy_stream_msg_init` is false. Otherwise, visits each
 * stream iterator of each trace of the session, then resets
 * `session->lazy_stream_msg_init` to false and returns
 * `LTTNG_LIVE_ITERATOR_STATUS_OK`.
 *
 * In the visible code, `selfMsgIter` is only used for logging.
 *
 * NOTE(review): the statements of the inner loop which follow the
 * debug log statement are not visible in this view of the file.
 */
148 enum lttng_live_iterator_status
lttng_live_lazy_msg_init(struct lttng_live_session
*session
,
149 const bt2::SelfMessageIterator selfMsgIter
)
/* Nothing to initialize for this session. */
151 if (!session
->lazy_stream_msg_init
) {
152 return LTTNG_LIVE_ITERATOR_STATUS_OK
;
155 BT_CPPLOGD_SPEC(session
->logger
,
156 "Lazily initializing self message iterator for live session: "
157 "session-id={}, self-msg-iter-addr={}",
158 session
->id
, fmt::ptr(selfMsgIter
.libObjPtr()));
160 for (lttng_live_trace::UP
& trace
: session
->traces
) {
161 for (lttng_live_stream_iterator::UP
& stream_iter
: trace
->stream_iterators
) {
/*
 * NOTE(review): the body of this condition is not visible here;
 * presumably stream iterators which already have a message iterator
 * are skipped.
 */
162 if (stream_iter
->msg_iter
) {
166 const ctf::src::TraceCls
*ctfTraceCls
= trace
->metadata
->traceCls();
167 BT_CPPLOGD_SPEC(session
->logger
,
168 "Creating CTF message iterator: session-id={}, ctf-tc-addr={}, "
169 "stream-iter-name={}, self-msg-iter-addr={}",
170 session
->id
, fmt::ptr(ctfTraceCls
), stream_iter
->name
.c_str(),
171 fmt::ptr(selfMsgIter
.libObjPtr()));
/* Do not run this initialization again for this session. */
175 session
->lazy_stream_msg_init
= false;
177 return LTTNG_LIVE_ITERATOR_STATUS_OK
;
/*
 * Creates a stream iterator within the trace having the ID
 * `ctf_trace_id` in `session`, borrowing or creating that trace as
 * needed.
 *
 * The new stream iterator:
 *
 * - Uses the logger of `session`.
 * - Starts in the `LTTNG_LIVE_STREAM_ACTIVE_NO_DATA` state.
 * - Has `stream_id` as its viewer stream ID.
 * - Has no CTF stream class ID and no last inactivity timestamp yet.
 * - Is named `STREAM_NAME_PREFIX` followed by its viewer stream ID.
 *
 * The stream iterator is appended to `trace->stream_iterators`, which
 * takes over the unique pointer, and the `active_stream_iter` counter
 * of the message iterator is incremented.
 *
 * NOTE(review): the end of the parameter list (presumably declaring
 * `stream_id`), the condition guarding the "Failed to borrow CTF
 * trace." error, and the return statements are not visible in this
 * view of the file. Presumably the function returns the raw pointer
 * `ret`; confirm against the complete file.
 */
180 struct lttng_live_stream_iterator
*
181 lttng_live_stream_iterator_create(struct lttng_live_session
*session
, uint64_t ctf_trace_id
,
184 std::stringstream nameSs
;
187 BT_ASSERT(session
->lttng_live_msg_iter
);
188 BT_ASSERT(session
->lttng_live_msg_iter
->lttng_live_comp
);
190 const auto trace
= lttng_live_session_borrow_or_create_trace_by_id(session
, ctf_trace_id
);
/* NOTE(review): the condition guarding this error is not visible here. */
192 BT_CPPLOGE_APPEND_CAUSE_SPEC(session
->logger
, "Failed to borrow CTF trace.");
196 auto stream_iter
= bt2s::make_unique
<lttng_live_stream_iterator
>(session
->logger
);
198 stream_iter
->trace
= trace
;
199 stream_iter
->state
= LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
;
200 stream_iter
->viewer_stream_id
= stream_id
;
/* The CTF stream class ID is not known yet. */
202 stream_iter
->ctf_stream_class_id
.is_set
= false;
203 stream_iter
->ctf_stream_class_id
.value
= UINT64_MAX
;
/* No inactivity timestamp recorded yet. */
205 stream_iter
->last_inactivity_ts
.is_set
= false;
206 stream_iter
->last_inactivity_ts
.value
= 0;
/* Name of the stream iterator: the prefix followed by the viewer stream ID. */
208 nameSs
<< STREAM_NAME_PREFIX
<< stream_iter
->viewer_stream_id
;
209 stream_iter
->name
= nameSs
.str();
/*
 * Keep a raw pointer before the unique pointer is moved into the list
 * of stream iterators of the trace.
 */
211 const auto ret
= stream_iter
.get();
212 trace
->stream_iterators
.emplace_back(std::move(stream_iter
));
214 /* Track the number of active stream iterators. */
215 session
->lttng_live_msg_iter
->active_stream_iter
++;
/*
 * Records `ctfStreamClsId` as the CTF stream class ID of `streamIter`.
 *
 * When a CTF stream class ID is already set, it must be equal to
 * `ctfStreamClsId` (asserted).
 *
 * NOTE(review): what closes the `if` below is not visible in this view
 * of the file, so it cannot be told from here whether the two
 * assignments run unconditionally or only when no ID was set yet.
 */
220 void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator
*streamIter
,
221 uint64_t ctfStreamClsId
)
/* Already set: the stream class ID of a stream iterator must not change. */
223 if (streamIter
->ctf_stream_class_id
.is_set
) {
224 BT_ASSERT(streamIter
->ctf_stream_class_id
.value
== ctfStreamClsId
);
/* Store the ID and mark it as set. */
227 streamIter
->ctf_stream_class_id
.value
= ctfStreamClsId
;
228 streamIter
->ctf_stream_class_id
.is_set
= true;
/*
 * Destructor: decrements the `active_stream_iter` counter of the
 * message iterator reached through the trace and the session of this
 * stream iterator.
 *
 * Dereferences `this->trace` unconditionally, so the stream iterator
 * must have a trace, with a session and a message iterator, when it is
 * destroyed.
 */
232 lttng_live_stream_iterator::~lttng_live_stream_iterator()
234 /* Track the number of active stream iterators. */
235 this->trace
->session
->lttng_live_msg_iter
->active_stream_iter
--;