ptr = msg._release()
return int(ptr)
- # Validate that the presence or lack of presence of a
- # `default_clock_snapshot` value is valid in the context of `stream_class`.
- @staticmethod
- def _validate_default_clock_snapshot(stream_class, default_clock_snapshot):
- stream_class_has_default_clock_class = stream_class.default_clock_class is not None
-
- if stream_class_has_default_clock_class and default_clock_snapshot is None:
- raise bt2.Error(
- 'stream class has a default clock class, default_clock_snapshot should not be None')
-
- if not stream_class_has_default_clock_class and default_clock_snapshot is not None:
- raise bt2.Error(
- 'stream class has no default clock class, default_clock_snapshot should be None')
-
def _create_event_message(self, event_class, packet, default_clock_snapshot=None):
utils._check_type(event_class, bt2.event_class._EventClass)
utils._check_type(packet, bt2.packet._Packet)
- self._validate_default_clock_snapshot(packet.stream.cls, default_clock_snapshot)
if default_clock_snapshot is not None:
+ if event_class.stream_class.default_clock_class is None:
+ raise ValueError('event messages in this stream must not have a default clock snapshot')
+
utils._check_uint64(default_clock_snapshot)
ptr = native_bt.message_event_create_with_default_clock_snapshot(
self._ptr, event_class._ptr, packet._ptr, default_clock_snapshot)
else:
+ if event_class.stream_class.default_clock_class is not None:
+ raise ValueError('event messages in this stream must have a default clock snapshot')
+
ptr = native_bt.message_event_create(
self._ptr, event_class._ptr, packet._ptr)
return bt2.message._MessageIteratorInactivityMessage(ptr)
+ _unknown_clock_snapshot = bt2.message._StreamActivityMessageUnknownClockSnapshot()
+ _infinite_clock_snapshot = bt2.message._StreamActivityMessageInfiniteClockSnapshot()
+
+ @staticmethod
+ def _validate_stream_activity_message_default_clock_snapshot(stream, default_cs):
+ isinst_infinite = isinstance(default_cs, bt2.message._StreamActivityMessageInfiniteClockSnapshot)
+ isinst_unknown = isinstance(default_cs, bt2.message._StreamActivityMessageUnknownClockSnapshot)
+
+ if utils._is_uint64(default_cs):
+ pass
+ elif isinst_infinite or isinst_unknown:
+ if default_cs is not _UserMessageIterator._unknown_clock_snapshot and default_cs is not _UserMessageIterator._infinite_clock_snapshot:
+ raise ValueError('unexpected value for default clock snapshot')
+ else:
+ raise TypeError("unexpected type '{}' for default clock snapshot".format(default_cs.__class__.__name__))
+
+ if stream.cls.default_clock_class is None:
+ if utils._is_uint64(default_cs):
+ raise ValueError('stream activity messages in this stream cannot have a known default clock snapshot')
+
def _create_stream_beginning_message(self, stream):
utils._check_type(stream, bt2.stream._Stream)
return bt2.message._StreamBeginningMessage(ptr)
- def _create_stream_activity_beginning_message(self, stream, default_clock_snapshot=None):
+ def _create_stream_activity_beginning_message(self, stream,
+ default_clock_snapshot=_unknown_clock_snapshot):
utils._check_type(stream, bt2.stream._Stream)
- self._validate_default_clock_snapshot(stream.cls, default_clock_snapshot)
-
+ self._validate_stream_activity_message_default_clock_snapshot(stream, default_clock_snapshot)
ptr = native_bt.message_stream_activity_beginning_create(self._ptr, stream._ptr)
if ptr is None:
'cannot create stream activity beginning message object')
msg = bt2.message._StreamActivityBeginningMessage(ptr)
-
- if default_clock_snapshot is not None:
- msg._default_clock_snapshot = default_clock_snapshot
-
+ msg._default_clock_snapshot = default_clock_snapshot
return msg
- def _create_stream_activity_end_message(self, stream, default_clock_snapshot=None):
+ def _create_stream_activity_end_message(self, stream,
+ default_clock_snapshot=_unknown_clock_snapshot):
utils._check_type(stream, bt2.stream._Stream)
- self._validate_default_clock_snapshot(stream.cls, default_clock_snapshot)
-
+ self._validate_stream_activity_message_default_clock_snapshot(stream, default_clock_snapshot)
ptr = native_bt.message_stream_activity_end_create(self._ptr, stream._ptr)
if ptr is None:
'cannot create stream activity end message object')
msg = bt2.message._StreamActivityEndMessage(ptr)
-
- if default_clock_snapshot is not None:
- msg._default_clock_snapshot = default_clock_snapshot
-
+ msg._default_clock_snapshot = default_clock_snapshot
return msg
def _create_stream_end_message(self, stream):