@bt_pre_not_null{message}
@bt_pre_hot{message}
@bt_pre_is_disc_ev_msg{message}
+@pre
+ \bt_p{count} > 0
@sa bt_message_discarded_events_get_count() —
Returns the number of discarded events of a discarded events
@bt_pre_not_null{message}
@bt_pre_hot{message}
@bt_pre_is_disc_pkt_msg{message}
+@pre
+ \bt_p{count} > 0
@sa bt_message_discarded_packets_get_count() —
Returns the number of discarded packets of a discarded packets
def _set_count(self, count):
utils._check_uint64(count)
+
+ if count == 0:
+ raise ValueError('discarded {} count is 0'.format(self._item_name))
+
self._set_count(self._ptr, count)
_count = property(fget=_DiscardedMessageConst.count.fget, fset=_set_count)
class _DiscardedEventsMessage(_DiscardedEventsMessageConst, _DiscardedMessage):
_borrow_stream_ptr = staticmethod(native_bt.message_discarded_events_borrow_stream)
_set_count = staticmethod(native_bt.message_discarded_events_set_count)
+ _item_name = 'event'
class _DiscardedPacketsMessageConst(_DiscardedMessageConst):
class _DiscardedPacketsMessage(_DiscardedPacketsMessageConst, _DiscardedMessage):
_borrow_stream_ptr = staticmethod(native_bt.message_discarded_packets_borrow_stream)
_set_count = staticmethod(native_bt.message_discarded_packets_set_count)
+ _item_name = 'packet'
_MESSAGE_TYPE_TO_CLS = {
{
BT_ASSERT_PRE_NON_NULL(message, "Message");
BT_ASSERT_PRE_MSG_IS_TYPE(message, BT_MESSAGE_TYPE_DISCARDED_EVENTS);
+ BT_ASSERT_PRE(count > 0, "Discarded event count is 0.");
set_discarded_items_message_count(message, count);
}
{
BT_ASSERT_PRE_NON_NULL(message, "Message");
BT_ASSERT_PRE_MSG_IS_TYPE(message, BT_MESSAGE_TYPE_DISCARDED_PACKETS);
+ BT_ASSERT_PRE(count > 0, "Discarded packet count is 0.");
set_discarded_items_message_count(message, count);
}
self.assertIs(type(msg), bt2._DiscardedEventsMessage)
self.assertEqual(msg.count, 242)
+ # With event count == 0.
+ def test_create_with_count_zero_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(supports_discarded_events=True)
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError, 'discarded event count is 0',
+ ):
+ msg_iter._create_discarded_events_message(stream, count=0)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
# With clock snapshots.
def test_create_with_clock_snapshots(self):
def create_stream_class(tc, cc):
self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
self.assertEqual(msg.count, 242)
+ # With packet count == 0.
+ def test_create_with_count_zero_raises(self):
+ def create_stream_class(tc, cc):
+ return tc.create_stream_class(
+ supports_packets=True, supports_discarded_packets=True
+ )
+
+ def msg_iter_next(msg_iter, stream):
+ with self.assertRaisesRegex(
+ ValueError, 'discarded packet count is 0',
+ ):
+ msg_iter._create_discarded_packets_message(stream, count=0)
+
+ return 123
+
+ res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
+ self.assertEqual(res, 123)
+
# With clock snapshots.
def test_create_with_clock_snapshots(self):
def create_stream_class(tc, cc):