Commit | Line | Data |
---|---|---|
fca1d0f5 PP |
1 | /* |
2 | * SPDX-License-Identifier: MIT | |
3 | * | |
4 | * Copyright 2017-2023 Philippe Proulx <pproulx@efficios.com> | |
5 | */ | |
6 | ||
7 | #include <glib.h> | |
8 | ||
9 | #include "cpp-common/bt2/optional-borrowed-object.hpp" | |
10 | #include "cpp-common/bt2c/logging.hpp" | |
11 | #include "cpp-common/vendor/fmt/core.h" | |
12 | #include "cpp-common/vendor/fmt/format.h" | |
13 | ||
14 | #include "upstream-msg-iter.hpp" | |
15 | ||
16 | namespace bt2mux { | |
17 | ||
18 | UpstreamMsgIter::UpstreamMsgIter(bt2::MessageIterator::Shared msgIter, std::string portName, | |
19 | const bt2c::Logger& parentLogger) : | |
20 | _mMsgIter {std::move(msgIter)}, | |
21 | _mLogger {parentLogger, fmt::format("{}/[{}]", parentLogger.tag(), portName)}, | |
22 | _mPortName {std::move(portName)} | |
23 | { | |
24 | BT_CPPLOGI("Created an upstream message iterator: this={}, port-name={}", fmt::ptr(this), | |
25 | _mPortName); | |
26 | } | |
27 | ||
28 | namespace { | |
29 | ||
30 | /* | |
31 | * Returns the clock snapshot of `msg`, possibly missing. | |
32 | */ | |
33 | bt2::OptionalBorrowedObject<bt2::ConstClockSnapshot> msgCs(const bt2::ConstMessage msg) noexcept | |
34 | { | |
35 | switch (msg.type()) { | |
1c5ea5eb | 36 | case bt2::MessageType::Event: |
fca1d0f5 PP |
37 | if (msg.asEvent().streamClassDefaultClockClass()) { |
38 | return msg.asEvent().defaultClockSnapshot(); | |
39 | } | |
40 | ||
41 | break; | |
1c5ea5eb | 42 | case bt2::MessageType::PacketBeginning: |
fca1d0f5 PP |
43 | if (msg.asPacketBeginning().packet().stream().cls().packetsHaveBeginningClockSnapshot()) { |
44 | return msg.asPacketBeginning().defaultClockSnapshot(); | |
45 | } | |
46 | ||
47 | break; | |
1c5ea5eb | 48 | case bt2::MessageType::PacketEnd: |
fca1d0f5 PP |
49 | if (msg.asPacketEnd().packet().stream().cls().packetsHaveEndClockSnapshot()) { |
50 | return msg.asPacketEnd().defaultClockSnapshot(); | |
51 | } | |
52 | ||
53 | break; | |
1c5ea5eb | 54 | case bt2::MessageType::DiscardedEvents: |
fca1d0f5 PP |
55 | if (msg.asDiscardedEvents().stream().cls().discardedEventsHaveDefaultClockSnapshots()) { |
56 | return msg.asDiscardedEvents().beginningDefaultClockSnapshot(); | |
57 | } | |
58 | ||
59 | break; | |
1c5ea5eb | 60 | case bt2::MessageType::DiscardedPackets: |
fca1d0f5 PP |
61 | if (msg.asDiscardedPackets().stream().cls().discardedPacketsHaveDefaultClockSnapshots()) { |
62 | return msg.asDiscardedPackets().beginningDefaultClockSnapshot(); | |
63 | } | |
64 | ||
65 | break; | |
1c5ea5eb | 66 | case bt2::MessageType::MessageIteratorInactivity: |
fca1d0f5 | 67 | return msg.asMessageIteratorInactivity().clockSnapshot(); |
1c5ea5eb | 68 | case bt2::MessageType::StreamBeginning: |
8de58945 | 69 | if (msg.asStreamBeginning().streamClassDefaultClockClass()) { |
fca1d0f5 PP |
70 | return msg.asStreamBeginning().defaultClockSnapshot(); |
71 | } | |
72 | ||
73 | break; | |
1c5ea5eb | 74 | case bt2::MessageType::StreamEnd: |
8de58945 | 75 | if (msg.asStreamEnd().streamClassDefaultClockClass()) { |
fca1d0f5 PP |
76 | return msg.asStreamEnd().defaultClockSnapshot(); |
77 | } | |
78 | ||
79 | break; | |
80 | default: | |
81 | bt_common_abort(); | |
82 | } | |
83 | ||
84 | return {}; | |
85 | } | |
86 | ||
87 | } /* namespace */ | |
88 | ||
89 | UpstreamMsgIter::ReloadStatus UpstreamMsgIter::reload() | |
90 | { | |
91 | BT_ASSERT_DBG(!_mDiscardRequired); | |
92 | ||
93 | if (G_UNLIKELY(!_mMsgs.msgs)) { | |
94 | /* | |
95 | * This will either: | |
96 | * | |
97 | * 1. Set `_mMsgs.msgs` to new messages (we'll return | |
98 | * `ReloadStatus::MORE`). | |
99 | * | |
100 | * 2. Not set `_mMsgs.msgs` (ended, we'll return | |
101 | * `ReloadStatus::NO_MORE`). | |
102 | * | |
103 | * 3. Throw. | |
104 | */ | |
105 | this->_tryGetNewMsgs(); | |
106 | } | |
107 | ||
108 | if (G_UNLIKELY(!_mMsgs.msgs)) { | |
109 | /* Still none: no more */ | |
110 | _mMsgTs.reset(); | |
1c5ea5eb | 111 | return ReloadStatus::NoMore; |
fca1d0f5 PP |
112 | } else { |
113 | if (const auto cs = msgCs(this->msg())) { | |
114 | _mMsgTs = cs->nsFromOrigin(); | |
115 | BT_CPPLOGD("Cached the timestamp of the current message: this={}, ts={}", | |
116 | fmt::ptr(this), *_mMsgTs); | |
117 | } else { | |
118 | _mMsgTs.reset(); | |
119 | BT_CPPLOGD("Reset the timestamp of the current message: this={}", fmt::ptr(this)); | |
120 | } | |
121 | ||
122 | _mDiscardRequired = true; | |
1c5ea5eb | 123 | return ReloadStatus::More; |
fca1d0f5 PP |
124 | } |
125 | } | |
126 | ||
127 | void UpstreamMsgIter::_tryGetNewMsgs() | |
128 | { | |
129 | BT_ASSERT_DBG(_mMsgIter); | |
130 | BT_CPPLOGD("Calling the \"next\" method of the upstream message iterator: this={}", | |
131 | fmt::ptr(this)); | |
132 | ||
133 | /* | |
134 | * Replace with next batch! | |
135 | * | |
136 | * This may throw, in which case we'll keep our current | |
137 | * `_mMsgs.msgs` (set), still requiring to get new messages the next | |
138 | * time the user calls reload(). | |
139 | */ | |
140 | _mMsgs.msgs = _mMsgIter->next(); | |
141 | ||
142 | if (!_mMsgs.msgs) { | |
143 | /* | |
144 | * Don't destroy `*_mMsgIter` here because the user may still | |
145 | * call seekBeginning() afterwards. | |
146 | */ | |
147 | BT_CPPLOGD("End of upstream message iterator: this={}", fmt::ptr(this)); | |
148 | return; | |
149 | } | |
150 | ||
151 | _mMsgs.index = 0; | |
152 | BT_CPPLOGD("Got {1} messages from upstream: this={0}, count={1}", fmt::ptr(this), | |
153 | _mMsgs.msgs->length()); | |
154 | } | |
155 | ||
156 | bool UpstreamMsgIter::canSeekBeginning() | |
157 | { | |
158 | return _mMsgIter->canSeekBeginning(); | |
159 | } | |
160 | ||
161 | void UpstreamMsgIter::seekBeginning() | |
162 | { | |
163 | _mMsgIter->seekBeginning(); | |
164 | _mMsgs.msgs.reset(); | |
165 | _mMsgTs.reset(); | |
166 | _mDiscardRequired = false; | |
167 | } | |
168 | ||
169 | bool UpstreamMsgIter::canSeekForward() const noexcept | |
170 | { | |
171 | return _mMsgIter->canSeekForward(); | |
172 | } | |
173 | ||
174 | } /* namespace bt2mux */ |