docs: cleanup: Rephrase and correct typos
[barectf.git] / barectf / config.py
1 # The MIT License (MIT)
2 #
3 # Copyright (c) 2015-2020 Philippe Proulx <pproulx@efficios.com>
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining
6 # a copy of this software and associated documentation files (the
7 # "Software"), to deal in the Software without restriction, including
8 # without limitation the rights to use, copy, modify, merge, publish,
9 # distribute, sublicense, and/or sell copies of the Software, and to
10 # permit persons to whom the Software is furnished to do so, subject to
11 # the following conditions:
12 #
13 # The above copyright notice and this permission notice shall be
14 # included in all copies or substantial portions of the Software.
15 #
16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
19 # IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
20 # CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
21 # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
22 # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23
24 import barectf.version as barectf_version
25 from typing import Optional, Any, FrozenSet, Mapping, Iterator, Set, Union, Callable
26 import typing
27 from barectf.typing import Count, Alignment, _OptStr, Id
28 import collections.abc
29 import collections
30 import datetime
31 import enum
32 import uuid as uuidp
33
34
35 @enum.unique
36 class ByteOrder(enum.Enum):
37 LITTLE_ENDIAN = 'le'
38 BIG_ENDIAN = 'be'
39
40
41 class _FieldType:
42 @property
43 def alignment(self) -> Alignment:
44 raise NotImplementedError
45
46 @property
47 def size_is_dynamic(self):
48 return False
49
50
51 class _BitArrayFieldType(_FieldType):
52 def __init__(self, size: Count, alignment: Alignment = Alignment(1)):
53 self._size = size
54 self._alignment = alignment
55
56 @property
57 def size(self) -> Count:
58 return self._size
59
60 @property
61 def alignment(self) -> Alignment:
62 return self._alignment
63
64
65 class DisplayBase(enum.Enum):
66 BINARY = 2
67 OCTAL = 8
68 DECIMAL = 10
69 HEXADECIMAL = 16
70
71
72 class _IntegerFieldType(_BitArrayFieldType):
73 def __init__(self, size: Count, alignment: Optional[Alignment] = None,
74 preferred_display_base: DisplayBase = DisplayBase.DECIMAL):
75 if alignment is None:
76 alignment = Alignment(8 if size % 8 == 0 else 1)
77
78 super().__init__(size, alignment)
79 self._preferred_display_base = preferred_display_base
80
81 @property
82 def preferred_display_base(self) -> DisplayBase:
83 return self._preferred_display_base
84
85
86 class UnsignedIntegerFieldType(_IntegerFieldType):
87 def __init__(self, *args, **kwargs):
88 super().__init__(*args, **kwargs)
89 self._mapped_clk_type_name = None
90 self._is_len = False
91
92
93 class SignedIntegerFieldType(_IntegerFieldType):
94 pass
95
96
97 class EnumerationFieldTypeMappingRange:
98 def __init__(self, lower: int, upper: int):
99 self._lower = lower
100 self._upper = upper
101
102 @property
103 def lower(self) -> int:
104 return self._lower
105
106 @property
107 def upper(self) -> int:
108 return self._upper
109
110 def __eq__(self, other: Any) -> bool:
111 if type(other) is not type(self):
112 return False
113
114 return (self._lower, self._upper) == (other._lower, other._upper)
115
116 def __hash__(self) -> int:
117 return hash((self._lower, self._upper))
118
119 def contains(self, value: int) -> bool:
120 return self._lower <= value <= self._upper
121
122
123 class EnumerationFieldTypeMapping:
124 def __init__(self, ranges: Set[EnumerationFieldTypeMappingRange]):
125 self._ranges = frozenset(ranges)
126
127 @property
128 def ranges(self) -> FrozenSet[EnumerationFieldTypeMappingRange]:
129 return self._ranges
130
131 def ranges_contain_value(self, value: int) -> bool:
132 return any([rg.contains(value) for rg in self._ranges])
133
134
135 _EnumFtMappings = Mapping[str, EnumerationFieldTypeMapping]
136
137
138 class EnumerationFieldTypeMappings(collections.abc.Mapping):
139 def __init__(self, mappings: _EnumFtMappings):
140 self._mappings = {label: mapping for label, mapping in mappings.items()}
141
142 def __getitem__(self, key: str) -> EnumerationFieldTypeMapping:
143 return self._mappings[key]
144
145 def __iter__(self) -> Iterator[str]:
146 return iter(self._mappings)
147
148 def __len__(self) -> int:
149 return len(self._mappings)
150
151
152 class _EnumerationFieldType(_IntegerFieldType):
153 def __init__(self, size: Count, alignment: Optional[Alignment] = None,
154 preferred_display_base: DisplayBase = DisplayBase.DECIMAL,
155 mappings: Optional[_EnumFtMappings] = None):
156 super().__init__(size, alignment, preferred_display_base)
157 self._mappings = EnumerationFieldTypeMappings({})
158
159 if mappings is not None:
160 self._mappings = EnumerationFieldTypeMappings(mappings)
161
162 @property
163 def mappings(self) -> EnumerationFieldTypeMappings:
164 return self._mappings
165
166 def labels_for_value(self, value: int) -> Set[str]:
167 labels = set()
168
169 for label, mapping in self._mappings.items():
170 if mapping.ranges_contain_value(value):
171 labels.add(label)
172
173 return labels
174
175
176 class UnsignedEnumerationFieldType(_EnumerationFieldType, UnsignedIntegerFieldType):
177 pass
178
179
180 class SignedEnumerationFieldType(_EnumerationFieldType, SignedIntegerFieldType):
181 pass
182
183
184 class RealFieldType(_BitArrayFieldType):
185 pass
186
187
188 class StringFieldType(_FieldType):
189 @property
190 def alignment(self) -> Alignment:
191 return Alignment(8)
192
193 @property
194 def size_is_dynamic(self):
195 return True
196
197
198 class _ArrayFieldType(_FieldType):
199 def __init__(self, element_field_type: _FieldType):
200 self._element_field_type = element_field_type
201
202 @property
203 def element_field_type(self) -> _FieldType:
204 return self._element_field_type
205
206 @property
207 def alignment(self) -> Alignment:
208 return self._element_field_type.alignment
209
210 @property
211 def size_is_dynamic(self):
212 return self._element_field_type.size_is_dynamic
213
214
215 class StaticArrayFieldType(_ArrayFieldType):
216 def __init__(self, length: Count, element_field_type: _FieldType):
217 super().__init__(element_field_type)
218 self._length = length
219
220 @property
221 def length(self) -> Count:
222 return self._length
223
224
225 class DynamicArrayFieldType(_ArrayFieldType):
226 def __init__(self, length_field_type: UnsignedIntegerFieldType, element_field_type: _FieldType):
227 super().__init__(element_field_type)
228 self._length_field_type = length_field_type
229 self._length_ft_member_name: Optional[str] = None
230
231 @property
232 def length_field_type(self):
233 return self._length_field_type
234
235 @property
236 def size_is_dynamic(self):
237 return True
238
239
240 class StructureFieldTypeMember:
241 def __init__(self, field_type: _FieldType):
242 self._field_type = field_type
243
244 @property
245 def field_type(self) -> _FieldType:
246 return self._field_type
247
248
249 _StructFtMembers = Mapping[str, StructureFieldTypeMember]
250
251
252 class StructureFieldTypeMembers(collections.abc.Mapping):
253 def __init__(self, members: _StructFtMembers):
254 self._members = collections.OrderedDict()
255
256 for name, member in members.items():
257 assert type(member) is StructureFieldTypeMember
258 self._members[name] = member
259
260 def __getitem__(self, key: str) -> StructureFieldTypeMember:
261 return self._members[key]
262
263 def __iter__(self) -> Iterator[str]:
264 return iter(self._members)
265
266 def __len__(self) -> int:
267 return len(self._members)
268
269
270 class StructureFieldType(_FieldType):
271 def __init__(self, minimum_alignment: Alignment = Alignment(1),
272 members: Optional[_StructFtMembers] = None):
273 self._minimum_alignment = minimum_alignment
274 self._members = StructureFieldTypeMembers({})
275
276 if members is not None:
277 self._members = StructureFieldTypeMembers(members)
278
279 self._set_alignment()
280 self._set_dyn_array_ft_length_ft_member_names()
281
282 def _set_alignment(self):
283 self._alignment: Alignment = self._minimum_alignment
284
285 for member in self._members.values():
286 if member.field_type.alignment > self._alignment:
287 self._alignment = member.field_type.alignment
288
289 def _set_dyn_array_ft_length_ft_member_names(self):
290 for member in self._members.values():
291 if type(member.field_type) is DynamicArrayFieldType:
292 # Find length field type member name within the same
293 # structure field type members.
294 for len_name, len_member in self._members.items():
295 if member.field_type.length_field_type is len_member.field_type:
296 member.field_type._length_ft_member_name = len_name
297 len_member.field_type._is_len = True
298 break
299
300 if member.field_type.alignment > self._alignment:
301 self._alignment = member.field_type.alignment
302
303 @property
304 def minimum_alignment(self) -> Alignment:
305 return self._minimum_alignment
306
307 @property
308 def alignment(self) -> Alignment:
309 return self._alignment
310
311 @property
312 def size_is_dynamic(self):
313 return any([member.field_type.size_is_dynamic for member in self.members.values()])
314
315 @property
316 def members(self) -> StructureFieldTypeMembers:
317 return self._members
318
319
320 class _UniqueByName:
321 _name: str
322
323 def __eq__(self, other: Any) -> bool:
324 if type(other) is not type(self):
325 return False
326
327 return self._name == other._name
328
329 def __lt__(self, other: '_UniqueByName'):
330 assert type(self) is type(other)
331 return self._name < other._name
332
333 def __hash__(self) -> int:
334 return hash(self._name)
335
336
337 _OptFt = Optional[_FieldType]
338 _OptStructFt = Optional[StructureFieldType]
339 LogLevel = typing.NewType('LogLevel', int)
340
341
342 class EventRecordType(_UniqueByName):
343 def __init__(self, name: str, log_level: Optional[LogLevel] = None,
344 specific_context_field_type: _OptStructFt = None, payload_field_type: _OptStructFt = None):
345 self._id: Optional[Id] = None
346 self._name = name
347 self._log_level = log_level
348 self._specific_context_field_type = specific_context_field_type
349 self._payload_field_type = payload_field_type
350
351 @property
352 def id(self) -> Optional[Id]:
353 return self._id
354
355 @property
356 def name(self) -> str:
357 return self._name
358
359 @property
360 def log_level(self) -> Optional[LogLevel]:
361 return self._log_level
362
363 @property
364 def specific_context_field_type(self) -> _OptStructFt:
365 return self._specific_context_field_type
366
367 @property
368 def payload_field_type(self) -> _OptStructFt:
369 return self._payload_field_type
370
371
372 class ClockTypeOffset:
373 def __init__(self, seconds: int = 0, cycles: Count = Count(0)):
374 self._seconds = seconds
375 self._cycles = cycles
376
377 @property
378 def seconds(self) -> int:
379 return self._seconds
380
381 @property
382 def cycles(self) -> Count:
383 return self._cycles
384
385
386 _OptUuid = Optional[uuidp.UUID]
387
388
389 class ClockType(_UniqueByName):
390 def __init__(self, name: str, frequency: Count = Count(int(1e9)), uuid: _OptUuid = None,
391 description: _OptStr = None, precision: Count = Count(0),
392 offset: Optional[ClockTypeOffset] = None, origin_is_unix_epoch: bool = False):
393 self._name = name
394 self._frequency = frequency
395 self._uuid = uuid
396 self._description = description
397 self._precision = precision
398 self._offset = ClockTypeOffset()
399
400 if offset is not None:
401 self._offset = offset
402
403 self._origin_is_unix_epoch = origin_is_unix_epoch
404
405 @property
406 def name(self) -> str:
407 return self._name
408
409 @property
410 def frequency(self) -> Count:
411 return self._frequency
412
413 @property
414 def uuid(self) -> _OptUuid:
415 return self._uuid
416
417 @property
418 def description(self) -> _OptStr:
419 return self._description
420
421 @property
422 def precision(self) -> Count:
423 return self._precision
424
425 @property
426 def offset(self) -> ClockTypeOffset:
427 return self._offset
428
429 @property
430 def origin_is_unix_epoch(self) -> bool:
431 return self._origin_is_unix_epoch
432
433
434 DEFAULT_FIELD_TYPE = 'default'
435 _DefaultableUIntFt = Union[str, UnsignedIntegerFieldType]
436 _OptDefaultableUIntFt = Optional[_DefaultableUIntFt]
437 _OptUIntFt = Optional[UnsignedIntegerFieldType]
438
439
440 class DataStreamTypePacketFeatures:
441 def __init__(self, total_size_field_type: _DefaultableUIntFt = DEFAULT_FIELD_TYPE,
442 content_size_field_type: _DefaultableUIntFt = DEFAULT_FIELD_TYPE,
443 beginning_timestamp_field_type: _OptDefaultableUIntFt = None,
444 end_timestamp_field_type: _OptDefaultableUIntFt = None,
445 discarded_event_records_snapshot_counter_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE,
446 sequence_number_field_type: _OptDefaultableUIntFt = None):
447 def get_ft(user_ft: _OptDefaultableUIntFt) -> _OptUIntFt:
448 if user_ft == DEFAULT_FIELD_TYPE:
449 return UnsignedIntegerFieldType(64)
450
451 return typing.cast(_OptUIntFt, user_ft)
452
453 self._total_size_field_type = get_ft(total_size_field_type)
454 self._content_size_field_type = get_ft(content_size_field_type)
455 self._beginning_timestamp_field_type = get_ft(beginning_timestamp_field_type)
456 self._end_timestamp_field_type = get_ft(end_timestamp_field_type)
457 self._discarded_event_records_snapshot_counter_field_type = get_ft(discarded_event_records_snapshot_counter_field_type)
458 self._sequence_number_field_type = get_ft(sequence_number_field_type)
459
460 @property
461 def sequence_number_field_type(self) -> _OptUIntFt:
462 return self._sequence_number_field_type
463
464 @property
465 def total_size_field_type(self) -> _OptUIntFt:
466 return self._total_size_field_type
467
468 @property
469 def content_size_field_type(self) -> _OptUIntFt:
470 return self._content_size_field_type
471
472 @property
473 def beginning_timestamp_field_type(self) -> _OptUIntFt:
474 return self._beginning_timestamp_field_type
475
476 @property
477 def end_timestamp_field_type(self) -> _OptUIntFt:
478 return self._end_timestamp_field_type
479
480 @property
481 def discarded_event_records_snapshot_counter_field_type(self) -> _OptUIntFt:
482 return self._discarded_event_records_snapshot_counter_field_type
483
484
485 class DataStreamTypeEventRecordFeatures:
486 def __init__(self, type_id_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE,
487 timestamp_field_type: _OptDefaultableUIntFt = None):
488 def get_ft(user_ft: _OptDefaultableUIntFt) -> _OptUIntFt:
489 if user_ft == DEFAULT_FIELD_TYPE:
490 return UnsignedIntegerFieldType(64)
491
492 return typing.cast(_OptUIntFt, user_ft)
493
494 self._type_id_field_type = get_ft(type_id_field_type)
495 self._timestamp_field_type = get_ft(timestamp_field_type)
496
497 @property
498 def type_id_field_type(self) -> _OptUIntFt:
499 return self._type_id_field_type
500
501 @property
502 def timestamp_field_type(self) -> _OptUIntFt:
503 return self._timestamp_field_type
504
505
506 class DataStreamTypeFeatures:
507 def __init__(self, packet_features: Optional[DataStreamTypePacketFeatures] = None,
508 event_record_features: Optional[DataStreamTypeEventRecordFeatures] = None):
509 if packet_features is None:
510 self._packet_features = DataStreamTypePacketFeatures()
511 else:
512 self._packet_features = packet_features
513
514 if event_record_features is None:
515 self._event_record_features = DataStreamTypeEventRecordFeatures()
516 else:
517 self._event_record_features = event_record_features
518
519 @property
520 def packet_features(self) -> DataStreamTypePacketFeatures:
521 return self._packet_features
522
523 @property
524 def event_record_features(self) -> DataStreamTypeEventRecordFeatures:
525 return self._event_record_features
526
527
528 class DataStreamType(_UniqueByName):
529 def __init__(self, name: str, event_record_types: Set[EventRecordType],
530 default_clock_type: Optional[ClockType] = None,
531 features: Optional[DataStreamTypeFeatures] = None,
532 packet_context_field_type_extra_members: Optional[_StructFtMembers] = None,
533 event_record_common_context_field_type: _OptStructFt = None):
534 self._id: Optional[Id] = None
535 self._name = name
536 self._default_clock_type = default_clock_type
537 self._event_record_common_context_field_type = event_record_common_context_field_type
538 self._event_record_types = frozenset(event_record_types)
539
540 # assign unique IDs
541 for index, ert in enumerate(sorted(self._event_record_types, key=lambda evt: evt.name)):
542 assert ert._id is None
543 ert._id = Id(index)
544
545 self._set_features(features)
546 self._packet_context_field_type_extra_members = StructureFieldTypeMembers({})
547
548 if packet_context_field_type_extra_members is not None:
549 self._packet_context_field_type_extra_members = StructureFieldTypeMembers(packet_context_field_type_extra_members)
550
551 self._set_pkt_ctx_ft()
552 self._set_er_header_ft()
553
554 def _set_features(self, features: Optional[DataStreamTypeFeatures]):
555 if features is not None:
556 self._features = features
557 return None
558
559 er_ts_ft = None
560 pkt_beginning_ts_ft = None
561 pkt_end_ts_ft = None
562
563 if self._default_clock_type is not None:
564 # Automatic timestamp field types because the data stream
565 # type has a default clock type.
566 er_ts_ft = DEFAULT_FIELD_TYPE
567 pkt_beginning_ts_ft = DEFAULT_FIELD_TYPE
568 pkt_end_ts_ft = DEFAULT_FIELD_TYPE
569
570 self._features = DataStreamTypeFeatures(DataStreamTypePacketFeatures(beginning_timestamp_field_type=pkt_beginning_ts_ft,
571 end_timestamp_field_type=pkt_end_ts_ft),
572 DataStreamTypeEventRecordFeatures(timestamp_field_type=er_ts_ft))
573
574 def _set_ft_mapped_clk_type_name(self, ft: Optional[UnsignedIntegerFieldType]):
575 if ft is None:
576 return
577
578 if self._default_clock_type is not None:
579 assert isinstance(ft, UnsignedIntegerFieldType)
580 ft._mapped_clk_type_name = self._default_clock_type.name
581
582 def _set_pkt_ctx_ft(self):
583 members = None
584
585 def add_member_if_exists(name: str, ft: _FieldType, set_mapped_clk_type_name: bool = False):
586 nonlocal members
587
588 if ft is not None:
589 if set_mapped_clk_type_name:
590 self._set_ft_mapped_clk_type_name(typing.cast(UnsignedIntegerFieldType, ft))
591
592 members[name] = StructureFieldTypeMember(ft)
593
594 members = collections.OrderedDict([
595 (
596 'packet_size',
597 StructureFieldTypeMember(self._features.packet_features.total_size_field_type)
598 ),
599 (
600 'content_size',
601 StructureFieldTypeMember(self._features.packet_features.content_size_field_type)
602 )
603 ])
604
605 add_member_if_exists('timestamp_begin',
606 self._features.packet_features.beginning_timestamp_field_type, True)
607 add_member_if_exists('timestamp_end', self._features.packet_features.end_timestamp_field_type,
608 True)
609 add_member_if_exists('events_discarded',
610 self._features.packet_features.discarded_event_records_snapshot_counter_field_type)
611 add_member_if_exists('packet_seq_num',
612 self._features.packet_features.sequence_number_field_type)
613
614 if self._packet_context_field_type_extra_members is not None:
615 for name, field_type in self._packet_context_field_type_extra_members.items():
616 assert name not in members
617 members[name] = field_type
618
619 self._pkt_ctx_ft = StructureFieldType(8, members)
620
621 def _set_er_header_ft(self):
622 members = collections.OrderedDict()
623
624 if self._features.event_record_features.type_id_field_type is not None:
625 members['id'] = StructureFieldTypeMember(self._features.event_record_features.type_id_field_type)
626
627 if self._features.event_record_features.timestamp_field_type is not None:
628 ft = self._features.event_record_features.timestamp_field_type
629 self._set_ft_mapped_clk_type_name(ft)
630 members['timestamp'] = StructureFieldTypeMember(ft)
631
632 self._er_header_ft = StructureFieldType(8, members)
633
634 @property
635 def id(self) -> Optional[Id]:
636 return self._id
637
638 @property
639 def name(self) -> str:
640 return self._name
641
642 @property
643 def default_clock_type(self) -> Optional[ClockType]:
644 return self._default_clock_type
645
646 @property
647 def features(self) -> DataStreamTypeFeatures:
648 return self._features
649
650 @property
651 def packet_context_field_type_extra_members(self) -> StructureFieldTypeMembers:
652 return self._packet_context_field_type_extra_members
653
654 @property
655 def event_record_common_context_field_type(self) -> _OptStructFt:
656 return self._event_record_common_context_field_type
657
658 @property
659 def event_record_types(self) -> FrozenSet[EventRecordType]:
660 return self._event_record_types
661
662
663 _OptUuidFt = Optional[Union[str, StaticArrayFieldType]]
664
665
666 class TraceTypeFeatures:
667 def __init__(self, magic_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE,
668 uuid_field_type: _OptUuidFt = None,
669 data_stream_type_id_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE):
670 def get_field_type(user_ft: Optional[Union[str, _FieldType]],
671 create_default_ft: Callable[[], _FieldType]) -> _OptFt:
672 if user_ft == DEFAULT_FIELD_TYPE:
673 return create_default_ft()
674
675 return typing.cast(_OptFt, user_ft)
676
677 def create_default_magic_ft():
678 return UnsignedIntegerFieldType(32)
679
680 def create_default_uuid_ft():
681 return StaticArrayFieldType(Count(16), UnsignedIntegerFieldType(8))
682
683 def create_default_dst_id_ft():
684 return UnsignedIntegerFieldType(64)
685
686 self._magic_field_type = typing.cast(_OptUIntFt, get_field_type(magic_field_type, create_default_magic_ft))
687 self._uuid_field_type = typing.cast(Optional[StaticArrayFieldType],
688 get_field_type(uuid_field_type, create_default_uuid_ft))
689 self._data_stream_type_id_field_type = typing.cast(_OptUIntFt,
690 get_field_type(data_stream_type_id_field_type,
691 create_default_dst_id_ft))
692
693 @property
694 def magic_field_type(self) -> _OptUIntFt:
695 return self._magic_field_type
696
697 @property
698 def uuid_field_type(self) -> Optional[StaticArrayFieldType]:
699 return self._uuid_field_type
700
701 @property
702 def data_stream_type_id_field_type(self) -> _OptUIntFt:
703 return self._data_stream_type_id_field_type
704
705
706 class _TraceType:
707 def __init__(self, trace_byte_order: ByteOrder, data_stream_types: Set[DataStreamType],
708 uuid: _OptUuid, features: Optional[TraceTypeFeatures]):
709 self._trace_byte_order = trace_byte_order
710 self._data_stream_types = frozenset(data_stream_types)
711
712 # assign unique IDs
713 for index, dst in enumerate(sorted(self._data_stream_types, key=lambda st: st.name)):
714 assert dst._id is None
715 dst._id = Id(index)
716
717 self._uuid = uuid
718 self._set_features(features)
719 self._set_pkt_header_ft()
720
721 def _set_features(self, features: Optional[TraceTypeFeatures]):
722 if features is not None:
723 self._features = features
724 return
725
726 # automatic UUID field type because the trace type has a UUID
727 uuid_ft = None if self._uuid is None else DEFAULT_FIELD_TYPE
728 self._features = TraceTypeFeatures(uuid_field_type=uuid_ft)
729
730 def _set_pkt_header_ft(self):
731 members = collections.OrderedDict()
732
733 def add_member_if_exists(name: str, ft: _OptFt):
734 nonlocal members
735
736 if ft is not None:
737 members[name] = StructureFieldTypeMember(ft)
738
739 add_member_if_exists('magic', self._features.magic_field_type)
740 add_member_if_exists('uuid', self._features.uuid_field_type)
741 add_member_if_exists('stream_id', self._features.data_stream_type_id_field_type)
742 self._pkt_header_ft = StructureFieldType(8, members)
743
744 @property
745 def trace_byte_order(self) -> ByteOrder:
746 return self._trace_byte_order
747
748 @property
749 def uuid(self) -> _OptUuid:
750 return self._uuid
751
752 @property
753 def data_stream_types(self) -> FrozenSet[DataStreamType]:
754 return self._data_stream_types
755
756 def data_stream_type(self, name: str) -> Optional[DataStreamType]:
757 for cand_dst in self._data_stream_types:
758 if cand_dst.name == name:
759 return cand_dst
760
761 return None
762
763 @property
764 def features(self) -> TraceTypeFeatures:
765 return self._features
766
767 @property
768 def clock_types(self) -> Set[ClockType]:
769 clk_types = set()
770
771 for dst in self._data_stream_types:
772 if dst.default_clock_type is not None:
773 clk_types.add(dst.default_clock_type)
774
775 return clk_types
776
777
778 # Standard trace type class for a barectf 3 configuration.
779 #
780 # The trace byte order property of an instance is the same as the
781 # expected native byte order of the target system.
782 class TraceType(_TraceType):
783 def __init__(self, trace_byte_order: ByteOrder, data_stream_types: Set[DataStreamType],
784 uuid: _OptUuid = None, features: Optional[TraceTypeFeatures] = None):
785 super().__init__(trace_byte_order, data_stream_types, uuid, features)
786
787 @property
788 def native_byte_order(self) -> ByteOrder:
789 return self._trace_byte_order
790
791
792 # This trace type class specifically exists to support barectf 2
793 # configurations. Such configurations define the actual trace byte order
794 # instead of the expected native byte order of the target system.
795 #
796 # The user provides the trace byte order property of an instance.
797 #
798 # IMPORTANT: When possible, prefer the `TraceType` class instead, as
799 # enforcing the trace byte order could result in a less efficient
800 # generated tracer.
801 class TraceTypeWithUnknownNativeByteOrder(_TraceType):
802 def __init__(self, trace_byte_order: ByteOrder, data_stream_types: Set[DataStreamType],
803 uuid: _OptUuid = None, features: Optional[TraceTypeFeatures] = None):
804 super().__init__(trace_byte_order, data_stream_types, uuid, features)
805
806
807 _EnvEntry = Union[str, int]
808 _EnvEntries = Mapping[str, _EnvEntry]
809
810
811 class TraceEnvironment(collections.abc.Mapping):
812 def __init__(self, environment: _EnvEntries):
813 self._env = {name: value for name, value in environment.items()}
814
815 def __getitem__(self, key: str) -> _EnvEntry:
816 return self._env[key]
817
818 def __iter__(self) -> Iterator[str]:
819 return iter(self._env)
820
821 def __len__(self) -> int:
822 return len(self._env)
823
824
825 class Trace:
826 def __init__(self, type: _TraceType, environment: Optional[_EnvEntries] = None):
827 self._type = type
828 self._set_env(environment)
829
830 def _set_env(self, environment: Optional[_EnvEntries]):
831 init_env = collections.OrderedDict([
832 ('domain', 'bare'),
833 ('tracer_name', 'barectf'),
834 ('tracer_major', barectf_version.__major_version__),
835 ('tracer_minor', barectf_version.__minor_version__),
836 ('tracer_patch', barectf_version.__patch_version__),
837 ('tracer_pre', barectf_version.__pre_version__),
838 ('barectf_gen_date', str(datetime.datetime.now().isoformat())),
839 ])
840
841 if environment is None:
842 environment = {}
843
844 init_env.update(environment)
845 self._env = TraceEnvironment(typing.cast(_EnvEntries, init_env))
846
847 @property
848 def type(self) -> _TraceType:
849 return self._type
850
851 @property
852 def environment(self) -> TraceEnvironment:
853 return self._env
854
855
856 _ClkTypeCTypes = Mapping[ClockType, str]
857
858
859 class ClockTypeCTypes(collections.abc.Mapping):
860 def __init__(self, c_types: _ClkTypeCTypes):
861 self._c_types = {clk_type: c_type for clk_type, c_type in c_types.items()}
862
863 def __getitem__(self, key: ClockType) -> str:
864 return self._c_types[key]
865
866 def __iter__(self) -> Iterator[ClockType]:
867 return iter(self._c_types)
868
869 def __len__(self) -> int:
870 return len(self._c_types)
871
872
873 class ConfigurationCodeGenerationHeaderOptions:
874 def __init__(self, identifier_prefix_definition: bool = False,
875 default_data_stream_type_name_definition: bool = False):
876 self._identifier_prefix_definition = identifier_prefix_definition
877 self._default_data_stream_type_name_definition = default_data_stream_type_name_definition
878
879 @property
880 def identifier_prefix_definition(self) -> bool:
881 return self._identifier_prefix_definition
882
883 @property
884 def default_data_stream_type_name_definition(self) -> bool:
885 return self._default_data_stream_type_name_definition
886
887
888 class ConfigurationCodeGenerationOptions:
889 def __init__(self, identifier_prefix: str = 'barectf_', file_name_prefix: str = 'barectf',
890 default_data_stream_type: Optional[DataStreamType] = None,
891 header_options: Optional[ConfigurationCodeGenerationHeaderOptions] = None,
892 clock_type_c_types: Optional[_ClkTypeCTypes] = None):
893 self._identifier_prefix = identifier_prefix
894 self._file_name_prefix = file_name_prefix
895 self._default_data_stream_type = default_data_stream_type
896
897 self._header_options = ConfigurationCodeGenerationHeaderOptions()
898
899 if header_options is not None:
900 self._header_options = header_options
901
902 self._clock_type_c_types = ClockTypeCTypes({})
903
904 if clock_type_c_types is not None:
905 self._clock_type_c_types = ClockTypeCTypes(clock_type_c_types)
906
907 @property
908 def identifier_prefix(self) -> str:
909 return self._identifier_prefix
910
911 @property
912 def file_name_prefix(self) -> str:
913 return self._file_name_prefix
914
915 @property
916 def default_data_stream_type(self) -> Optional[DataStreamType]:
917 return self._default_data_stream_type
918
919 @property
920 def header_options(self) -> ConfigurationCodeGenerationHeaderOptions:
921 return self._header_options
922
923 @property
924 def clock_type_c_types(self) -> ClockTypeCTypes:
925 return self._clock_type_c_types
926
927
928 class ConfigurationOptions:
929 def __init__(self,
930 code_generation_options: Optional[ConfigurationCodeGenerationOptions] = None):
931 self._code_generation_options = ConfigurationCodeGenerationOptions()
932
933 if code_generation_options is not None:
934 self._code_generation_options = code_generation_options
935
936 @property
937 def code_generation_options(self) -> ConfigurationCodeGenerationOptions:
938 return self._code_generation_options
939
940
941 class Configuration:
942 def __init__(self, trace: Trace, options: Optional[ConfigurationOptions] = None):
943 self._trace = trace
944 self._options = ConfigurationOptions()
945
946 if options is not None:
947 self._options = options
948
949 clk_type_c_types = self._options.code_generation_options.clock_type_c_types
950
951 for dst in trace.type.data_stream_types:
952 def_clk_type = dst.default_clock_type
953
954 if def_clk_type is None:
955 continue
956
957 if def_clk_type not in clk_type_c_types:
958 clk_type_c_types._c_types[def_clk_type] = 'uint32_t'
959
960 @property
961 def trace(self) -> Trace:
962 return self._trace
963
964 @property
965 def options(self) -> ConfigurationOptions:
966 return self._options
This page took 0.047886 seconds and 4 git commands to generate.