flt.utils.muxer: add IWYU pragma
[babeltrace.git] / src / plugins / ctf / lttng-live / data-stream.cpp
CommitLineData
7cdc2bab 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
7cdc2bab 3 *
0235b0db
MJ
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Philippe Proulx <pproulx@efficios.com>
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7cdc2bab
MD
8 */
9
e28ca558
SM
10#include <sstream>
11
5656cea5
PP
12#include <babeltrace2/babeltrace.h>
13
578e048b 14#include "common/assert.h"
83ad336c 15#include "compat/mman.h" /* IWYU pragma: keep */
6eeff3ca 16#include "cpp-common/bt2s/make-unique.hpp"
0f5c5d5c 17#include "cpp-common/vendor/fmt/format.h"
c802cacb 18
81c7f242 19#include "../common/src/pkt-props.hpp"
087cd0f5 20#include "data-stream.hpp"
7cdc2bab 21
4164020e 22#define STREAM_NAME_PREFIX "stream-"
14f28187 23
81c7f242
SM
24using namespace bt2c::literals::datalen;
25
26namespace ctf {
27namespace src {
28namespace live {
29
30Buf CtfLiveMedium::buf(bt2c::DataLen requestedOffsetInStream, bt2c::DataLen minSize)
7cdc2bab 31{
81c7f242
SM
32 BT_CPPLOGD("CtfLiveMedium::buf called: stream-id={}, offset-bytes={}, min-size-bytes={}",
33 _mLiveStreamIter.stream ? _mLiveStreamIter.stream->id() : -1,
34 requestedOffsetInStream.bytes(), minSize.bytes());
4164020e 35
81c7f242
SM
36 if (_mLiveStreamIter.has_stream_hung_up)
37 throw NoData {};
4164020e 38
81c7f242
SM
39 BT_ASSERT(requestedOffsetInStream >= _mCurPktBegOffsetInStream);
40 auto requestedOffsetInPacket = requestedOffsetInStream - _mCurPktBegOffsetInStream;
60323499 41
81c7f242 42 BT_ASSERT(_mLiveStreamIter.curPktInfo);
60323499 43
81c7f242
SM
44 if (requestedOffsetInPacket == _mLiveStreamIter.curPktInfo->len) {
45 _mCurPktBegOffsetInStream += _mLiveStreamIter.curPktInfo->len;
46 _mLiveStreamIter.curPktInfo.reset();
47 lttng_live_stream_iterator_set_state(&_mLiveStreamIter, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
48 throw bt2c::TryAgain {};
60323499
SM
49 }
50
81c7f242
SM
51 auto requestedOffsetInRelay =
52 _mLiveStreamIter.curPktInfo->offsetInRelay + requestedOffsetInPacket;
53 auto lenUntilEndOfPacket = _mLiveStreamIter.curPktInfo->len - requestedOffsetInPacket;
60323499 54
81c7f242
SM
55 auto maxReqLen = bt2c::DataLen::fromBytes(
56 _mLiveStreamIter.trace->session->lttng_live_msg_iter->lttng_live_comp->max_query_size);
57 auto reqLen = std::min(lenUntilEndOfPacket, maxReqLen);
58 uint64_t recvLen;
7cdc2bab 59
81c7f242 60 _mBuf.resize(reqLen.bytes());
4164020e 61
81c7f242
SM
62 lttng_live_get_stream_bytes_status status = lttng_live_get_stream_bytes(
63 _mLiveStreamIter.trace->session->lttng_live_msg_iter, &_mLiveStreamIter, _mBuf.data(),
64 requestedOffsetInRelay.bytes(), reqLen.bytes(), &recvLen);
65 switch (status) {
66 case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_OK:
67 _mBuf.resize(recvLen);
68 break;
69
70 case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_AGAIN:
71 BT_CPPLOGD("CtfLiveMedium::buf try again");
72 throw bt2c::TryAgain();
4164020e 73
81c7f242
SM
74 case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_EOF:
75 BT_CPPLOGD("CtfLiveMedium::buf eof");
76 throw NoData();
e28ca558 77
81c7f242
SM
78 case LTTNG_LIVE_GET_STREAM_BYTES_STATUS_ERROR:
79 BT_CPPLOGD("CtfLiveMedium::buf error");
80 throw bt2c::Error();
4164020e 81 }
7cdc2bab 82
81c7f242
SM
83 const Buf buf {_mBuf.data(), bt2c::DataLen::fromBytes(_mBuf.size())};
84
85 BT_CPPLOGD("CtfLiveMedium::buf returns: stream-id={}, buf-addr={}, buf-size-bytes={}",
86 _mLiveStreamIter.stream ? _mLiveStreamIter.stream->id() : -1, fmt::ptr(buf.addr()),
87 buf.size().bytes());
88
89 return buf;
7cdc2bab
MD
90}
91
81c7f242
SM
92} /* namespace live */
93} /* namespace src */
94} /* namespace ctf */
95
96lttng_live_iterator_status
97lttng_live_stream_iterator_create_msg_iter(lttng_live_stream_iterator *liveStreamIter)
98{
99 BT_ASSERT(!liveStreamIter->msg_iter);
100 BT_ASSERT(!liveStreamIter->stream);
101 lttng_live_trace *trace = liveStreamIter->trace;
102 lttng_live_msg_iter *liveMsgIter = trace->session->lttng_live_msg_iter;
103
104 auto tempMedium = bt2s::make_unique<ctf::src::live::CtfLiveMedium>(*liveStreamIter);
105 const ctf::src::TraceCls *ctfTc = liveStreamIter->trace->metadata->traceCls();
106 BT_ASSERT(ctfTc);
107 ctf::src::PktProps pktProps =
108 ctf::src::readPktProps(*ctfTc, std::move(tempMedium), 0_bytes, liveStreamIter->logger);
109
110 bt2::OptionalBorrowedObject<bt2::TraceClass> tc = ctfTc->libCls();
111 BT_ASSERT(tc);
112 BT_ASSERT(liveStreamIter->ctf_stream_class_id.is_set);
113 BT_ASSERT(trace->trace);
114
115 auto sc = tc->streamClassById(liveStreamIter->ctf_stream_class_id.value);
116 if (!sc) {
117 BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(liveStreamIter->logger, bt2::Error,
118 "No stream class with id {}",
119 liveStreamIter->ctf_stream_class_id.value);
120 }
121
122 bt_stream *streamPtr;
123 if (pktProps.dataStreamId) {
124 streamPtr = bt_stream_create_with_id(sc->libObjPtr(), trace->trace->libObjPtr(),
125 *pktProps.dataStreamId);
126 } else {
127 /*
128 * No stream instance ID in the stream. It's possible
129 * to encounter this situation with older version of
130 * LTTng. In these cases, use the viewer_stream_id that
131 * is unique for a live viewer session.
132 */
133 streamPtr = bt_stream_create_with_id(sc->libObjPtr(), trace->trace->libObjPtr(),
134 liveStreamIter->viewer_stream_id);
135 }
136 BT_ASSERT(streamPtr);
137 liveStreamIter->stream = bt2::Stream::Shared::createWithoutRef(streamPtr);
138 liveStreamIter->stream->name(liveStreamIter->name);
139
140 auto medium = bt2s::make_unique<ctf::src::live::CtfLiveMedium>(*liveStreamIter);
4d6634b8 141 liveStreamIter->msg_iter.emplace(liveMsgIter->selfMsgIter, *ctfTc,
81c7f242
SM
142 liveStreamIter->trace->metadata->metadataStreamUuid(),
143 *liveStreamIter->stream, std::move(medium),
144 ctf::src::MsgIterQuirks {}, liveStreamIter->logger);
145 return LTTNG_LIVE_ITERATOR_STATUS_OK;
146}
7cdc2bab 147
4164020e 148enum lttng_live_iterator_status lttng_live_lazy_msg_init(struct lttng_live_session *session,
4d6634b8 149 const bt2::SelfMessageIterator selfMsgIter)
7cdc2bab 150{
4164020e
SM
151 if (!session->lazy_stream_msg_init) {
152 return LTTNG_LIVE_ITERATOR_STATUS_OK;
153 }
154
0f5c5d5c
SM
155 BT_CPPLOGD_SPEC(session->logger,
156 "Lazily initializing self message iterator for live session: "
157 "session-id={}, self-msg-iter-addr={}",
4d6634b8 158 session->id, fmt::ptr(selfMsgIter.libObjPtr()));
4164020e 159
6dcda123 160 for (lttng_live_trace::UP& trace : session->traces) {
db00b877 161 for (lttng_live_stream_iterator::UP& stream_iter : trace->stream_iterators) {
4164020e
SM
162 if (stream_iter->msg_iter) {
163 continue;
164 }
db00b877 165
81c7f242
SM
166 const ctf::src::TraceCls *ctfTraceCls = trace->metadata->traceCls();
167 BT_CPPLOGD_SPEC(session->logger,
0f5c5d5c
SM
168 "Creating CTF message iterator: session-id={}, ctf-tc-addr={}, "
169 "stream-iter-name={}, self-msg-iter-addr={}",
81c7f242 170 session->id, fmt::ptr(ctfTraceCls), stream_iter->name.c_str(),
4d6634b8 171 fmt::ptr(selfMsgIter.libObjPtr()));
4164020e
SM
172 }
173 }
174
175 session->lazy_stream_msg_init = false;
176
177 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
178}
179
4164020e
SM
180struct lttng_live_stream_iterator *
181lttng_live_stream_iterator_create(struct lttng_live_session *session, uint64_t ctf_trace_id,
81c7f242 182 uint64_t stream_id)
7cdc2bab 183{
e28ca558 184 std::stringstream nameSs;
4164020e
SM
185
186 BT_ASSERT(session);
187 BT_ASSERT(session->lttng_live_msg_iter);
188 BT_ASSERT(session->lttng_live_msg_iter->lttng_live_comp);
4164020e 189
81c7f242 190 const auto trace = lttng_live_session_borrow_or_create_trace_by_id(session, ctf_trace_id);
4164020e 191 if (!trace) {
0f5c5d5c 192 BT_CPPLOGE_APPEND_CAUSE_SPEC(session->logger, "Failed to borrow CTF trace.");
6eeff3ca 193 return nullptr;
4164020e
SM
194 }
195
6eeff3ca
SM
196 auto stream_iter = bt2s::make_unique<lttng_live_stream_iterator>(session->logger);
197
4164020e
SM
198 stream_iter->trace = trace;
199 stream_iter->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
200 stream_iter->viewer_stream_id = stream_id;
201
202 stream_iter->ctf_stream_class_id.is_set = false;
203 stream_iter->ctf_stream_class_id.value = UINT64_MAX;
204
205 stream_iter->last_inactivity_ts.is_set = false;
206 stream_iter->last_inactivity_ts.value = 0;
207
e28ca558
SM
208 nameSs << STREAM_NAME_PREFIX << stream_iter->viewer_stream_id;
209 stream_iter->name = nameSs.str();
6eeff3ca
SM
210
211 const auto ret = stream_iter.get();
db00b877 212 trace->stream_iterators.emplace_back(std::move(stream_iter));
4164020e
SM
213
214 /* Track the number of active stream iterator. */
215 session->lttng_live_msg_iter->active_stream_iter++;
216
6eeff3ca 217 return ret;
7cdc2bab
MD
218}
219
81c7f242
SM
220void lttng_live_stream_iterator_set_stream_class(lttng_live_stream_iterator *streamIter,
221 uint64_t ctfStreamClsId)
222{
223 if (streamIter->ctf_stream_class_id.is_set) {
224 BT_ASSERT(streamIter->ctf_stream_class_id.value == ctfStreamClsId);
225 return;
226 } else {
227 streamIter->ctf_stream_class_id.value = ctfStreamClsId;
228 streamIter->ctf_stream_class_id.is_set = true;
229 }
230}
231
ce4ee876 232lttng_live_stream_iterator::~lttng_live_stream_iterator()
7cdc2bab 233{
4164020e 234 /* Track the number of active stream iterator. */
ce4ee876
SM
235 this->trace->session->lttng_live_msg_iter->active_stream_iter--;
236}
This page took 0.105297 seconds and 5 git commands to generate.