return _bt_trace_handle_get_timestamp_end(
self._trace_collection._tc, self._id, ClockType.CLOCK_REAL)
+ @property
+ def events(self):
+ """
+ Generator returning all events (EventDeclaration) in a trace.
+ """
+ ret = _bt_python_event_decl_listcaller(self.id, self._trace_collection._tc)
+
+ if not isinstance(ret, list):
+ return
+
+ ptr_list, count = ret
+ for i in range(count):
+ tmp = EventDeclaration.__new__(EventDeclaration)
+ tmp._ed = _bt_python_decl_one_from_list(ptr_list, i)
+ yield tmp
+
%}
const struct bt_ctf_field_decl *field);
%rename("_bt_ctf_get_decl_from_def") bt_ctf_get_decl_from_def(
const struct bt_definition *field);
+%rename("_bt_ctf_get_decl_from_field_decl") bt_ctf_get_decl_from_field_decl(
+ const struct bt_ctf_field_decl *field);
%rename("_bt_array_index") bt_array_index(struct definition_array *array, uint64_t i);
%rename("_bt_sequence_len") bt_sequence_len(struct definition_sequence *sequence);
%rename("_bt_sequence_index") bt_sequence_index(struct definition_sequence *sequence, uint64_t i);
const char *bt_ctf_get_decl_event_name(const struct bt_ctf_event_decl *event);
const char *bt_ctf_get_decl_field_name(const struct bt_ctf_field_decl *field);
const struct bt_declaration *bt_ctf_get_decl_from_def(const struct bt_definition *field);
+const struct bt_declaration *bt_ctf_get_decl_from_field_decl(const struct bt_ctf_field_decl *field);
uint64_t bt_sequence_len(struct definition_sequence *sequence);
struct bt_definition *bt_sequence_index(struct definition_sequence *sequence, uint64_t i);
uint64_t bt_ctf_get_struct_field_count(const struct bt_definition *structure);
ASCII = 2
UNKNOWN = 3
+# Based on the enum in ctf-writer/writer.h
+class ByteOrder:
+ BYTE_ORDER_NATIVE = 0
+ BYTE_ORDER_LITTLE_ENDIAN = 1
+ BYTE_ORDER_BIG_ENDIAN = 2
+ BYTE_ORDER_NETWORK = 3
+ BYTE_ORDER_UNKNOWN = 4 # Python-specific entry
+
#enum equivalent, accessible constants
#These are taken directly from ctf/events.h
#All changes to enums must also be made here
SEQUENCE = 9
NR_CTF_TYPES = 10
- def get_type_name(id):
- name = "UNKNOWN"
+ def type_name(id):
+ name = "UNKNOWN_TYPE"
constants = [attr for attr in dir(CTFTypeId) if not callable(getattr(CTFTypeId, attr)) and not attr.startswith("__")]
for attr in constants:
if getattr(CTFTypeId, attr) == id:
break
return name
-class scope:
+class CTFScope:
TRACE_PACKET_HEADER = 0
STREAM_PACKET_CONTEXT = 1
STREAM_EVENT_HEADER = 2
EVENT_CONTEXT = 4
EVENT_FIELDS = 5
+ def scope_name(scope):
+ name = "UNKNOWN_SCOPE"
+ constants = [attr for attr in dir(CTFScope) if not callable(getattr(CTFScope, attr)) and not attr.startswith("__")]
+ for attr in constants:
+ if getattr(CTFScope, attr) == scope:
+ name = attr
+ break
+ return name
+
# Priority of the scopes when searching for event fields
-_scopes = [scope.EVENT_FIELDS, scope.EVENT_CONTEXT, scope.STREAM_EVENT_CONTEXT,
- scope.STREAM_EVENT_HEADER, scope.STREAM_PACKET_CONTEXT, scope.TRACE_PACKET_HEADER]
+_scopes = [CTFScope.EVENT_FIELDS, CTFScope.EVENT_CONTEXT, CTFScope.STREAM_EVENT_CONTEXT,
+ CTFScope.STREAM_EVENT_HEADER, CTFScope.STREAM_PACKET_CONTEXT, CTFScope.TRACE_PACKET_HEADER]
import collections
class Event(collections.Mapping):
"""Return the name of the event or None on error"""
return _bt_ctf_get_decl_event_name(self._ed)
- def fields(self, scope):
+ @property
+ def fields(self):
"""
- Return a list of FieldDeclaration
- Return None on error.
+ Generator returning all FieldDeclarations of an event, going through
+ each scope in the following order:
+ 1) EVENT_FIELDS
+ 2) EVENT_CONTEXT
+ 3) STREAM_EVENT_CONTEXT
+ 4) STREAM_EVENT_HEADER
+ 5) STREAM_PACKET_CONTEXT
+ 6) TRACE_PACKET_HEADER
+ """
+ for scope in _scopes:
+ for declaration in self.fields_scope(scope):
+ yield declaration
+
+ def fields_scope(self, scope):
+ """
+ Generator returning FieldDeclarations of the current event in scope.
"""
ret = _by_python_field_decl_listcaller(self._ed, scope)
if not isinstance(ret, list):
- return None
+ return
list_ptr, count = ret
- declarations = []
for i in range(count):
- declaration_ptr = _bt_python_field_decl_one_from_list(list_ptr, i)
- if declaration_ptr is not None:
- declaration = FieldDeclaration.__new__(FieldDeclaration)
- declaration._fd = declaration_ptr
- declarations.append(declaration)
- return declarations
+ field_declaration_ptr = _bt_python_field_decl_one_from_list(list_ptr, i)
+ if field_declaration_ptr is not None:
+ declaration_ptr = _bt_ctf_get_decl_from_field_decl(field_declaration_ptr)
+ field_declaration = _create_field_declaration(declaration_ptr, _bt_ctf_get_decl_field_name(field_declaration_ptr), scope)
+ yield field_declaration
class FieldDeclaration(object):
"""Field declaration class. Do not instantiate."""
def __init__(self):
raise NotImplementedError("FieldDeclaration cannot be instantiated")
+ def __repr__(self):
+ return "({0}) {1} {2}".format(CTFScope.scope_name(self.scope), CTFTypeId.type_name(self.type), self.name)
+
@property
def name(self):
- """Return the name of a FieldDeclaration or None on error"""
- return _bt_ctf_get_decl_field_name(self._fd)
+ """Return the name of a FieldDeclaration or None on error."""
+ return self._name
+
+ @property
+ def type(self):
+ """
+ Return the FieldDeclaration's type. One of the entries in class
+ CTFTypeId.
+ """
+ return _bt_ctf_field_type(self._fd)
+
+ @property
+ def scope(self):
+ """
+ Return the FieldDeclaration's scope.
+ """
+ return self._s
+
+class IntegerFieldDeclaration(FieldDeclaration):
+ """Do not instantiate."""
+ def __init__(self):
+ raise NotImplementedError("IntegerFieldDeclaration cannot be instantiated")
+
+ @property
+ def signedness(self):
+ """
+ Return the signedness of an integer:
+ 0 if unsigned; 1 if signed; -1 on error.
+ """
+ return _bt_ctf_get_int_signedness(self._fd)
+
+ @property
+ def base(self):
+ """Return the base of an int or a negative value on error."""
+ return _bt_ctf_get_int_base(self._fd)
+
+ @property
+ def byte_order(self):
+ """
+ Return the byte order. One of class ByteOrder's entries.
+ """
+ ret = _bt_ctf_get_int_byte_order(self._fd)
+ if ret == 1234:
+ return ByteOrder.BYTE_ORDER_LITTLE_ENDIAN
+ elif ret == 4321:
+ return ByteOrder.BYTE_ORDER_BIG_ENDIAN
+ else:
+ return ByteOrder.BYTE_ORDER_UNKNOWN
+
+ @property
+ def length(self):
+ """
+ Return the size, in bits, of an int or a negative
+ value on error.
+ """
+ return _bt_ctf_get_int_len(self._fd)
+
+ @property
+ def encoding(self):
+ """
+ Return the encoding. One of class CTFStringEncoding's entries.
+ Return a negative value on error.
+ """
+ return _bt_ctf_get_encoding(self._fd)
+
+class EnumerationFieldDeclaration(FieldDeclaration):
+ """Do not instantiate."""
+ def __init__(self):
+ raise NotImplementedError("EnumerationFieldDeclaration cannot be instantiated")
+
+class ArrayFieldDeclaration(FieldDeclaration):
+ """Do not instantiate."""
+ def __init__(self):
+ raise NotImplementedError("ArrayFieldDeclaration cannot be instantiated")
+
+ @property
+ def length(self):
+ """
+ Return the length of an array or a negative
+ value on error.
+ """
+ return _bt_ctf_get_array_len(self._fd)
+
+class SequenceFieldDeclaration(FieldDeclaration):
+ """Do not instantiate."""
+ def __init__(self):
+ raise NotImplementedError("SequenceFieldDeclaration cannot be instantiated")
+
+class FloatFieldDeclaration(FieldDeclaration):
+ """Do not instantiate."""
+ def __init__(self):
+ raise NotImplementedError("FloatFieldDeclaration cannot be instantiated")
+
+class StructureFieldDeclaration(FieldDeclaration):
+ """Do not instantiate."""
+ def __init__(self):
+ raise NotImplementedError("StructureFieldDeclaration cannot be instantiated")
+
+class StringFieldDeclaration(FieldDeclaration):
+ """Do not instantiate."""
+ def __init__(self):
+ raise NotImplementedError("StringFieldDeclaration cannot be instantiated")
+
+class VariantFieldDeclaration(FieldDeclaration):
+ """Do not instantiate."""
+ def __init__(self):
+ raise NotImplementedError("VariantFieldDeclaration cannot be instantiated")
def field_error():
"""
"""
return _bt_ctf_field_get_error()
-def event_declaration_list(trace_handle, trace_collection):
+def _create_field_declaration(declaration_ptr, name, scope):
"""
- Return a list of EventDeclaration
- Return None on error.
+ Private field declaration factory.
"""
- try:
- handle_id = trace_handle._id
- except AttributeError:
- raise TypeError("in get_event_decl_list, "
- "argument 1 must be a TraceHandle instance")
- try:
- ptr_list, count = _bt_python_event_decl_listcaller(handle_id, trace_collection._tc)
- except AttributeError:
- raise TypeError("in get_event_decl_list, "
- "argument 2 must be a TraceCollection instance")
-
- if ptr_list is None:
- return None
-
- decl_list = []
- for i in range(count):
- tmp = EventDeclaration.__new__(EventDeclaration)
- tmp._ed = _bt_python_decl_one_from_list(ptr_list, i)
- decl_list.append(tmp)
-
- return decl_list
+ if declaration_ptr is None:
+ raise ValueError("declaration_ptr must be valid")
+ if not scope in _scopes:
+ raise ValueError("Invalid scope provided")
+
+ type = _bt_ctf_field_type(declaration_ptr)
+ declaration = None
+ if type == CTFTypeId.INTEGER:
+ declaration = IntegerFieldDeclaration.__new__(IntegerFieldDeclaration)
+ elif type == CTFTypeId.ENUM:
+ declaration = EnumerationFieldDeclaration.__new__(EnumerationFieldDeclaration)
+ elif type == CTFTypeId.ARRAY:
+ declaration = ArrayFieldDeclaration.__new__(ArrayFieldDeclaration)
+ elif type == CTFTypeId.SEQUENCE:
+ declaration = SequenceFieldDeclaration.__new__(SequenceFieldDeclaration)
+ elif type == CTFTypeId.FLOAT:
+ declaration = FloatFieldDeclaration.__new__(FloatFieldDeclaration)
+ elif type == CTFTypeId.STRUCT:
+ declaration = StructureFieldDeclaration.__new__(StructureFieldDeclaration)
+ elif type == CTFTypeId.STRING:
+ declaration = StringFieldDeclaration.__new__(StringFieldDeclaration)
+ elif type == CTFTypeId.VARIANT:
+ declaration = VariantFieldDeclaration.__new__(VariantFieldDeclaration)
+ else:
+ return declaration
+
+ declaration._fd = declaration_ptr
+ declaration._s = scope
+ declaration._name = name
+ return declaration
class _Definition(object):
def __init__(self, definition_ptr, scope):
if not scope in _scopes:
ValueError("Invalid scope provided")
- def __repr__(self):
- return "Babeltrace Definition: name('{0}'), type({1})".format(self.name, self.type)
-
@property
def name(self):
"""Return the name of a field or None on error."""
"""Return the type of a field or -1 if unknown."""
return _bt_ctf_field_type(_bt_ctf_get_decl_from_def(self._d))
- def get_int_signedness(self):
- """
- Return the signedness of an integer:
- 0 if unsigned; 1 if signed; -1 on error.
- """
- return _bt_ctf_get_int_signedness(_bt_ctf_get_decl_from_def(self._d))
-
- def get_int_base(self):
- """Return the base of an int or a negative value on error."""
- return _bt_ctf_get_int_base(_bt_ctf_get_decl_from_def(self._d))
-
- def get_int_byte_order(self):
- """
- Return the byte order of an int or a negative
- value on error.
- """
- return _bt_ctf_get_int_byte_order(_bt_ctf_get_decl_from_def(self._d))
-
- def get_int_len(self):
- """
- Return the size, in bits, of an int or a negative
- value on error.
- """
- return _bt_ctf_get_int_len(_bt_ctf_get_decl_from_def(self._d))
+ @property
+ def declaration(self):
+ """Return the associated Definition object."""
+ return _create_field_declaration(_bt_ctf_get_decl_from_def(self._d), self.name, self.scope)
- def get_enum_str(self):
+ def _get_enum_str(self):
"""
Return the string matching the current enumeration.
Return None on error.
"""
return _bt_ctf_get_enum_str(self._d)
- def get_encoding(self):
- """
- Return the encoding of an int or a string.
- Return a negative value on error.
- """
- return _bt_ctf_get_encoding(_bt_ctf_get_decl_from_def(self._d))
-
- def get_array_len(self):
- """
- Return the len of an array or a negative
- value on error.
- """
- return _bt_ctf_get_array_len(_bt_ctf_get_decl_from_def(self._d))
-
- def get_array_element_at(self, index):
+ def _get_array_element_at(self, index):
"""
Return the array's element at position index.
Return None on error
return None
return _Definition(definition_ptr, self.scope)
- def get_sequence_len(self):
+ def _get_sequence_len(self):
"""
Return the len of a sequence or a negative
value on error.
seq = _bt_python_get_sequence_from_def(self._d)
return _bt_sequence_len(seq)
- def get_sequence_element_at(self, index):
+ def _get_sequence_element_at(self, index):
"""
Return the sequence's element at position index,
otherwise return None
return _Definition(definition_ptr, self.scope)
return None
- def get_uint64(self):
+ def _get_uint64(self):
"""
Return the value associated with the field.
If the field does not exist or is not of the type requested,
"""
return _bt_ctf_get_uint64(self._d)
- def get_int64(self):
+ def _get_int64(self):
"""
Return the value associated with the field.
If the field does not exist or is not of the type requested,
"""
return _bt_ctf_get_int64(self._d)
- def get_char_array(self):
+ def _get_char_array(self):
"""
Return the value associated with the field.
If the field does not exist or is not of the type requested,
"""
return _bt_ctf_get_char_array(self._d)
- def get_str(self):
+ def _get_str(self):
"""
Return the value associated with the field.
If the field does not exist or is not of the type requested,
"""
return _bt_ctf_get_string(self._d)
- def get_float(self):
+ def _get_float(self):
"""
Return the value associated with the field.
If the field does not exist or is not of the type requested,
"""
return _bt_ctf_get_float(self._d)
- def get_variant(self):
+ def _get_variant(self):
"""
Return the variant's selected field.
If the field does not exist or is not of the type requested,
"""
return _bt_ctf_get_variant(self._d)
- def get_struct_field_count(self):
+ def _get_struct_field_count(self):
"""
Return the number of fields contained in the structure.
If the field does not exist or is not of the type requested,
"""
return _bt_ctf_get_struct_field_count(self._d)
- def get_struct_field_at(self, i):
+ def _get_struct_field_at(self, i):
"""
Return the structure's field at position i.
If the field does not exist or is not of the type requested,
id = self.type
value = None
if id == CTFTypeId.STRING:
- value = self.get_str()
+ value = self._get_str()
elif id == CTFTypeId.ARRAY:
value = []
- for i in range(self.get_array_len()):
- element = self.get_array_element_at(i)
+ for i in range(self.declaration.length):
+ element = self._get_array_element_at(i)
value.append(element.value)
elif id == CTFTypeId.INTEGER:
- if self.get_int_signedness() == 0:
- value = self.get_uint64()
+ if self.declaration.signedness == 0:
+ value = self._get_uint64()
else:
- value = self.get_int64()
+ value = self._get_int64()
elif id == CTFTypeId.ENUM:
- value = self.get_enum_str()
+ value = self._get_enum_str()
elif id == CTFTypeId.SEQUENCE:
- seq_len = self.get_sequence_len()
+ seq_len = self._get_sequence_len()
value = []
for i in range(seq_len):
- evDef = self.get_sequence_element_at(i)
+ evDef = self._get_sequence_element_at(i)
value.append(evDef.value)
elif id == CTFTypeId.FLOAT:
- value = self.get_float()
+ value = self._get_float()
elif id == CTFTypeId.VARIANT:
- variant = Definition.__new__(Definition)
- variant._d = self.get_variant();
+ variant = _Definition.__new__(_Definition)
+ variant._d = self._get_variant();
value = variant.value
elif id == CTFTypeId.STRUCT:
value = {}
- for i in range(self.get_struct_field_count()):
- member = _Definition(self.get_struct_field_at(i), self.scope)
+ for i in range(self._get_struct_field_count()):
+ member = _Definition(self._get_struct_field_at(i), self.scope)
value[member.name] = member.value
if field_error():
- raise FieldError("Error occurred while accessing field {} of type {}".format(self.field_name(), CTFTypeId.get_type_name(self.field_type())))
+ raise FieldError("Error occurred while accessing field {} of type {}".format(self.name, CTFTypeId.type_name(self.declaration.type)))
return value
@property
%pythoncode %{
class CTFWriter:
- class ByteOrder:
- BYTE_ORDER_NATIVE = 0
- BYTE_ORDER_LITTLE_ENDIAN = 1
- BYTE_ORDER_BIG_ENDIAN = 2
- BYTE_ORDER_NETWORK = 3
class Clock:
def __init__(self, name):