From 2d55dc7d9471bef93c1e896d7c2a6b9ba0c4851a Mon Sep 17 00:00:00 2001 From: Philippe Proulx Date: Tue, 11 Aug 2020 14:33:35 -0400 Subject: [PATCH] Add Python type hints This patch adds Python type hints to all the modules except `codegen.py`, `gen.py`, and `tsdl182gen.py`, as it is likely that those will change significantly in the future. Mypy 0.782 reports no errors with this patch. The few errors that were found during the type hint introduction process are fixed as part of this patch. `typing.py` is a new module which contains public and private type aliases, mostly derivatives of `int` to add semantics (index, count, version number, and the rest). The ones that are public are available from the `barectf` package itself (`__init__.py`). A `barectf` API user doesn't need to use them without static type checking needs. If she wants to, then she must use `barectf` types explicitly, for example: import barectf clk_type = barectf.ClockType('my_clock', frequency=barectf.Count(100000)) Signed-off-by: Philippe Proulx --- barectf/__init__.py | 11 + barectf/argpar.py | 86 ++++---- barectf/cli.py | 90 ++++---- barectf/config.py | 375 +++++++++++++++++++-------------- barectf/config_file.py | 17 +- barectf/config_parse.py | 43 ++-- barectf/config_parse_common.py | 132 +++++++----- barectf/config_parse_v2.py | 103 +++++---- barectf/config_parse_v3.py | 212 +++++++++++-------- barectf/typing.py | 31 +++ 10 files changed, 663 insertions(+), 437 deletions(-) create mode 100644 barectf/typing.py diff --git a/barectf/__init__.py b/barectf/__init__.py index 0e2cfad..0dcfe8f 100644 --- a/barectf/__init__.py +++ b/barectf/__init__.py @@ -26,6 +26,7 @@ import barectf.version as barectf_version import barectf.config as barectf_config import barectf.config_file as barectf_config_file import barectf.gen as barectf_gen +import barectf.typing as barectf_typing # version API @@ -35,6 +36,14 @@ __patch_version__ = barectf_version.__patch_version__ __version__ = barectf_version.__version__ +# common typing API +Index = barectf_typing.Index +Count = barectf_typing.Count +Id = barectf_typing.Id +Alignment = barectf_typing.Alignment +VersionNumber = barectf_typing.VersionNumber + + # configuration API _ArrayFieldType = barectf_config._ArrayFieldType _BitArrayFieldType = barectf_config._BitArrayFieldType @@ -56,6 +65,7 @@ EnumerationFieldTypeMapping = barectf_config.EnumerationFieldTypeMapping EnumerationFieldTypeMappingRange = barectf_config.EnumerationFieldTypeMappingRange EnumerationFieldTypeMappings = barectf_config.EnumerationFieldTypeMappings EventType = barectf_config.EventType +LogLevel = barectf_config.LogLevel RealFieldType = barectf_config.RealFieldType SignedEnumerationFieldType = barectf_config.SignedEnumerationFieldType SignedIntegerFieldType = barectf_config.SignedIntegerFieldType @@ -92,3 +102,4 @@ del barectf_version del barectf_config del barectf_config_file del barectf_gen +del barectf_typing diff --git a/barectf/argpar.py b/barectf/argpar.py index c9fba5e..8bc7a05 100644 --- a/barectf/argpar.py +++ b/barectf/argpar.py @@ -22,10 +22,16 @@ # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. import re -import collections +import typing +from typing import Optional, List, Iterable +from barectf.typing import Index, _OptStr -__all__ = ['OptDescr', '_OptItem', '_NonOptItem', '_Error', 'parse'] +__all__ = ['OptDescr', '_OptItem', '_NonOptItem', '_Error', 'parse', 'OrigArgs'] + + +# types +OrigArgs = List[str] # Option descriptor. @@ -36,22 +42,23 @@ class OptDescr: # # If `has_arg` is `True`, then it is expected that such an option # has an argument. - def __init__(self, short_name=None, long_name=None, has_arg=False): + def __init__(self, short_name: _OptStr = None, long_name: _OptStr = None, + has_arg: bool = False): assert short_name is not None or long_name is not None self._short_name = short_name self._long_name = long_name self._has_arg = has_arg @property - def short_name(self): + def short_name(self) -> _OptStr: return self._short_name @property - def long_name(self): + def long_name(self) -> _OptStr: return self._long_name @property - def has_arg(self): + def has_arg(self) -> Optional[bool]: return self._has_arg @@ -61,82 +68,85 @@ class _Item: # Parsed option argument item. class _OptItem(_Item): - def __init__(self, descr, arg_text=None): + def __init__(self, descr: OptDescr, arg_text: _OptStr = None): self._descr = descr self._arg_text = arg_text @property - def descr(self): + def descr(self) -> OptDescr: return self._descr @property - def arg_text(self): + def arg_text(self) -> _OptStr: return self._arg_text # Parsed non-option argument item. class _NonOptItem(_Item): - def __init__(self, text, orig_arg_index, non_opt_index): + def __init__(self, text: str, orig_arg_index: Index, non_opt_index: Index): self._text = text self._orig_arg_index = orig_arg_index self._non_opt_index = non_opt_index @property - def text(self): + def text(self) -> str: return self._text @property - def orig_arg_index(self): + def orig_arg_index(self) -> Index: return self._orig_arg_index @property - def non_opt_index(self): + def non_opt_index(self) -> Index: return self._non_opt_index # Results of parse(). class _ParseRes: - def __init__(self, items, ingested_orig_args, remaining_orig_args): + def __init__(self, items: List[_Item], ingested_orig_args: OrigArgs, + remaining_orig_args: OrigArgs): self._items = items self._ingested_orig_args = ingested_orig_args self._remaining_orig_args = remaining_orig_args @property - def items(self): + def items(self) -> List[_Item]: return self._items @property - def ingested_orig_args(self): + def ingested_orig_args(self) -> OrigArgs: return self._ingested_orig_args @property - def remaining_orig_args(self): + def remaining_orig_args(self) -> OrigArgs: return self._remaining_orig_args # Parsing error. class _Error(Exception): - def __init__(self, orig_arg_index, orig_arg, msg): + def __init__(self, orig_arg_index: Index, orig_arg: str, msg: str): super().__init__(msg) self._orig_arg_index = orig_arg_index self._orig_arg = orig_arg self._msg = msg @property - def orig_arg_index(self): + def orig_arg_index(self) -> Index: return self._orig_arg_index @property - def orig_arg(self): + def orig_arg(self) -> str: return self._orig_arg @property - def msg(self): + def msg(self) -> str: return self._msg # Results of parse_short_opts() and parse_long_opt(); internal. -_OptParseRes = collections.namedtuple('_OptParseRes', ['items', 'orig_arg_index_incr']) +class _OptParseRes(typing.NamedTuple): + items: List[_Item] + orig_arg_index_incr: int # Parses the original arguments `orig_args` (list of strings), @@ -227,11 +237,13 @@ _OptParseRes = collections.namedtuple('_OptParseRes', ['items', 'orig_arg_index_ # resulting option items. # # On failure, this function raises an `_Error` object. -def parse(orig_args, opt_descrs, fail_on_unknown_opt=True): +def parse(orig_args: OrigArgs, opt_descrs: Iterable[OptDescr], + fail_on_unknown_opt: bool = True) -> _ParseRes: # Finds and returns an option description amongst `opt_descrs` # having the short option name `short_name` OR the long option name # `long_name` (not both). - def find_opt_descr(short_name=None, long_name=None): + def find_opt_descr(short_name: _OptStr = None, + long_name: _OptStr = None) -> Optional[OptDescr]: for opt_descr in opt_descrs: if short_name is not None and short_name == opt_descr.short_name: return opt_descr @@ -239,6 +251,8 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True): if long_name is not None and long_name == opt_descr.long_name: return opt_descr + return None + # Parses a short option original argument, returning an # `_OptParseRes` object. # @@ -261,9 +275,9 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True): # If any of the short options of `orig_arg` is unknown, then this # function raises an error if `fail_on_unknown_opt` is `True`, or # returns `None` otherwise. - def parse_short_opts(): + def parse_short_opts() -> Optional[_OptParseRes]: short_opts = orig_arg[1:] - items = [] + items: List[_Item] = [] done = False index = 0 orig_arg_index_incr = 1 @@ -278,7 +292,7 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True): raise _Error(orig_arg_index, orig_arg, f'Unknown short option `-{short_opt}`') # discard collected arguments - return + return None opt_arg = None @@ -324,7 +338,7 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True): # # If the long option is unknown, then this function raises an error # if `fail_on_unknown_opt` is `True`, or returns `None` otherwise. - def parse_long_opt(): + def parse_long_opt() -> Optional[_OptParseRes]: long_opt = orig_arg[2:] m = re.match(r'--([^=]+)=(.*)', orig_arg) @@ -340,7 +354,7 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True): raise _Error(orig_arg_index, orig_arg, f'Unknown long option `--{long_opt}`') # discard - return + return None orig_arg_index_incr = 1 @@ -361,9 +375,9 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True): return _OptParseRes([item], orig_arg_index_incr) # parse original arguments - items = [] - orig_arg_index = 0 - non_opt_index = 0 + items: List[_Item] = [] + orig_arg_index = Index(0) + non_opt_index = Index(0) while orig_arg_index < len(orig_args): orig_arg = orig_args[orig_arg_index] @@ -378,7 +392,7 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True): # option if orig_arg[1] == '-': if orig_arg == '--': - raise _Error(orig_arg_index, 'Invalid `--` argument') + raise _Error(orig_arg_index, orig_arg, 'Invalid `--` argument') # long option res = parse_long_opt() @@ -392,11 +406,11 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True): return _ParseRes(items, orig_args[:orig_arg_index], orig_args[orig_arg_index:]) items += res.items - orig_arg_index += res.orig_arg_index_incr + orig_arg_index = Index(orig_arg_index + res.orig_arg_index_incr) else: # non-option items.append(_NonOptItem(orig_arg, orig_arg_index, non_opt_index)) - non_opt_index += 1 - orig_arg_index += 1 + non_opt_index = Index(non_opt_index + 1) + orig_arg_index = Index(orig_arg_index + 1) return _ParseRes(items, orig_args, []) diff --git a/barectf/cli.py b/barectf/cli.py index 3b21ec4..f832987 100644 --- a/barectf/cli.py +++ b/barectf/cli.py @@ -29,13 +29,16 @@ import os.path import barectf import barectf.config_parse_common as barectf_config_parse_common import barectf.argpar as barectf_argpar +from typing import Any, List, Iterable, NoReturn +import typing +from barectf.typing import Index, Count import sys import os # Colors and prints the error message `msg` and exits with status code # 1. -def _print_error(msg): +def _print_error(msg: str) -> NoReturn: termcolor.cprint('Error: ', 'red', end='', file=sys.stderr) termcolor.cprint(msg, 'red', attrs=['bold'], file=sys.stderr) sys.exit(1) @@ -43,7 +46,7 @@ def _print_error(msg): # Pretty-prints the barectf configuration error `exc` and exits with # status code 1. -def _print_config_error(exc): +def _print_config_error(exc: barectf._ConfigurationParseError) -> NoReturn: # reverse: most precise message comes last for ctx in reversed(exc.context): msg = '' @@ -60,7 +63,7 @@ def _print_config_error(exc): # Pretty-prints the unknown exception `exc`. -def _print_unknown_exc(exc): +def _print_unknown_exc(exc: Exception) -> NoReturn: import traceback traceback.print_exc() @@ -69,12 +72,16 @@ def _print_unknown_exc(exc): # Finds and returns all the option items in `items` having the long name # `long_name`. -def _find_opt_items(items, long_name): - ret_items = [] +def _find_opt_items(items: Iterable[barectf_argpar._Item], + long_name: str) -> List[barectf_argpar._OptItem]: + ret_items: List[barectf_argpar._OptItem] = [] for item in items: - if type(item) is barectf_argpar._OptItem and item.descr.long_name == long_name: - ret_items.append(item) + if type(item) is barectf_argpar._OptItem: + item = typing.cast(barectf_argpar._OptItem, item) + + if item.descr.long_name == long_name: + ret_items.append(item) return ret_items @@ -91,7 +98,8 @@ def _find_opt_items(items, long_name): # `items`. # # Returns `default` if there's no such option item. -def _opt_item_val(items, long_name, default=None): +def _opt_item_val(items: Iterable[barectf_argpar._Item], long_name: str, + default: Any = None) -> Any: opt_items = _find_opt_items(items, long_name) if len(opt_items) == 0: @@ -109,7 +117,7 @@ class _CliError(Exception): pass -def _cfg_file_path_from_parse_res(parse_res): +def _cfg_file_path_from_parse_res(parse_res: barectf_argpar._ParseRes) -> str: cfg_file_path = None for item in parse_res.items: @@ -117,7 +125,7 @@ def _cfg_file_path_from_parse_res(parse_res): if cfg_file_path is not None: raise _CliError('Multiple configuration file paths provided') - cfg_file_path = item.text + cfg_file_path = typing.cast(barectf_argpar._NonOptItem, item).text if cfg_file_path is None: raise _CliError('Missing configuration file path') @@ -130,12 +138,13 @@ def _cfg_file_path_from_parse_res(parse_res): # Returns a `_CfgCmdCfg` object from the command-line parsing results # `parse_res`. -def _cfg_cmd_cfg_from_parse_res(parse_res): +def _cfg_cmd_cfg_from_parse_res(parse_res: barectf_argpar._ParseRes) -> '_CfgCmdCfg': # check configuration file path cfg_file_path = _cfg_file_path_from_parse_res(parse_res) - # inclusion directories - inclusion_dirs = [item.arg_text for item in _find_opt_items(parse_res.items, 'include-dir')] + # inclusion directories (`--include-dir` option needs an argument) + inclusion_dirs = typing.cast(List[str], + [item.arg_text for item in _find_opt_items(parse_res.items, 'include-dir')]) for dir in inclusion_dirs: if not os.path.isdir(dir): @@ -175,7 +184,7 @@ Options: # Returns a source and metadata stream file generating command object # from the specific command-line arguments `orig_args`. -def _gen_cmd_cfg_from_args(orig_args): +def _gen_cmd_cfg_from_args(orig_args: barectf_argpar.OrigArgs) -> '_GenCmd': # parse original arguments opt_descrs = [ barectf_argpar.OptDescr('h', 'help'), @@ -237,7 +246,7 @@ Options: # Returns an effective configuration showing command object from the # specific command-line arguments `orig_args`. -def _show_effective_cfg_cfg_from_args(orig_args): +def _show_effective_cfg_cfg_from_args(orig_args: barectf_argpar.OrigArgs) -> '_ShowEffectiveCfgCmd': # parse original arguments opt_descrs = [ barectf_argpar.OptDescr('h', 'help'), @@ -270,7 +279,7 @@ def _show_effective_cfg_cfg_from_args(orig_args): return _ShowEffectiveCfgCmd(_ShowEffectiveCfgCmdCfg(cfg_cmd_cfg.cfg_file_path, cfg_cmd_cfg.inclusion_dirs, cfg_cmd_cfg.ignore_inclusion_file_not_found, - indent_space_count)) + Count(indent_space_count))) def _show_cfg_version_cmd_usage(): @@ -285,7 +294,7 @@ Options: # Returns a configuration version showing command object from the # specific command-line arguments `orig_args`. -def _show_cfg_version_cfg_from_args(orig_args): +def _show_cfg_version_cfg_from_args(orig_args: barectf_argpar.OrigArgs) -> '_ShowCfgVersionCmd': # parse original arguments opt_descrs = [ barectf_argpar.OptDescr('h', 'help'), @@ -336,7 +345,7 @@ Run `barectf COMMAND --help` to show the help of COMMAND.''') # Returns a command object from the command-line arguments `orig_args`. # # All the `orig_args` elements are considered. -def _cmd_from_args(orig_args): +def _cmd_from_args(orig_args: barectf_argpar.OrigArgs) -> '_Cmd': # We use our `argpar` module here instead of Python's `argparse` # because we need to support the two following use cases: # @@ -363,23 +372,24 @@ def _cmd_from_args(orig_args): 'show-config-version': _show_cfg_version_cfg_from_args, 'show-cfg-version': _show_cfg_version_cfg_from_args, } - general_opt_items = [] + general_opt_items: List[barectf_argpar._OptItem] = [] cmd_first_orig_arg_index = None cmd_from_args_func = None for item in res.items: if type(item) is barectf_argpar._NonOptItem: + item = typing.cast(barectf_argpar._NonOptItem, item) cmd_from_args_func = cmd_from_args_funcs.get(item.text) if cmd_from_args_func is None: cmd_first_orig_arg_index = item.orig_arg_index else: - cmd_first_orig_arg_index = item.orig_arg_index + 1 + cmd_first_orig_arg_index = Index(item.orig_arg_index + 1) break else: assert type(item) is barectf_argpar._OptItem - general_opt_items.append(item) + general_opt_items.append(typing.cast(barectf_argpar._OptItem, item)) # general help? if len(_find_opt_items(general_opt_items, 'help')) > 0: @@ -406,30 +416,31 @@ class _CmdCfg: class _CfgCmdCfg(_CmdCfg): - def __init__(self, cfg_file_path, inclusion_dirs, ignore_inclusion_file_not_found): + def __init__(self, cfg_file_path: str, inclusion_dirs: List[str], + ignore_inclusion_file_not_found: bool): self._cfg_file_path = cfg_file_path self._inclusion_dirs = inclusion_dirs self._ignore_inclusion_file_not_found = ignore_inclusion_file_not_found @property - def cfg_file_path(self): + def cfg_file_path(self) -> str: return self._cfg_file_path @property - def inclusion_dirs(self): + def inclusion_dirs(self) -> List[str]: return self._inclusion_dirs @property - def ignore_inclusion_file_not_found(self): + def ignore_inclusion_file_not_found(self) -> bool: return self._ignore_inclusion_file_not_found class _Cmd: - def __init__(self, cfg): + def __init__(self, cfg: _CmdCfg): self._cfg = cfg @property - def cfg(self): + def cfg(self) -> _CmdCfg: return self._cfg def exec(self): @@ -437,8 +448,9 @@ class _Cmd: class _GenCmdCfg(_CfgCmdCfg): - def __init__(self, cfg_file_path, c_source_dir, c_header_dir, metadata_stream_dir, - inclusion_dirs, ignore_inclusion_file_not_found, dump_config, v2_prefix): + def __init__(self, cfg_file_path: str, c_source_dir: str, c_header_dir: str, + metadata_stream_dir: str, inclusion_dirs: List[str], + ignore_inclusion_file_not_found: bool, dump_config: bool, v2_prefix: str): super().__init__(cfg_file_path, inclusion_dirs, ignore_inclusion_file_not_found) self._c_source_dir = c_source_dir self._c_header_dir = c_header_dir @@ -447,23 +459,23 @@ class _GenCmdCfg(_CfgCmdCfg): self._v2_prefix = v2_prefix @property - def c_source_dir(self): + def c_source_dir(self) -> str: return self._c_source_dir @property - def c_header_dir(self): + def c_header_dir(self) -> str: return self._c_header_dir @property - def metadata_stream_dir(self): + def metadata_stream_dir(self) -> str: return self._metadata_stream_dir @property - def dump_config(self): + def dump_config(self) -> bool: return self._dump_config @property - def v2_prefix(self): + def v2_prefix(self) -> str: return self._v2_prefix @@ -531,13 +543,13 @@ class _GenCmd(_Cmd): class _ShowEffectiveCfgCmdCfg(_CfgCmdCfg): - def __init__(self, cfg_file_path, inclusion_dirs, ignore_inclusion_file_not_found, - indent_space_count): + def __init__(self, cfg_file_path: str, inclusion_dirs: List[str], + ignore_inclusion_file_not_found: bool, indent_space_count: Count): super().__init__(cfg_file_path, inclusion_dirs, ignore_inclusion_file_not_found) self._indent_space_count = indent_space_count @property - def indent_space_count(self): + def indent_space_count(self) -> Count: return self._indent_space_count @@ -556,11 +568,11 @@ class _ShowEffectiveCfgCmd(_Cmd): class _ShowCfgVersionCmdCfg(_CmdCfg): - def __init__(self, cfg_file_path): + def __init__(self, cfg_file_path: str): self._cfg_file_path = cfg_file_path @property - def cfg_file_path(self): + def cfg_file_path(self) -> str: return self._cfg_file_path diff --git a/barectf/config.py b/barectf/config.py index 319d28c..edf1bfe 100644 --- a/barectf/config.py +++ b/barectf/config.py @@ -22,10 +22,14 @@ # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. import barectf.version as barectf_version +from typing import Optional, Any, FrozenSet, Mapping, Iterator, Set, Union +import typing +from barectf.typing import Count, Alignment, _OptStr, Id import collections.abc import collections import datetime import enum +import uuid as uuidp @enum.unique @@ -36,26 +40,27 @@ class ByteOrder(enum.Enum): class _FieldType: @property - def alignment(self): + def alignment(self) -> Alignment: raise NotImplementedError class _BitArrayFieldType(_FieldType): - def __init__(self, size, byte_order=None, alignment=1): + def __init__(self, size: Count, byte_order: Optional[ByteOrder] = None, + alignment: Alignment = Alignment(1)): self._size = size self._byte_order = byte_order self._alignment = alignment @property - def size(self): + def size(self) -> Count: return self._size @property - def byte_order(self): + def byte_order(self) -> Optional[ByteOrder]: return self._byte_order @property - def alignment(self): + def alignment(self) -> Alignment: return self._alignment @@ -67,18 +72,19 @@ class DisplayBase(enum.Enum): class _IntegerFieldType(_BitArrayFieldType): - def __init__(self, size, byte_order=None, alignment=None, - preferred_display_base=DisplayBase.DECIMAL): + def __init__(self, size: Count, byte_order: Optional[ByteOrder] = None, + alignment: Optional[Alignment] = None, + preferred_display_base: DisplayBase = DisplayBase.DECIMAL): effective_alignment = 1 if alignment is None and size % 8 == 0: effective_alignment = 8 - super().__init__(size, byte_order, effective_alignment) + super().__init__(size, byte_order, Alignment(effective_alignment)) self._preferred_display_base = preferred_display_base @property - def preferred_display_base(self): + def preferred_display_base(self) -> DisplayBase: return self._preferred_display_base @@ -93,60 +99,65 @@ class SignedIntegerFieldType(_IntegerFieldType): class EnumerationFieldTypeMappingRange: - def __init__(self, lower, upper): + def __init__(self, lower: int, upper: int): self._lower = lower self._upper = upper @property - def lower(self): + def lower(self) -> int: return self._lower @property - def upper(self): + def upper(self) -> int: return self._upper - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: if type(other) is not type(self): return False return (self._lower, self._upper) == (other._lower, other._upper) - def __hash__(self): + def __hash__(self) -> int: return hash((self._lower, self._upper)) - def contains(self, value): + def contains(self, value: int) -> bool: return self._lower <= value <= self._upper class EnumerationFieldTypeMapping: - def __init__(self, ranges): + def __init__(self, ranges: Set[EnumerationFieldTypeMappingRange]): self._ranges = frozenset(ranges) @property - def ranges(self): + def ranges(self) -> FrozenSet[EnumerationFieldTypeMappingRange]: return self._ranges - def ranges_contain_value(self, value): + def ranges_contain_value(self, value: int) -> bool: return any([rg.contains(value) for rg in self._ranges]) +_EnumFtMappings = Mapping[str, EnumerationFieldTypeMapping] + + class EnumerationFieldTypeMappings(collections.abc.Mapping): - def __init__(self, mappings): + def __init__(self, mappings: _EnumFtMappings): self._mappings = {label: mapping for label, mapping in mappings.items()} - def __getitem__(self, key): + def __getitem__(self, key: str) -> EnumerationFieldTypeMapping: return self._mappings[key] - def __iter__(self): + def __iter__(self) -> Iterator[str]: return iter(self._mappings) - def __len__(self): + def __len__(self) -> int: return len(self._mappings) class _EnumerationFieldType(_IntegerFieldType): - def __init__(self, size, byte_order=None, alignment=None, - preferred_display_base=DisplayBase.DECIMAL, mappings=None): + def __init__(self, size: Count, byte_order: Optional[ByteOrder] = None, + alignment: Optional[Alignment] = None, + preferred_display_base: DisplayBase = DisplayBase.DECIMAL, + mappings: Optional[_EnumFtMappings] = None): super().__init__(size, byte_order, alignment, preferred_display_base) self._mappings = EnumerationFieldTypeMappings({}) @@ -154,10 +165,10 @@ class _EnumerationFieldType(_IntegerFieldType): self._mappings = EnumerationFieldTypeMappings(mappings) @property - def mappings(self): + def mappings(self) -> EnumerationFieldTypeMappings: return self._mappings - def labels_for_value(self, value): + def labels_for_value(self, value: int) -> Set[str]: labels = set() for label, mapping in self._mappings.items(): @@ -181,62 +192,66 @@ class RealFieldType(_BitArrayFieldType): class StringFieldType(_FieldType): @property - def alignment(self): - return 8 + def alignment(self) -> Alignment: + return Alignment(8) class _ArrayFieldType(_FieldType): - def __init__(self, element_field_type): + def __init__(self, element_field_type: _FieldType): self._element_field_type = element_field_type @property - def element_field_type(self): + def element_field_type(self) -> _FieldType: return self._element_field_type @property - def alignment(self): + def alignment(self) -> Alignment: return self._element_field_type.alignment class StaticArrayFieldType(_ArrayFieldType): - def __init__(self, length, element_field_type): + def __init__(self, length: Count, element_field_type: _FieldType): super().__init__(element_field_type) self._length = length @property - def length(self): + def length(self) -> Count: return self._length class StructureFieldTypeMember: - def __init__(self, field_type): + def __init__(self, field_type: _FieldType): self._field_type = field_type @property - def field_type(self): + def field_type(self) -> _FieldType: return self._field_type +_StructFtMembers = Mapping[str, StructureFieldTypeMember] + + class StructureFieldTypeMembers(collections.abc.Mapping): - def __init__(self, members): + def __init__(self, members: _StructFtMembers): self._members = collections.OrderedDict() for name, member in members.items(): assert type(member) is StructureFieldTypeMember self._members[name] = member - def __getitem__(self, key): + def __getitem__(self, key: str) -> StructureFieldTypeMember: return self._members[key] - def __iter__(self): + def __iter__(self) -> Iterator[str]: return iter(self._members) - def __len__(self): + def __len__(self) -> int: return len(self._members) class StructureFieldType(_FieldType): - def __init__(self, minimum_alignment=1, members=None): + def __init__(self, minimum_alignment: Alignment = Alignment(1), + members: Optional[_StructFtMembers] = None): self._minimum_alignment = minimum_alignment self._members = StructureFieldTypeMembers({}) @@ -246,87 +261,98 @@ class StructureFieldType(_FieldType): self._set_alignment() def _set_alignment(self): - self._alignment = self._minimum_alignment + self._alignment: Alignment = self._minimum_alignment for member in self._members.values(): if member.field_type.alignment > self._alignment: self._alignment = member.field_type.alignment @property - def minimum_alignment(self): + def minimum_alignment(self) -> Alignment: return self._minimum_alignment @property - def alignment(self): + def alignment(self) -> Alignment: return self._alignment @property - def members(self): + def members(self) -> StructureFieldTypeMembers: return self._members class _UniqueByName: - def __eq__(self, other): + _name: str + + def __eq__(self, other: Any) -> bool: if type(other) is not type(self): return False return self._name == other._name - def __lt__(self, other): + def __lt__(self, other: '_UniqueByName'): assert type(self) is type(other) return self._name < other._name - def __hash__(self): + def __hash__(self) -> int: return hash(self._name) +_OptFt = Optional[_FieldType] +_OptStructFt = Optional[StructureFieldType] +LogLevel = typing.NewType('LogLevel', int) + + class EventType(_UniqueByName): - def __init__(self, name, log_level=None, specific_context_field_type=None, - payload_field_type=None): - self._id = None + def __init__(self, name: str, log_level: Optional[LogLevel] = None, + specific_context_field_type: _OptStructFt = None, payload_field_type: _OptStructFt = None): + self._id: Optional[Id] = None self._name = name self._log_level = log_level self._specific_context_field_type = specific_context_field_type self._payload_field_type = payload_field_type @property - def id(self): + def id(self) -> Optional[Id]: return self._id @property - def name(self): + def name(self) -> str: return self._name @property - def log_level(self): + def log_level(self) -> Optional[LogLevel]: return self._log_level @property - def specific_context_field_type(self): + def specific_context_field_type(self) -> _OptStructFt: return self._specific_context_field_type @property - def payload_field_type(self): + def payload_field_type(self) -> _OptStructFt: return self._payload_field_type class ClockTypeOffset: - def __init__(self, seconds=0, cycles=0): + def __init__(self, seconds: int = 0, cycles: Count = Count(0)): self._seconds = seconds self._cycles = cycles @property - def seconds(self): + def seconds(self) -> int: return self._seconds @property - def cycles(self): + def cycles(self) -> Count: return self._cycles +_OptUuid = Optional[uuidp.UUID] + + class ClockType(_UniqueByName): - def __init__(self, name, frequency=int(1e9), uuid=None, description=None, precision=0, - offset=None, origin_is_unix_epoch=False): + def __init__(self, name: str, frequency: Count = Count(int(1e9)), uuid: _OptUuid = None, + description: _OptStr = None, precision: Count = Count(0), + offset: Optional[ClockTypeOffset] = None, origin_is_unix_epoch: bool = False): self._name = name self._frequency = frequency self._uuid = uuid @@ -340,46 +366,51 @@ class ClockType(_UniqueByName): self._origin_is_unix_epoch = origin_is_unix_epoch @property - def name(self): + def name(self) -> str: return self._name @property - def frequency(self): + def frequency(self) -> Count: return self._frequency @property - def uuid(self): + def uuid(self) -> _OptUuid: return self._uuid @property - def description(self): + def description(self) -> _OptStr: return self._description @property - def precision(self): + def precision(self) -> Count: return self._precision @property - def offset(self): + def offset(self) -> ClockTypeOffset: return self._offset @property - def origin_is_unix_epoch(self): + def origin_is_unix_epoch(self) -> bool: return self._origin_is_unix_epoch DEFAULT_FIELD_TYPE = 'default' +_DefaultableUIntFt = Union[str, UnsignedIntegerFieldType] +_OptDefaultableUIntFt = Optional[_DefaultableUIntFt] +_OptUIntFt = Optional[UnsignedIntegerFieldType] class StreamTypePacketFeatures: - def __init__(self, total_size_field_type=DEFAULT_FIELD_TYPE, - content_size_field_type=DEFAULT_FIELD_TYPE, beginning_time_field_type=None, - end_time_field_type=None, discarded_events_counter_field_type=None): - def get_ft(user_ft): + def __init__(self, total_size_field_type: _DefaultableUIntFt = DEFAULT_FIELD_TYPE, + content_size_field_type: _DefaultableUIntFt = DEFAULT_FIELD_TYPE, + beginning_time_field_type: _OptDefaultableUIntFt = None, + end_time_field_type: _OptDefaultableUIntFt = None, + discarded_events_counter_field_type: _OptDefaultableUIntFt = None): + def get_ft(user_ft: _OptDefaultableUIntFt) -> _OptUIntFt: if user_ft == DEFAULT_FIELD_TYPE: return UnsignedIntegerFieldType(64) - return user_ft + return typing.cast(_OptUIntFt, user_ft) self._total_size_field_type = get_ft(total_size_field_type) self._content_size_field_type = get_ft(content_size_field_type) @@ -388,48 +419,50 @@ class StreamTypePacketFeatures: self._discarded_events_counter_field_type = get_ft(discarded_events_counter_field_type) @property - def total_size_field_type(self): + def total_size_field_type(self) -> _OptUIntFt: return self._total_size_field_type @property - def content_size_field_type(self): + def content_size_field_type(self) -> _OptUIntFt: return self._content_size_field_type @property - def beginning_time_field_type(self): + def beginning_time_field_type(self) -> _OptUIntFt: return self._beginning_time_field_type @property - def end_time_field_type(self): + def end_time_field_type(self) -> _OptUIntFt: return self._end_time_field_type @property - def discarded_events_counter_field_type(self): + def discarded_events_counter_field_type(self) -> _OptUIntFt: return self._discarded_events_counter_field_type class StreamTypeEventFeatures: - def __init__(self, type_id_field_type=DEFAULT_FIELD_TYPE, time_field_type=None): - def get_ft(user_field_type): - if user_field_type == DEFAULT_FIELD_TYPE: + def __init__(self, type_id_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE, + time_field_type: _OptDefaultableUIntFt = None): + def get_ft(user_ft: _OptDefaultableUIntFt) -> _OptUIntFt: + if user_ft == DEFAULT_FIELD_TYPE: return UnsignedIntegerFieldType(64) - return user_field_type + return typing.cast(_OptUIntFt, user_ft) self._type_id_field_type = get_ft(type_id_field_type) self._time_field_type = get_ft(time_field_type) @property - def type_id_field_type(self): + def type_id_field_type(self) -> _OptUIntFt: return self._type_id_field_type @property - def time_field_type(self): + def time_field_type(self) -> _OptUIntFt: return self._time_field_type class StreamTypeFeatures: - def __init__(self, packet_features=None, event_features=None): + def __init__(self, packet_features: Optional[StreamTypePacketFeatures] = None, + event_features: Optional[StreamTypeEventFeatures] = None): self._packet_features = StreamTypePacketFeatures() if packet_features is not None: @@ -441,19 +474,21 @@ class StreamTypeFeatures: self._event_features = event_features @property - def packet_features(self): + def packet_features(self) -> StreamTypePacketFeatures: return self._packet_features @property - def event_features(self): + def event_features(self) -> StreamTypeEventFeatures: return self._event_features class StreamType(_UniqueByName): - def __init__(self, name, event_types, default_clock_type=None, features=None, - packet_context_field_type_extra_members=None, - event_common_context_field_type=None): - self._id = None + def __init__(self, name: str, event_types: Set[EventType], + default_clock_type: Optional[ClockType] = None, + features: Optional[StreamTypeFeatures] = None, + packet_context_field_type_extra_members: Optional[_StructFtMembers] = None, + event_common_context_field_type: _OptStructFt = None): + self._id: Optional[Id] = None self._name = name self._default_clock_type = default_clock_type self._event_common_context_field_type = event_common_context_field_type @@ -462,7 +497,7 @@ class StreamType(_UniqueByName): # assign unique IDs for index, ev_type in enumerate(sorted(self._event_types, key=lambda evt: evt.name)): assert ev_type._id is None - ev_type._id = index + ev_type._id = Id(index) self._set_features(features) self._packet_context_field_type_extra_members = StructureFieldTypeMembers({}) @@ -473,10 +508,10 @@ class StreamType(_UniqueByName): self._set_pkt_ctx_ft() self._set_ev_header_ft() - def _set_features(self, features): + def _set_features(self, features: Optional[StreamTypeFeatures]): if features is not None: self._features = features - return + return None ev_time_ft = None pkt_beginning_time_ft = None @@ -493,7 +528,7 @@ class StreamType(_UniqueByName): end_time_field_type=pkt_end_time_ft), StreamTypeEventFeatures(time_field_type=ev_time_ft)) - def _set_ft_mapped_clk_type_name(self, ft): + def _set_ft_mapped_clk_type_name(self, ft: Optional[UnsignedIntegerFieldType]): if ft is None: return @@ -502,12 +537,14 @@ class StreamType(_UniqueByName): ft._mapped_clk_type_name = self._default_clock_type.name def _set_pkt_ctx_ft(self): - def add_member_if_exists(name, ft, set_mapped_clk_type_name=False): + members = None + + def add_member_if_exists(name: str, ft: _FieldType, set_mapped_clk_type_name: bool = False): nonlocal members if ft is not None: if set_mapped_clk_type_name: - self._set_ft_mapped_clk_type_name(ft) + self._set_ft_mapped_clk_type_name(typing.cast(UnsignedIntegerFieldType, ft)) members[name] = StructureFieldTypeMember(ft) @@ -550,78 +587,85 @@ class StreamType(_UniqueByName): self._ev_header_ft = StructureFieldType(8, members) @property - def id(self): + def id(self) -> Optional[Id]: return self._id @property - def name(self): + def name(self) -> str: return self._name @property - def default_clock_type(self): + def default_clock_type(self) -> Optional[ClockType]: return self._default_clock_type @property - def features(self): + def features(self) -> StreamTypeFeatures: return self._features @property - def packet_context_field_type_extra_members(self): + def packet_context_field_type_extra_members(self) -> StructureFieldTypeMembers: return self._packet_context_field_type_extra_members @property - def event_common_context_field_type(self): + def event_common_context_field_type(self) -> _OptStructFt: return self._event_common_context_field_type @property - def event_types(self): + def event_types(self) -> FrozenSet[EventType]: return self._event_types +_OptUuidFt = Optional[Union[str, StaticArrayFieldType]] + + class TraceTypeFeatures: - def __init__(self, magic_field_type=DEFAULT_FIELD_TYPE, uuid_field_type=None, - stream_type_id_field_type=DEFAULT_FIELD_TYPE): - def get_field_type(user_field_type, default_field_type): - if user_field_type == DEFAULT_FIELD_TYPE: - return default_field_type + def __init__(self, magic_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE, + uuid_field_type: _OptUuidFt = None, + stream_type_id_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE): + def get_field_type(user_ft: Optional[Union[str, _FieldType]], default_ft: _FieldType) -> _OptFt: + if user_ft == DEFAULT_FIELD_TYPE: + return default_ft - return user_field_type + return typing.cast(_OptFt, user_ft) - self._magic_field_type = get_field_type(magic_field_type, UnsignedIntegerFieldType(32)) - self._uuid_field_type = get_field_type(uuid_field_type, - StaticArrayFieldType(16, UnsignedIntegerFieldType(8))) - self._stream_type_id_field_type = get_field_type(stream_type_id_field_type, - UnsignedIntegerFieldType(64)) + self._magic_field_type = typing.cast(_OptUIntFt, get_field_type(magic_field_type, + UnsignedIntegerFieldType(32))) + self._uuid_field_type = typing.cast(Optional[StaticArrayFieldType], get_field_type(uuid_field_type, + StaticArrayFieldType(Count(16), + UnsignedIntegerFieldType(8)))) + self._stream_type_id_field_type = typing.cast(_OptUIntFt, get_field_type(stream_type_id_field_type, + UnsignedIntegerFieldType(64))) @property - def magic_field_type(self): + def magic_field_type(self) -> _OptUIntFt: return self._magic_field_type @property - def uuid_field_type(self): + def uuid_field_type(self) -> Optional[StaticArrayFieldType]: return self._uuid_field_type @property - def stream_type_id_field_type(self): + def stream_type_id_field_type(self) -> _OptUIntFt: return self._stream_type_id_field_type class TraceType: - def __init__(self, stream_types, default_byte_order, uuid=None, features=None): + def __init__(self, stream_types: Set[StreamType], default_byte_order: ByteOrder, + uuid: _OptUuid = None, features: Optional[TraceTypeFeatures] = None): self._default_byte_order = default_byte_order self._stream_types = frozenset(stream_types) # assign unique IDs for index, stream_type in enumerate(sorted(self._stream_types, key=lambda st: st.name)): assert stream_type._id is None - stream_type._id = index + stream_type._id = Id(index) self._uuid = uuid self._set_features(features) self._set_pkt_header_ft() self._set_fts_effective_byte_order() - def _set_features(self, features): + def _set_features(self, features: Optional[TraceTypeFeatures]): if features is not None: self._features = features return @@ -631,20 +675,21 @@ class TraceType: self._features = TraceTypeFeatures(uuid_field_type=uuid_ft) def _set_pkt_header_ft(self): - def add_member_if_exists(name, field_type): + members = collections.OrderedDict() + + def add_member_if_exists(name: str, ft: _OptFt): nonlocal members - if field_type is not None: - members[name] = StructureFieldTypeMember(field_type) + if ft is not None: + members[name] = StructureFieldTypeMember(ft) - members = collections.OrderedDict() add_member_if_exists('magic', self._features.magic_field_type) add_member_if_exists('uuid', self._features.uuid_field_type) add_member_if_exists('stream_id', self._features.stream_type_id_field_type) self._pkt_header_ft = StructureFieldType(8, members) def _set_fts_effective_byte_order(self): - def set_ft_effective_byte_order(ft): + def set_ft_effective_byte_order(ft: _OptFt): if ft is None: return @@ -673,47 +718,53 @@ class TraceType: set_ft_effective_byte_order(ev_type._payload_field_type) @property - def default_byte_order(self): + def default_byte_order(self) -> ByteOrder: return self._default_byte_order @property - def uuid(self): + def uuid(self) -> _OptUuid: return self._uuid @property - def stream_types(self): + def stream_types(self) -> FrozenSet[StreamType]: return self._stream_types - def stream_type(self, name): + def stream_type(self, name: str) -> Optional[StreamType]: for cand_stream_type in self._stream_types: if cand_stream_type.name == name: return cand_stream_type + return None + @property - def features(self): + def features(self) -> TraceTypeFeatures: return self._features +_EnvEntry = Union[str, int] +_EnvEntries = Mapping[str, _EnvEntry] + + class TraceEnvironment(collections.abc.Mapping): - def __init__(self, environment): + def __init__(self, environment: _EnvEntries): self._env = {name: value for name, value in environment.items()} - def __getitem__(self, key): + def __getitem__(self, key: str) -> _EnvEntry: return self._env[key] - def __iter__(self): + def __iter__(self) -> Iterator[str]: return iter(self._env) - def __len__(self): + def __len__(self) -> int: return len(self._env) class Trace: - def __init__(self, type, environment=None): + def __init__(self, type: TraceType, environment: Optional[_EnvEntries] = None): self._type = type self._set_env(environment) - def _set_env(self, environment): + def _set_env(self, environment: Optional[_EnvEntries]): init_env = collections.OrderedDict([ ('domain', 'bare'), ('tracer_name', 'barectf'), @@ -727,49 +778,54 @@ class Trace: environment = {} init_env.update(environment) - self._env = TraceEnvironment(init_env) + self._env = TraceEnvironment(typing.cast(_EnvEntries, init_env)) @property - def type(self): + def type(self) -> TraceType: return self._type @property - def environment(self): + def environment(self) -> TraceEnvironment: return self._env +_ClkTypeCTypes = Mapping[ClockType, str] + + class ClockTypeCTypes(collections.abc.Mapping): - def __init__(self, c_types): + def __init__(self, c_types: _ClkTypeCTypes): self._c_types = {clk_type: c_type for clk_type, c_type in c_types.items()} - def __getitem__(self, key): + def __getitem__(self, key: ClockType) -> str: return self._c_types[key] - def __iter__(self): + def __iter__(self) -> Iterator[ClockType]: return iter(self._c_types) - def __len__(self): + def __len__(self) -> int: return len(self._c_types) class ConfigurationCodeGenerationHeaderOptions: - def __init__(self, identifier_prefix_definition=False, - default_stream_type_name_definition=False): + def __init__(self, identifier_prefix_definition: bool = False, + default_stream_type_name_definition: bool = False): self._identifier_prefix_definition = identifier_prefix_definition self._default_stream_type_name_definition = default_stream_type_name_definition @property - def identifier_prefix_definition(self): + def identifier_prefix_definition(self) -> bool: return self._identifier_prefix_definition @property - def default_stream_type_name_definition(self): + def default_stream_type_name_definition(self) -> bool: return self._default_stream_type_name_definition class ConfigurationCodeGenerationOptions: - def __init__(self, identifier_prefix='barectf_', file_name_prefix='barectf', - default_stream_type=None, header_options=None, clock_type_c_types=None): + def __init__(self, identifier_prefix: str = 'barectf_', file_name_prefix: str = 'barectf', + default_stream_type: Optional[StreamType] = None, + header_options: Optional[ConfigurationCodeGenerationHeaderOptions] = None, + clock_type_c_types: Optional[_ClkTypeCTypes] = None): self._identifier_prefix = identifier_prefix self._file_name_prefix = file_name_prefix self._default_stream_type = default_stream_type @@ -785,40 +841,41 @@ class ConfigurationCodeGenerationOptions: self._clock_type_c_types = ClockTypeCTypes(clock_type_c_types) @property - def identifier_prefix(self): + def identifier_prefix(self) -> str: return self._identifier_prefix @property - def file_name_prefix(self): + def file_name_prefix(self) -> str: return self._file_name_prefix @property - def default_stream_type(self): + def default_stream_type(self) -> Optional[StreamType]: return self._default_stream_type @property - def header_options(self): + def header_options(self) -> ConfigurationCodeGenerationHeaderOptions: return self._header_options @property - def clock_type_c_types(self): + def clock_type_c_types(self) -> ClockTypeCTypes: return self._clock_type_c_types class ConfigurationOptions: - def __init__(self, code_generation_options=None): + def __init__(self, + code_generation_options: Optional[ConfigurationCodeGenerationOptions] = None): self._code_generation_options = ConfigurationCodeGenerationOptions() if code_generation_options is not None: self._code_generation_options = code_generation_options @property - def code_generation_options(self): + def code_generation_options(self) -> ConfigurationCodeGenerationOptions: return self._code_generation_options class Configuration: - def __init__(self, trace, options=None): + def __init__(self, trace: Trace, options: Optional[ConfigurationOptions] = None): self._trace = trace self._options = ConfigurationOptions() @@ -837,9 +894,9 @@ class Configuration: clk_type_c_types._c_types[def_clk_type] = 'uint32_t' @property - def trace(self): + def trace(self) -> Trace: return self._trace @property - def options(self): + def options(self) -> ConfigurationOptions: return self._options diff --git a/barectf/config_file.py b/barectf/config_file.py index 5090d59..c5a04d7 100644 --- a/barectf/config_file.py +++ b/barectf/config_file.py @@ -22,11 +22,15 @@ # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. import barectf.config_parse as barectf_config_parse +import barectf.config as barectf_config +from barectf.typing import Count, VersionNumber +from typing import Optional, List, TextIO -def effective_configuration_file(file, with_package_inclusion_directory=True, - inclusion_directories=None, ignore_inclusion_not_found=False, - indent_space_count=2): +def effective_configuration_file(file: TextIO, with_package_inclusion_directory: bool = True, + inclusion_directories: Optional[List[str]] = None, + ignore_inclusion_not_found: bool = False, + indent_space_count: Count = Count(2)) -> str: if inclusion_directories is None: inclusion_directories = [] @@ -36,8 +40,9 @@ def effective_configuration_file(file, with_package_inclusion_directory=True, indent_space_count) -def configuration_from_file(file, with_package_inclusion_directory=True, inclusion_directories=None, - ignore_inclusion_not_found=False): +def configuration_from_file(file: TextIO, with_package_inclusion_directory: bool = True, + inclusion_directories: Optional[List[str]] = None, + ignore_inclusion_not_found: bool = False) -> barectf_config.Configuration: if inclusion_directories is None: inclusion_directories = [] @@ -45,5 +50,5 @@ def configuration_from_file(file, with_package_inclusion_directory=True, inclusi inclusion_directories, ignore_inclusion_not_found) -def configuration_file_major_version(file): +def configuration_file_major_version(file: TextIO) -> VersionNumber: return barectf_config_parse._config_file_major_version(file) diff --git a/barectf/config_parse.py b/barectf/config_parse.py index 4b26baa..1f7efda 100644 --- a/barectf/config_parse.py +++ b/barectf/config_parse.py @@ -23,27 +23,37 @@ import barectf.config_parse_common as barectf_config_parse_common from barectf.config_parse_common import _ConfigurationParseError +from barectf.config_parse_common import _MapNode import barectf.config_parse_v2 as barectf_config_parse_v2 import barectf.config_parse_v3 as barectf_config_parse_v3 +import barectf.config as barectf_config import collections +from barectf.typing import Count, VersionNumber +from typing import Optional, List, TextIO +import typing # Creates and returns a barectf 3 YAML configuration file parser to # parse the file-like object `file`. # # `file` can be a barectf 2 or 3 configuration file. -def _create_v3_parser(file, with_pkg_include_dir, include_dirs, ignore_include_not_found): +def _create_v3_parser(file: TextIO, with_pkg_include_dir: bool, include_dirs: Optional[List[str]], + ignore_include_not_found: bool) -> barectf_config_parse_v3._Parser: try: root_node = barectf_config_parse_common._yaml_load(file) if type(root_node) is barectf_config_parse_common._ConfigNodeV3: # barectf 3 configuration file - return barectf_config_parse_v3._Parser(file, root_node, with_pkg_include_dir, - include_dirs, ignore_include_not_found) + return barectf_config_parse_v3._Parser(file, + typing.cast(barectf_config_parse_common._ConfigNodeV3, + root_node), + with_pkg_include_dir, include_dirs, + ignore_include_not_found) elif type(root_node) is collections.OrderedDict: # barectf 2 configuration file - v2_parser = barectf_config_parse_v2._Parser(file, root_node, with_pkg_include_dir, - include_dirs, ignore_include_not_found) + v2_parser = barectf_config_parse_v2._Parser(file, typing.cast(_MapNode, root_node), + with_pkg_include_dir, include_dirs, + ignore_include_not_found) return barectf_config_parse_v3._Parser(file, v2_parser.config_node, with_pkg_include_dir, include_dirs, ignore_include_not_found) @@ -54,13 +64,18 @@ def _create_v3_parser(file, with_pkg_include_dir, include_dirs, ignore_include_n barectf_config_parse_common._append_error_ctx(exc, 'Configuration', 'Cannot create configuration from YAML file') + # satisfy static type checker (never reached) + raise + -def _from_file(file, with_pkg_include_dir, include_dirs, ignore_include_not_found): +def _from_file(file: TextIO, with_pkg_include_dir: bool, include_dirs: Optional[List[str]], + ignore_include_not_found: bool) -> barectf_config.Configuration: return _create_v3_parser(file, with_pkg_include_dir, include_dirs, ignore_include_not_found).config -def _effective_config_file(file, with_pkg_include_dir, include_dirs, ignore_include_not_found, - indent_space_count): +def _effective_config_file(file: TextIO, with_pkg_include_dir: bool, + include_dirs: Optional[List[str]], ignore_include_not_found: bool, + indent_space_count: Count) -> str: config_node = _create_v3_parser(file, with_pkg_include_dir, include_dirs, ignore_include_not_found).config_node return barectf_config_parse_common._yaml_dump(config_node, indent=indent_space_count, @@ -68,15 +83,19 @@ def _effective_config_file(file, with_pkg_include_dir, include_dirs, ignore_incl explicit_end=True) -def _config_file_major_version(file): +def _config_file_major_version(file: TextIO) -> VersionNumber: try: root_node = barectf_config_parse_common._yaml_load(file) if type(root_node) is barectf_config_parse_common._ConfigNodeV3: # barectf 3 configuration file - return 3 - elif type(root_node) is collections.OrderedDict: + return VersionNumber(3) + else: # barectf 2 configuration file - return 2 + assert type(root_node) is collections.OrderedDict + return VersionNumber(2) except _ConfigurationParseError as exc: barectf_config_parse_common._append_error_ctx(exc, 'Configuration', 'Cannot load YAML file') + + # satisfy static type checker (never reached) + raise diff --git a/barectf/config_parse_common.py b/barectf/config_parse_common.py index 06e0c42..e790694 100644 --- a/barectf/config_parse_common.py +++ b/barectf/config_parse_common.py @@ -23,38 +23,33 @@ import pkg_resources import collections -import jsonschema +import jsonschema # type: ignore import os.path import yaml import copy import os +from barectf.typing import VersionNumber, _OptStr +from typing import Optional, List, Dict, Any, TextIO, MutableMapping, Union, Set, Iterable, Callable, Tuple +import typing # The context of a configuration parsing error. # # Such a context object has a name and, optionally, a message. class _ConfigurationParseErrorContext: - def __init__(self, name, message=None): + def __init__(self, name: str, message: _OptStr = None): self._name = name self._msg = message @property - def name(self): + def name(self) -> str: return self._name @property - def message(self): + def message(self) -> _OptStr: return self._msg -# Appends the context having the object name `obj_name` and the -# (optional) message `message` to the `_ConfigurationParseError` -# exception `exc` and then raises `exc` again. -def _append_error_ctx(exc, obj_name, message=None): - exc._append_ctx(obj_name, message) - raise exc - - # A configuration parsing error. # # Such an error object contains a list of contexts (`context` property). @@ -68,14 +63,14 @@ def _append_error_ctx(exc, obj_name, message=None): class _ConfigurationParseError(Exception): def __init__(self, init_ctx_obj_name, init_ctx_msg=None): super().__init__() - self._ctx = [] + self._ctx: List[_ConfigurationParseErrorContext] = [] self._append_ctx(init_ctx_obj_name, init_ctx_msg) @property - def context(self): + def context(self) -> List[_ConfigurationParseErrorContext]: return self._ctx - def _append_ctx(self, name, msg=None): + def _append_ctx(self, name: str, msg: _OptStr = None): self._ctx.append(_ConfigurationParseErrorContext(name, msg)) def __str__(self): @@ -92,11 +87,19 @@ class _ConfigurationParseError(Exception): return '\n'.join(lines) +# Appends the context having the object name `obj_name` and the +# (optional) message `message` to the `_ConfigurationParseError` +# exception `exc` and then raises `exc` again. +def _append_error_ctx(exc: _ConfigurationParseError, obj_name: str, message: _OptStr = None): + exc._append_ctx(obj_name, message) + raise exc + + _V3Prefixes = collections.namedtuple('_V3Prefixes', ['identifier', 'file_name']) # Convers a v2 prefix to v3 prefixes. -def _v3_prefixes_from_v2_prefix(v2_prefix): +def _v3_prefixes_from_v2_prefix(v2_prefix: str) -> _V3Prefixes: return _V3Prefixes(v2_prefix, v2_prefix.rstrip('_')) @@ -106,10 +109,15 @@ def _v3_prefixes_from_v2_prefix(v2_prefix): # This must never happen in barectf because all our schemas are local; # it would mean a programming or schema error. class _RefResolver(jsonschema.RefResolver): - def resolve_remote(self, uri): + def resolve_remote(self, uri: str): raise RuntimeError(f'Missing local schema with URI `{uri}`') +# Not all static type checkers support type recursion, so let's just use +# `Any` as a map node's value's type. +_MapNode = MutableMapping[str, Any] + + # Schema validator which considers all the schemas found in the # subdirectories `subdirs` (at build time) of the barectf package's # `schemas` directory. @@ -117,9 +125,9 @@ class _RefResolver(jsonschema.RefResolver): # The only public method is validate() which accepts an instance to # validate as well as a schema short ID. class _SchemaValidator: - def __init__(self, subdirs): + def __init__(self, subdirs: Iterable[str]): schemas_dir = pkg_resources.resource_filename(__name__, 'schemas') - self._store = {} + self._store: Dict[str, str] = {} for subdir in subdirs: dir = os.path.join(schemas_dir, subdir) @@ -155,7 +163,7 @@ class _SchemaValidator: return dct - def _validate(self, instance, schema_short_id): + def _validate(self, instance: _MapNode, schema_short_id: str): # retrieve full schema ID from short ID schema_id = f'https://barectf.org/schemas/{schema_short_id}.json' assert schema_id in self._store @@ -187,7 +195,7 @@ class _SchemaValidator: # # Raises a `_ConfigurationParseError` object, hiding any # `jsonschema` exception, on validation failure. - def validate(self, instance, schema_short_id): + def validate(self, instance: _MapNode, schema_short_id: str): try: self._validate(instance, schema_short_id) except jsonschema.ValidationError as exc: @@ -234,11 +242,11 @@ class _SchemaValidator: # barectf 3 YAML configuration node. class _ConfigNodeV3: - def __init__(self, config_node): + def __init__(self, config_node: _MapNode): self._config_node = config_node @property - def config_node(self): + def config_node(self) -> _MapNode: return self._config_node @@ -253,11 +261,11 @@ _CONFIG_V3_YAML_TAG = 'tag:barectf.org,2020/3/config' # `collections.OrderedDict` object. # # All YAML maps are loaded as `collections.OrderedDict` objects. -def _yaml_load(file): +def _yaml_load(file: TextIO) -> Union[_ConfigNodeV3, _MapNode]: class Loader(yaml.Loader): pass - def config_ctor(loader, node): + def config_ctor(loader, node) -> _ConfigNodeV3: if not isinstance(node, yaml.MappingNode): problem = f'Expecting a map for the tag `{node.tag}`' raise yaml.constructor.ConstructorError(problem=problem) @@ -265,7 +273,7 @@ def _yaml_load(file): loader.flatten_mapping(node) return _ConfigNodeV3(collections.OrderedDict(loader.construct_pairs(node))) - def mapping_ctor(loader, node): + def mapping_ctor(loader, node) -> _MapNode: loader.flatten_mapping(node) return collections.OrderedDict(loader.construct_pairs(node)) @@ -279,7 +287,7 @@ def _yaml_load(file): raise _ConfigurationParseError('YAML loader', f'Cannot load file: {exc}') -def _yaml_load_path(path): +def _yaml_load_path(path: str) -> Union[_ConfigNodeV3, _MapNode]: with open(path) as f: return _yaml_load(f) @@ -287,7 +295,7 @@ def _yaml_load_path(path): # Dumps the content of the Python object `obj` # (`collections.OrderedDict` or `_ConfigNodeV3`) as a YAML string and # returns it. -def _yaml_dump(node, **kwds): +def _yaml_dump(node: _MapNode, **kwds) -> str: class Dumper(yaml.Dumper): pass @@ -311,16 +319,17 @@ def _yaml_dump(node, **kwds): # mostly contains helpers. class _Parser: # Builds a base barectf YAML configuration parser to process the - # configuration node `node` (already loaded from the file having the - # path `path`). + # configuration node `node` (already loaded from the file-like + # object `file`). # # For its _process_node_include() method, the parser considers the # package inclusion directory as well as `include_dirs`, and ignores # nonexistent inclusion files if `ignore_include_not_found` is # `True`. - def __init__(self, path, node, with_pkg_include_dir, include_dirs, ignore_include_not_found, - major_version): - self._root_path = path + def __init__(self, root_file: TextIO, node: Union[_MapNode, _ConfigNodeV3], + with_pkg_include_dir: bool, include_dirs: Optional[List[str]], + ignore_include_not_found: bool, major_version: VersionNumber): + self._root_file = root_file self._root_node = node self._ft_prop_names = [ # barectf 2.1+ @@ -335,35 +344,42 @@ class _Parser: 'element-field-type', ] + if include_dirs is None: + include_dirs = [] + self._include_dirs = copy.copy(include_dirs) if with_pkg_include_dir: self._include_dirs.append(pkg_resources.resource_filename(__name__, f'include/{major_version}')) self._ignore_include_not_found = ignore_include_not_found - self._include_stack = [] - self._resolved_ft_aliases = set() + self._include_stack: List[str] = [] + self._resolved_ft_aliases: Set[str] = set() self._schema_validator = _SchemaValidator({'common/config', f'{major_version}/config'}) self._major_version = major_version @property - def _struct_ft_node_members_prop_name(self): + def _struct_ft_node_members_prop_name(self) -> str: if self._major_version == 2: return 'fields' else: return 'members' # Returns the last included file name from the parser's inclusion - # file name stack. - def _get_last_include_file(self): + # file name stack, or `N/A` if the root file does not have an + # associated path under the `name` property. + def _get_last_include_file(self) -> str: if self._include_stack: return self._include_stack[-1] - return self._root_path + if hasattr(self._root_file, 'name'): + return typing.cast(str, self._root_file.name) + + return 'N/A' # Loads the inclusion file having the path `yaml_path` and returns # its content as a `collections.OrderedDict` object. - def _load_include(self, yaml_path): + def _load_include(self, yaml_path) -> Optional[_MapNode]: for inc_dir in self._include_dirs: # Current inclusion dir + file name path. # @@ -389,38 +405,37 @@ class _Parser: self._include_stack.append(norm_path) # load raw content - return _yaml_load_path(norm_path) + return typing.cast(_MapNode, _yaml_load_path(norm_path)) if not self._ignore_include_not_found: base_path = self._get_last_include_file() raise _ConfigurationParseError(f'File `{base_path}`', f'Cannot include file `{yaml_path}`: file not found in inclusion directories') + return None + # Returns a list of all the inclusion file paths as found in the # inclusion node `include_node`. - def _get_include_paths(self, include_node): + def _get_include_paths(self, include_node: _MapNode) -> List[str]: if include_node is None: # none return [] if type(include_node) is str: # wrap as array - return [include_node] + return [typing.cast(str, include_node)] # already an array assert type(include_node) is list - return include_node + return typing.cast(List[str], include_node) # Updates the node `base_node` with an overlay node `overlay_node`. # # Both the inclusion and field type node inheritance features use # this update mechanism. - def _update_node(self, base_node, overlay_node): + def _update_node(self, base_node: _MapNode, overlay_node: _MapNode): # see the comment about the `members` property below - def update_members_node(base_value, olay_value): - assert type(olay_value) is list - assert type(base_value) is list - + def update_members_node(base_value: List[Any], olay_value: List[Any]): for olay_item in olay_value: # assume we append `olay_item` to `base_value` initially append_olay_item = True @@ -567,9 +582,9 @@ class _Parser: # `last_overlay_node` and then patches the current base node with # its other properties before returning the result (always a deep # copy). - def _process_node_include(self, last_overlay_node, - process_base_include_cb, - process_children_include_cb=None): + def _process_node_include(self, last_overlay_node: _MapNode, + process_base_include_cb: Callable[[_MapNode], _MapNode], + process_children_include_cb: Optional[Callable[[_MapNode], None]] = None) -> _MapNode: # process children inclusions first if process_children_include_cb is not None: process_children_include_cb(last_overlay_node) @@ -629,13 +644,16 @@ class _Parser: # Generates pairs of member node and field type node property name # (in the member node) for the structure field type node's members # node `node`. - def _struct_ft_member_fts_iter(self, node): + def _struct_ft_member_fts_iter(self, + node: Union[List[_MapNode], _MapNode]) -> Iterable[Tuple[_MapNode, str]]: if type(node) is list: # barectf 3 assert self._major_version == 3 + node = typing.cast(List[_MapNode], node) for member_node in node: assert type(member_node) is collections.OrderedDict + member_node = typing.cast(_MapNode, member_node) name, val = list(member_node.items())[0] if type(val) is collections.OrderedDict: @@ -647,6 +665,7 @@ class _Parser: # barectf 2 assert self._major_version == 2 assert type(node) is collections.OrderedDict + node = typing.cast(_MapNode, node) for name in node: yield node, name @@ -661,7 +680,8 @@ class _Parser: # # `ctx_obj_name` is the context's object name when this method # raises a `_ConfigurationParseError` exception. - def _resolve_ft_alias(self, ft_aliases_node, parent_node, key, ctx_obj_name, alias_set=None): + def _resolve_ft_alias(self, ft_aliases_node: _MapNode, parent_node: _MapNode, key: str, + ctx_obj_name: str, alias_set: Optional[Set[str]] = None): if key not in parent_node: return @@ -722,7 +742,7 @@ class _Parser: # Like _resolve_ft_alias(), but builds a context object name for any # `ctx_obj_name` exception. - def _resolve_ft_alias_from(self, ft_aliases_node, parent_node, key): + def _resolve_ft_alias_from(self, ft_aliases_node: _MapNode, parent_node: _MapNode, key: str): self._resolve_ft_alias(ft_aliases_node, parent_node, key, f'`{key}` property') # Applies field type node inheritance to the property `key` of @@ -735,7 +755,7 @@ class _Parser: # # When this method returns, no field type node has an `$inherit` or # `inherit` property. - def _apply_ft_inheritance(self, parent_node, key): + def _apply_ft_inheritance(self, parent_node: _MapNode, key: str): if key not in parent_node: return diff --git a/barectf/config_parse_v2.py b/barectf/config_parse_v2.py index 4faea4c..2fbf898 100644 --- a/barectf/config_parse_v2.py +++ b/barectf/config_parse_v2.py @@ -24,22 +24,27 @@ from barectf.config_parse_common import _ConfigurationParseError from barectf.config_parse_common import _append_error_ctx import barectf.config_parse_common as config_parse_common +from barectf.config_parse_common import _MapNode import collections import copy +from barectf.typing import VersionNumber, _OptStr +from typing import Optional, List, Dict, TextIO, Union, Callable +import typing -def _del_prop_if_exists(node, prop_name): +def _del_prop_if_exists(node: _MapNode, prop_name: str): if prop_name in node: del node[prop_name] -def _rename_prop(node, old_prop_name, new_prop_name): +def _rename_prop(node: _MapNode, old_prop_name: str, new_prop_name: str): if old_prop_name in node: node[new_prop_name] = node[old_prop_name] del node[old_prop_name] -def _copy_prop_if_exists(dst_node, src_node, src_prop_name, dst_prop_name=None): +def _copy_prop_if_exists(dst_node: _MapNode, src_node: _MapNode, src_prop_name: str, + dst_prop_name: _OptStr = None): if dst_prop_name is None: dst_prop_name = src_prop_name @@ -59,10 +64,13 @@ def _copy_prop_if_exists(dst_node, src_node, src_prop_name, dst_prop_name=None): # parsing stages and general strategy. class _Parser(config_parse_common._Parser): # Builds a barectf 2 YAML configuration parser and parses the root - # configuration node `node` (already loaded from `path`). - def __init__(self, path, node, with_pkg_include_dir, include_dirs, ignore_include_not_found): - super().__init__(path, node, with_pkg_include_dir, include_dirs, ignore_include_not_found, 2) - self._ft_cls_name_to_conv_method = { + # configuration node `node` (already loaded from the file-like + # object `root_file`). + def __init__(self, root_file: TextIO, node: _MapNode, with_pkg_include_dir: bool, + include_dirs: Optional[List[str]], ignore_include_not_found: bool): + super().__init__(root_file, node, with_pkg_include_dir, include_dirs, + ignore_include_not_found, VersionNumber(2)) + self._ft_cls_name_to_conv_method: Dict[str, Callable[[_MapNode], _MapNode]] = { 'int': self._conv_int_ft_node, 'integer': self._conv_int_ft_node, 'enum': self._conv_enum_ft_node, @@ -80,24 +88,24 @@ class _Parser(config_parse_common._Parser): # Converts a v2 field type node to a v3 field type node and returns # it. - def _conv_ft_node(self, v2_ft_node): + def _conv_ft_node(self, v2_ft_node: _MapNode) -> _MapNode: assert 'class' in v2_ft_node cls = v2_ft_node['class'] assert cls in self._ft_cls_name_to_conv_method return self._ft_cls_name_to_conv_method[cls](v2_ft_node) - def _conv_ft_node_if_exists(self, v2_parent_node, key): + def _conv_ft_node_if_exists(self, v2_parent_node: Optional[_MapNode], key: str) -> Optional[_MapNode]: if v2_parent_node is None: - return + return None if key not in v2_parent_node: - return + return None return self._conv_ft_node(v2_parent_node[key]) # Converts a v2 integer field type node to a v3 integer field type # node and returns it. - def _conv_int_ft_node(self, v2_ft_node): + def _conv_int_ft_node(self, v2_ft_node: _MapNode) -> _MapNode: # copy v2 integer field type node v3_ft_node = copy.deepcopy(v2_ft_node) @@ -128,7 +136,7 @@ class _Parser(config_parse_common._Parser): # Converts a v2 enumeration field type node to a v3 enumeration # field type node and returns it. - def _conv_enum_ft_node(self, v2_ft_node): + def _conv_enum_ft_node(self, v2_ft_node: _MapNode) -> _MapNode: # An enumeration field type _is_ an integer field type, so use a # copy of the converted v2 value field type node. v3_ft_node = copy.deepcopy(self._conv_ft_node(v2_ft_node['value-type'])) @@ -147,10 +155,12 @@ class _Parser(config_parse_common._Parser): members_node = v2_ft_node.get(prop_name) if members_node is not None: - mappings_node = collections.OrderedDict() + mappings_node: _MapNode = collections.OrderedDict() cur = 0 for member_node in members_node: + v3_value_node: Union[int, List[int]] + if type(member_node) is str: label = member_node v3_value_node = cur @@ -180,7 +190,7 @@ class _Parser(config_parse_common._Parser): # Converts a v2 real field type node to a v3 real field type node # and returns it. - def _conv_real_ft_node(self, v2_ft_node): + def _conv_real_ft_node(self, v2_ft_node: _MapNode) -> _MapNode: # copy v2 real field type node v3_ft_node = copy.deepcopy(v2_ft_node) @@ -198,7 +208,7 @@ class _Parser(config_parse_common._Parser): # Converts a v2 string field type node to a v3 string field type # node and returns it. - def _conv_string_ft_node(self, v2_ft_node): + def _conv_string_ft_node(self, v2_ft_node: _MapNode) -> _MapNode: # copy v2 string field type node v3_ft_node = copy.deepcopy(v2_ft_node) @@ -209,9 +219,9 @@ class _Parser(config_parse_common._Parser): # Converts a v2 array field type node to a v3 (static) array field # type node and returns it. - def _conv_static_array_ft_node(self, v2_ft_node): + def _conv_static_array_ft_node(self, v2_ft_node: _MapNode) -> _MapNode: # class renamed to `static-array` - v3_ft_node = collections.OrderedDict({'class': 'static-array'}) + v3_ft_node: _MapNode = collections.OrderedDict({'class': 'static-array'}) # copy `length` property _copy_prop_if_exists(v3_ft_node, v2_ft_node, 'length') @@ -223,7 +233,7 @@ class _Parser(config_parse_common._Parser): # Converts a v2 structure field type node to a v3 structure field # type node and returns it. - def _conv_struct_ft_node(self, v2_ft_node): + def _conv_struct_ft_node(self, v2_ft_node: _MapNode) -> _MapNode: # Create fresh v3 structure field type node, reusing the class # of `v2_ft_node`. v3_ft_node = collections.OrderedDict({'class': v2_ft_node['class']}) @@ -250,7 +260,7 @@ class _Parser(config_parse_common._Parser): # Converts a v2 clock type node to a v3 clock type node and returns # it. - def _conv_clk_type_node(self, v2_clk_type_node): + def _conv_clk_type_node(self, v2_clk_type_node: _MapNode) -> _MapNode: # copy v2 clock type node v3_clk_type_node = copy.deepcopy(v2_clk_type_node) @@ -272,9 +282,9 @@ class _Parser(config_parse_common._Parser): # Converts a v2 event type node to a v3 event type node and returns # it. - def _conv_ev_type_node(self, v2_ev_type_node): + def _conv_ev_type_node(self, v2_ev_type_node: _MapNode) -> _MapNode: # create empty v3 event type node - v3_ev_type_node = collections.OrderedDict() + v3_ev_type_node: _MapNode = collections.OrderedDict() # copy `log-level` property _copy_prop_if_exists(v3_ev_type_node, v2_ev_type_node, 'log-level') @@ -294,7 +304,8 @@ class _Parser(config_parse_common._Parser): return v3_ev_type_node @staticmethod - def _set_v3_feature_ft_if_exists(v3_features_node, key, node): + def _set_v3_feature_ft_if_exists(v3_features_node: _MapNode, key: str, + node: Union[Optional[_MapNode], bool]): val = node if val is None: @@ -304,12 +315,12 @@ class _Parser(config_parse_common._Parser): # Converts a v2 stream type node to a v3 stream type node and # returns it. - def _conv_stream_type_node(self, v2_stream_type_node): + def _conv_stream_type_node(self, v2_stream_type_node: _MapNode) -> _MapNode: # This function creates a v3 stream type features node from the # packet context and event header field type nodes of a # v2 stream type node. - def v3_features_node_from_v2_ft_nodes(v2_pkt_ctx_ft_fields_node, - v2_ev_header_ft_fields_node): + def v3_features_node_from_v2_ft_nodes(v2_pkt_ctx_ft_fields_node: _MapNode, + v2_ev_header_ft_fields_node: Optional[_MapNode]) -> _MapNode: if v2_ev_header_ft_fields_node is None: v2_ev_header_ft_fields_node = collections.OrderedDict() @@ -324,9 +335,9 @@ class _Parser(config_parse_common._Parser): v3_ev_type_id_ft_node = self._conv_ft_node_if_exists(v2_ev_header_ft_fields_node, 'id') v3_ev_time_ft_node = self._conv_ft_node_if_exists(v2_ev_header_ft_fields_node, 'timestamp') - v3_features_node = collections.OrderedDict() - v3_pkt_node = collections.OrderedDict() - v3_ev_node = collections.OrderedDict() + v3_features_node: _MapNode = collections.OrderedDict() + v3_pkt_node: _MapNode = collections.OrderedDict() + v3_ev_node: _MapNode = collections.OrderedDict() v3_pkt_node['total-size-field-type'] = v3_pkt_total_size_ft_node v3_pkt_node['content-size-field-type'] = v3_pkt_content_size_ft_node self._set_v3_feature_ft_if_exists(v3_pkt_node, 'beginning-time-field-type', @@ -342,9 +353,9 @@ class _Parser(config_parse_common._Parser): v3_features_node['event'] = v3_ev_node return v3_features_node - def clk_type_name_from_v2_int_ft_node(v2_int_ft_node): + def clk_type_name_from_v2_int_ft_node(v2_int_ft_node: Optional[_MapNode]) -> _OptStr: if v2_int_ft_node is None: - return + return None assert v2_int_ft_node['class'] in ('int', 'integer') prop_mappings_node = v2_int_ft_node.get('property-mappings') @@ -352,8 +363,10 @@ class _Parser(config_parse_common._Parser): if prop_mappings_node is not None and len(prop_mappings_node) > 0: return prop_mappings_node[0]['name'] + return None + # create empty v3 stream type node - v3_stream_type_node = collections.OrderedDict() + v3_stream_type_node: _MapNode = collections.OrderedDict() # rename `$default` property to `$is-default` _copy_prop_if_exists(v3_stream_type_node, v2_stream_type_node, '$default', '$is-default') @@ -446,8 +459,8 @@ class _Parser(config_parse_common._Parser): return v3_stream_type_node # Converts a v2 metadata node to a v3 trace node and returns it. - def _conv_meta_node(self, v2_meta_node): - def v3_features_node_from_v2_ft_node(v2_pkt_header_ft_node): + def _conv_meta_node(self, v2_meta_node: _MapNode) -> _MapNode: + def v3_features_node_from_v2_ft_node(v2_pkt_header_ft_node: Optional[_MapNode]) -> _MapNode: def set_if_exists(key, node): return self._set_v3_feature_ft_if_exists(v3_features_node, key, node) @@ -460,14 +473,14 @@ class _Parser(config_parse_common._Parser): v3_uuid_ft_node = self._conv_ft_node_if_exists(v2_pkt_header_ft_fields_node, 'uuid') v3_stream_type_id_ft_node = self._conv_ft_node_if_exists(v2_pkt_header_ft_fields_node, 'stream_id') - v3_features_node = collections.OrderedDict() + v3_features_node: _MapNode = collections.OrderedDict() set_if_exists('magic-field-type', v3_magic_ft_node) set_if_exists('uuid-field-type', v3_uuid_ft_node) set_if_exists('stream-type-id-field-type', v3_stream_type_id_ft_node) return v3_features_node - v3_trace_node = collections.OrderedDict() - v3_trace_type_node = collections.OrderedDict() + v3_trace_node: _MapNode = collections.OrderedDict() + v3_trace_type_node: _MapNode = collections.OrderedDict() v2_trace_node = v2_meta_node['trace'] # rename `byte-order` property to `$default-byte-order` @@ -663,7 +676,7 @@ class _Parser(config_parse_common._Parser): # Processes the inclusions of the event type node `ev_type_node`, # returning the effective node. - def _process_ev_type_node_include(self, ev_type_node): + def _process_ev_type_node_include(self, ev_type_node: _MapNode) -> _MapNode: # Make sure the event type node is valid for the inclusion # processing stage. self._schema_validator.validate(ev_type_node, '2/config/event-type-pre-include') @@ -673,7 +686,7 @@ class _Parser(config_parse_common._Parser): # Processes the inclusions of the stream type node # `stream_type_node`, returning the effective node. - def _process_stream_type_node_include(self, stream_type_node): + def _process_stream_type_node_include(self, stream_type_node: _MapNode) -> _MapNode: def process_children_include(stream_type_node): prop_name = 'events' @@ -693,7 +706,7 @@ class _Parser(config_parse_common._Parser): # Processes the inclusions of the trace type node `trace_type_node`, # returning the effective node. - def _process_trace_type_node_include(self, trace_type_node): + def _process_trace_type_node_include(self, trace_type_node: _MapNode) -> _MapNode: # Make sure the trace type node is valid for the inclusion # processing stage. self._schema_validator.validate(trace_type_node, '2/config/trace-type-pre-include') @@ -703,7 +716,7 @@ class _Parser(config_parse_common._Parser): # Processes the inclusions of the clock type node `clk_type_node`, # returning the effective node. - def _process_clk_type_node_include(self, clk_type_node): + def _process_clk_type_node_include(self, clk_type_node: _MapNode) -> _MapNode: # Make sure the clock type node is valid for the inclusion # processing stage. self._schema_validator.validate(clk_type_node, '2/config/clock-type-pre-include') @@ -713,8 +726,8 @@ class _Parser(config_parse_common._Parser): # Processes the inclusions of the metadata node `meta_node`, # returning the effective node. - def _process_meta_node_include(self, meta_node): - def process_children_include(meta_node): + def _process_meta_node_include(self, meta_node: _MapNode) -> _MapNode: + def process_children_include(meta_node: _MapNode): prop_name = 'trace' if prop_name in meta_node: @@ -833,5 +846,5 @@ class _Parser(config_parse_common._Parser): self._transform_config_node() @property - def config_node(self): - return config_parse_common._ConfigNodeV3(self._root_node) + def config_node(self) -> config_parse_common._ConfigNodeV3: + return config_parse_common._ConfigNodeV3(typing.cast(_MapNode, self._root_node)) diff --git a/barectf/config_parse_v3.py b/barectf/config_parse_v3.py index fe5a1d0..fccf1a1 100644 --- a/barectf/config_parse_v3.py +++ b/barectf/config_parse_v3.py @@ -24,9 +24,14 @@ import barectf.config_parse_common as barectf_config_parse_common from barectf.config_parse_common import _ConfigurationParseError from barectf.config_parse_common import _append_error_ctx +from barectf.config_parse_common import _MapNode import barectf.config as barectf_config +from barectf.config import _OptFt, _OptStructFt import collections import uuid +from barectf.typing import Count, Alignment, VersionNumber +from typing import Optional, List, Dict, Any, TextIO, Set, Iterable, Callable, Tuple, Type +import typing # A barectf 3 YAML configuration parser. @@ -40,11 +45,14 @@ import uuid # parsing stages and general strategy. class _Parser(barectf_config_parse_common._Parser): # Builds a barectf 3 YAML configuration parser and parses the root - # configuration node `node` (already loaded from `path`). - def __init__(self, path, node, with_pkg_include_dir, inclusion_dirs, ignore_include_not_found): - super().__init__(path, node, with_pkg_include_dir, inclusion_dirs, - ignore_include_not_found, 3) - self._ft_cls_name_to_create_method = { + # configuration node `node` (already loaded from the file-like + # object `root_file`). + def __init__(self, root_file: TextIO, node: barectf_config_parse_common._ConfigNodeV3, + with_pkg_include_dir: bool, inclusion_dirs: Optional[List[str]], + ignore_include_not_found: bool): + super().__init__(root_file, node, with_pkg_include_dir, inclusion_dirs, + ignore_include_not_found, VersionNumber(3)) + self._ft_cls_name_to_create_method: Dict[str, Callable[[_MapNode], barectf_config._FieldType]] = { 'unsigned-integer': self._create_int_ft, 'signed-integer': self._create_int_ft, 'unsigned-enumeration': self._create_enum_ft, @@ -60,7 +68,7 @@ class _Parser(barectf_config_parse_common._Parser): # `_ConfigurationParseError` exception using `ctx_obj_name` if it's # invalid. @staticmethod - def _validate_alignment(alignment, ctx_obj_name): + def _validate_alignment(alignment: Alignment, ctx_obj_name: str): assert alignment >= 1 # check for power of two @@ -72,7 +80,7 @@ class _Parser(barectf_config_parse_common._Parser): # `_ConfigurationParseError` exception using `ctx_obj_name` and # `prop` to format the message if it's invalid. @staticmethod - def _validate_iden(iden, ctx_obj_name, prop): + def _validate_iden(iden: str, ctx_obj_name: str, prop: str): assert type(iden) is str ctf_keywords = { 'align', @@ -97,20 +105,20 @@ class _Parser(barectf_config_parse_common._Parser): raise _ConfigurationParseError(ctx_obj_name, msg) @staticmethod - def _alignment_prop(ft_node, prop_name): + def _alignment_prop(ft_node: _MapNode, prop_name: str) -> Alignment: alignment = ft_node.get(prop_name) if alignment is not None: _Parser._validate_alignment(alignment, '`prop_name` property') - return alignment + return Alignment(alignment) @property - def _trace_type_node(self): - return self._root_node.config_node['trace']['type'] + def _trace_type_node(self) -> _MapNode: + return self.config_node['trace']['type'] @staticmethod - def _byte_order_from_node(node): + def _byte_order_from_node(node: str) -> barectf_config.ByteOrder: return { 'big-endian': barectf_config.ByteOrder.BIG_ENDIAN, 'little-endian': barectf_config.ByteOrder.LITTLE_ENDIAN, @@ -119,7 +127,10 @@ class _Parser(barectf_config_parse_common._Parser): # Creates a bit array field type having the type `ft_type` from the # bit array field type node `ft_node`, passing the additional # `*args` to ft_type.__init__(). - def _create_common_bit_array_ft(self, ft_node, ft_type, default_alignment, *args): + def _create_common_bit_array_ft(self, ft_node: _MapNode, + ft_type: Type[barectf_config._BitArrayFieldType], + default_alignment: Optional[Alignment], + *args) -> barectf_config._BitArrayFieldType: byte_order = self._byte_order_from_node(ft_node['byte-order']) alignment = self._alignment_prop(ft_node, 'alignment') @@ -131,18 +142,21 @@ class _Parser(barectf_config_parse_common._Parser): # Creates an integer field type having the type `ft_type` from the # integer field type node `ft_node`, passing the additional `*args` # to ft_type.__init__(). - def _create_common_int_ft(self, ft_node, ft_type, *args): + def _create_common_int_ft(self, ft_node: _MapNode, + ft_type: Type[barectf_config._IntegerFieldType], *args) -> barectf_config._IntegerFieldType: preferred_display_base = { 'binary': barectf_config.DisplayBase.BINARY, 'octal': barectf_config.DisplayBase.OCTAL, 'decimal': barectf_config.DisplayBase.DECIMAL, 'hexadecimal': barectf_config.DisplayBase.HEXADECIMAL, }[ft_node.get('preferred-display-base', 'decimal')] - return self._create_common_bit_array_ft(ft_node, ft_type, None, preferred_display_base, *args) + return typing.cast(barectf_config._IntegerFieldType, + self._create_common_bit_array_ft(ft_node, ft_type, None, + preferred_display_base, *args)) # Creates an integer field type from the unsigned/signed integer # field type node `ft_node`. - def _create_int_ft(self, ft_node): + def _create_int_ft(self, ft_node: _MapNode) -> barectf_config._IntegerFieldType: ft_type = { 'unsigned-integer': barectf_config.UnsignedIntegerFieldType, 'signed-integer': barectf_config.SignedIntegerFieldType, @@ -151,7 +165,7 @@ class _Parser(barectf_config_parse_common._Parser): # Creates an enumeration field type from the unsigned/signed # enumeration field type node `ft_node`. - def _create_enum_ft(self, ft_node): + def _create_enum_ft(self, ft_node: _MapNode) -> barectf_config._EnumerationFieldType: ft_type = { 'unsigned-enumeration': barectf_config.UnsignedEnumerationFieldType, 'signed-enumeration': barectf_config.SignedEnumerationFieldType, @@ -172,21 +186,24 @@ class _Parser(barectf_config_parse_common._Parser): mappings[label] = barectf_config.EnumerationFieldTypeMapping(ranges) - return self._create_common_int_ft(ft_node, ft_type, - barectf_config.EnumerationFieldTypeMappings(mappings)) + return typing.cast(barectf_config._EnumerationFieldType, + self._create_common_int_ft(ft_node, ft_type, + barectf_config.EnumerationFieldTypeMappings(mappings))) # Creates a real field type from the real field type node `ft_node`. - def _create_real_ft(self, ft_node): - return self._create_common_bit_array_ft(ft_node, barectf_config.RealFieldType, 8) + def _create_real_ft(self, ft_node: _MapNode) -> barectf_config.RealFieldType: + return typing.cast(barectf_config.RealFieldType, + self._create_common_bit_array_ft(ft_node, barectf_config.RealFieldType, + Alignment(8))) # Creates a string field type from the string field type node # `ft_node`. - def _create_string_ft(self, ft_node): + def _create_string_ft(self, ft_node: _MapNode) -> barectf_config.StringFieldType: return barectf_config.StringFieldType() # Creates a static array field type from the static array field type # node `ft_node`. - def _create_static_array_ft(self, ft_node): + def _create_static_array_ft(self, ft_node: _MapNode) -> barectf_config.StaticArrayFieldType: prop_name = 'element-field-type' try: @@ -201,9 +218,9 @@ class _Parser(barectf_config_parse_common._Parser): # # `prop_name` is the name of the property of which `members_node` is # the value. - def _create_struct_ft_members(self, members_node, prop_name): + def _create_struct_ft_members(self, members_node: List[_MapNode], prop_name: str): members = collections.OrderedDict() - member_names = set() + member_names: Set[str] = set() for member_node in members_node: member_name, member_node = list(member_node.items())[0] @@ -236,7 +253,7 @@ class _Parser(barectf_config_parse_common._Parser): # Creates a structure field type from the structure field type node # `ft_node`. - def _create_struct_ft(self, ft_node): + def _create_struct_ft(self, ft_node: _MapNode) -> barectf_config.StructureFieldType: minimum_alignment = self._alignment_prop(ft_node, 'minimum-alignment') if minimum_alignment is None: @@ -252,33 +269,43 @@ class _Parser(barectf_config_parse_common._Parser): return barectf_config.StructureFieldType(minimum_alignment, members) # Creates a field type from the field type node `ft_node`. - def _create_ft(self, ft_node): + def _create_ft(self, ft_node: _MapNode) -> barectf_config._FieldType: return self._ft_cls_name_to_create_method[ft_node['class']](ft_node) # Creates a field type from the field type node `parent_node[key]` # if it exists. - def _try_create_ft(self, parent_node, key): + def _try_create_ft(self, parent_node: _MapNode, key: str) -> _OptFt: if key not in parent_node: - return + return None try: return self._create_ft(parent_node[key]) except _ConfigurationParseError as exc: _append_error_ctx(exc, f'`{key}` property') + # satisfy static type checker (never reached) + raise + + # Like _try_create_ft(), but casts the result's type to + # `barectf_config.StructureFieldType` to satisfy static type + # checkers. + def _try_create_struct_ft(self, parent_node: _MapNode, key: str) -> _OptStructFt: + return typing.cast(barectf_config.StructureFieldType, + self._try_create_ft(parent_node, key)) + # Returns the total number of members in the structure field type # node `ft_node` if it exists, otherwise 0. @staticmethod - def _total_struct_ft_node_members(ft_node): + def _total_struct_ft_node_members(ft_node: Optional[_MapNode]) -> Count: if ft_node is None: - return 0 + return Count(0) members_node = ft_node.get('members') if members_node is None: - return 0 + return Count(0) - return len(members_node) + return Count(len(members_node)) # Creates an event type from the event type node `ev_type_node` # named `name`. @@ -288,26 +315,33 @@ class _Parser(barectf_config_parse_common._Parser): # stream type). For example, if the stream type has a event header # field type with `id` and `timestamp` members, then # `ev_member_count` is 2. - def _create_ev_type(self, name, ev_type_node, ev_member_count): + def _create_ev_type(self, name: str, ev_type_node: _MapNode, ev_member_count: Count) -> barectf_config.EventType: try: self._validate_iden(name, '`name` property', 'event type name') # make sure the event type is not empty spec_ctx_ft_prop_name = 'specific-context-field-type' payload_ft_prop_name = 'payload-field-type' - ev_member_count += self._total_struct_ft_node_members(ev_type_node.get(spec_ctx_ft_prop_name)) - ev_member_count += self._total_struct_ft_node_members(ev_type_node.get(payload_ft_prop_name)) + ev_member_count = Count(ev_member_count + + self._total_struct_ft_node_members(ev_type_node.get(spec_ctx_ft_prop_name))) + ev_member_count = Count(ev_member_count + + self._total_struct_ft_node_members(ev_type_node.get(payload_ft_prop_name))) if ev_member_count == 0: raise _ConfigurationParseError('Event type', 'Event type is empty (no members).') # create event type return barectf_config.EventType(name, ev_type_node.get('log-level'), - self._try_create_ft(ev_type_node, spec_ctx_ft_prop_name), - self._try_create_ft(ev_type_node, payload_ft_prop_name)) + self._try_create_struct_ft(ev_type_node, + spec_ctx_ft_prop_name), + self._try_create_struct_ft(ev_type_node, + payload_ft_prop_name)) except _ConfigurationParseError as exc: _append_error_ctx(exc, f'Event type `{name}`') + # satisfy static type checker (never reached) + raise + # Returns the effective feature field type for the field type # node `parent_node[key]`, if any. # @@ -324,7 +358,7 @@ class _Parser(barectf_config_parse_common._Parser): # # Otherwise: # A created field type. - def _feature_ft(self, parent_node, key, none=None): + def _feature_ft(self, parent_node: _MapNode, key: str, none: Any = None) -> Any: if key not in parent_node: # missing: default feature field type return none @@ -343,7 +377,7 @@ class _Parser(barectf_config_parse_common._Parser): assert type(ft_node) is collections.OrderedDict return self._create_ft(ft_node) - def _create_stream_type(self, name, stream_type_node): + def _create_stream_type(self, name: str, stream_type_node: _MapNode) -> barectf_config.StreamType: try: # validate stream type's name self._validate_iden(name, '`name` property', 'stream type name') @@ -416,9 +450,12 @@ class _Parser(barectf_config_parse_common._Parser): raise _ConfigurationParseError(f'`{type_id_ft_prop_name}` property', 'Event type ID field type feature is required because stream type has more than one event type') - if isinstance(ev_type_id_ft, barectf_config._FieldType) and ev_type_count > (1 << ev_type_id_ft.size): - raise _ConfigurationParseError(f'`{type_id_ft_prop_name}` property', - f'Field type\'s size ({ev_type_id_ft.size} bits) is too small to accomodate {ev_type_count} event types') + if isinstance(ev_type_id_ft, barectf_config._IntegerFieldType): + ev_type_id_int_ft = typing.cast(barectf_config._IntegerFieldType, ev_type_id_ft) + + if ev_type_count > (1 << ev_type_id_int_ft.size): + raise _ConfigurationParseError(f'`{type_id_ft_prop_name}` property', + f'Field type\'s size ({ev_type_id_int_ft.size} bits) is too small to accomodate {ev_type_count} event types') except _ConfigurationParseError as exc: exc._append_ctx('`event` property') _append_error_ctx(exc, '`$features` property') @@ -456,17 +493,18 @@ class _Parser(barectf_config_parse_common._Parser): f'Packet context field type member name `{member_name}` is reserved.') # create event types - ev_header_common_ctx_member_count = 0 + ev_header_common_ctx_member_count = Count(0) if ev_features.type_id_field_type is not None: - ev_header_common_ctx_member_count += 1 + ev_header_common_ctx_member_count = Count(ev_header_common_ctx_member_count + 1) if ev_features.time_field_type is not None: - ev_header_common_ctx_member_count += 1 + ev_header_common_ctx_member_count = Count(ev_header_common_ctx_member_count + 1) ev_common_ctx_ft_prop_name = 'event-common-context-field-type' ev_common_ctx_ft_node = stream_type_node.get(ev_common_ctx_ft_prop_name) - ev_header_common_ctx_member_count += self._total_struct_ft_node_members(ev_common_ctx_ft_node) + ev_header_common_ctx_member_count = Count(ev_header_common_ctx_member_count + + self._total_struct_ft_node_members(ev_common_ctx_ft_node)) ev_types = set() for ev_name, ev_type_node in stream_type_node[ev_types_prop_name].items(): @@ -475,12 +513,15 @@ class _Parser(barectf_config_parse_common._Parser): # create stream type return barectf_config.StreamType(name, ev_types, def_clk_type, features, pkt_ctx_ft_extra_members, - self._try_create_ft(stream_type_node, - ev_common_ctx_ft_prop_name)) + self._try_create_struct_ft(stream_type_node, + ev_common_ctx_ft_prop_name)) except _ConfigurationParseError as exc: _append_error_ctx(exc, f'Stream type `{name}`') - def _clk_type(self, name, prop_name): + # satisfy static type checker (never reached) + raise + + def _clk_type(self, name: str, prop_name: str) -> barectf_config.ClockType: clk_type = self._clk_types.get(name) if clk_type is None: @@ -489,7 +530,7 @@ class _Parser(barectf_config_parse_common._Parser): return clk_type - def _create_clk_type(self, name, clk_type_node): + def _create_clk_type(self, name: str, clk_type_node: _MapNode) -> barectf_config.ClockType: self._validate_iden(name, '`name` property', 'clock type name') clk_type_uuid = None uuid_node = clk_type_node.get('uuid') @@ -498,12 +539,12 @@ class _Parser(barectf_config_parse_common._Parser): clk_type_uuid = uuid.UUID(uuid_node) offset_seconds = 0 - offset_cycles = 0 + offset_cycles = Count(0) offset_node = clk_type_node.get('offset') if offset_node is not None: offset_seconds = offset_node.get('seconds', 0) - offset_cycles = offset_node.get('cycles', 0) + offset_cycles = offset_node.get('cycles', Count(0)) return barectf_config.ClockType(name, clk_type_node.get('frequency', int(1e9)), clk_type_uuid, clk_type_node.get('description'), @@ -583,7 +624,7 @@ class _Parser(barectf_config_parse_common._Parser): def _create_trace(self): try: trace_type = self._create_trace_type() - trace_node = self._root_node.config_node['trace'] + trace_node = self.config_node['trace'] env = None env_node = trace_node.get('environment') @@ -641,7 +682,9 @@ class _Parser(barectf_config_parse_common._Parser): # create options iden_prefix_def = False def_stream_type_name_def = False - opts_node = self._root_node.config_node.get('options') + opts_node = self.config_node.get('options') + iden_prefix = 'barectf_' + file_name_prefix = 'barectf' if opts_node is not None: code_gen_opts_node = opts_node.get('code-generation') @@ -686,14 +729,14 @@ class _Parser(barectf_config_parse_common._Parser): # * The `$field-type-aliases` property of the trace type node is # removed. def _expand_ft_aliases(self): - def resolve_ft_alias_from(parent_node, key): + def resolve_ft_alias_from(parent_node: _MapNode, key: str): if key not in parent_node: return if type(parent_node[key]) not in [collections.OrderedDict, str]: return - return self._resolve_ft_alias_from(ft_aliases_node, parent_node, key) + self._resolve_ft_alias_from(ft_aliases_node, parent_node, key) ft_aliases_node = self._trace_type_node['$field-type-aliases'] @@ -779,14 +822,14 @@ class _Parser(barectf_config_parse_common._Parser): # When this method returns, no field type node has an `$inherit` # property. def _apply_fts_inheritance(self): - def apply_ft_inheritance(parent_node, key): + def apply_ft_inheritance(parent_node: _MapNode, key: str): if key not in parent_node: return if type(parent_node[key]) is not collections.OrderedDict: return - return self._apply_ft_inheritance(parent_node, key) + self._apply_ft_inheritance(parent_node, key) features_prop_name = '$features' features_node = self._trace_type_node.get(features_prop_name) @@ -841,7 +884,7 @@ class _Parser(barectf_config_parse_common._Parser): # # This method normalizes form 1 to use form 2. def _normalize_struct_ft_member_nodes(self): - def normalize_members_node(members_node): + def normalize_members_node(members_node: List[_MapNode]): ft_prop_name = 'field-type' for member_node in members_node: @@ -854,7 +897,7 @@ class _Parser(barectf_config_parse_common._Parser): normalize_struct_ft_member_nodes(member_node[member_name], ft_prop_name) - def normalize_struct_ft_member_nodes(parent_node, key): + def normalize_struct_ft_member_nodes(parent_node: _MapNode, key: str): if type(parent_node) is not collections.OrderedDict: return @@ -863,6 +906,7 @@ class _Parser(barectf_config_parse_common._Parser): if type(ft_node) is not collections.OrderedDict: return + ft_node = typing.cast(collections.OrderedDict, ft_node) members_nodes = ft_node.get('members') if members_nodes is not None: @@ -919,7 +963,7 @@ class _Parser(barectf_config_parse_common._Parser): def _expand_fts(self): # Make sure that the current configuration node is valid # considering field types are not expanded yet. - self._schema_validator.validate(self._root_node.config_node, + self._schema_validator.validate(self.config_node, '3/config/config-pre-field-type-expansion') prop_name = '$field-type-aliases' @@ -950,7 +994,7 @@ class _Parser(barectf_config_parse_common._Parser): def _sub_log_level_aliases(self): # Make sure that the current configuration node is valid # considering log level aliases are not substituted yet. - self._schema_validator.validate(self._root_node.config_node, + self._schema_validator.validate(self.config_node, '3/config/config-pre-log-level-alias-sub') log_level_aliases_prop_name = '$log-level-aliases' @@ -990,7 +1034,7 @@ class _Parser(barectf_config_parse_common._Parser): # # It is safe to delete a yielded node during the iteration. @staticmethod - def _props(node): + def _props(node: Any) -> Iterable[Tuple[Any, str]]: if type(node) is collections.OrderedDict: for key in list(node): yield from _Parser._props(node[key]) @@ -999,8 +1043,8 @@ class _Parser(barectf_config_parse_common._Parser): for item_node in node: yield from _Parser._props(item_node) - def _trace_type_props(self): - yield from _Parser._props(self._root_node.config_node['trace']['type']) + def _trace_type_props(self) -> Iterable[Tuple[Any, str]]: + yield from _Parser._props(self.config_node['trace']['type']) # Normalize the properties of the configuration node. # @@ -1018,7 +1062,7 @@ class _Parser(barectf_config_parse_common._Parser): # This method also applies 1. to the trace node's `environment` # property. def _normalize_props(self): - def normalize_byte_order_prop(parent_node, key): + def normalize_byte_order_prop(parent_node: _MapNode, key: str): node = parent_node[key] if node in ['be', 'big']: @@ -1026,7 +1070,7 @@ class _Parser(barectf_config_parse_common._Parser): elif node in ['le', 'little']: parent_node[key] = 'little-endian' - trace_node = self._root_node.config_node['trace'] + trace_node = self.config_node['trace'] trace_type_node = trace_node['type'] prop_name = '$default-byte-order' @@ -1089,7 +1133,7 @@ class _Parser(barectf_config_parse_common._Parser): 'real', } - def set_ft_node_byte_order_prop(parent_node, key): + def set_ft_node_byte_order_prop(parent_node: _MapNode, key: str): if key not in parent_node: return @@ -1116,7 +1160,7 @@ class _Parser(barectf_config_parse_common._Parser): set_ft_node_byte_order_prop(ft_node, 'element-field-type') - def set_struct_ft_node_members_byte_order_prop(members_node): + def set_struct_ft_node_members_byte_order_prop(members_node: List[_MapNode]): for member_node in members_node: member_name, member_node = list(member_node.items())[0] @@ -1187,7 +1231,7 @@ class _Parser(barectf_config_parse_common._Parser): # Processes the inclusions of the event type node `ev_type_node`, # returning the effective node. - def _process_ev_type_node_include(self, ev_type_node): + def _process_ev_type_node_include(self, ev_type_node: _MapNode) -> _MapNode: # Make sure the event type node is valid for the inclusion # processing stage. self._schema_validator.validate(ev_type_node, '3/config/event-type-pre-include') @@ -1197,8 +1241,8 @@ class _Parser(barectf_config_parse_common._Parser): # Processes the inclusions of the stream type node # `stream_type_node`, returning the effective node. - def _process_stream_type_node_include(self, stream_type_node): - def process_children_include(stream_type_node): + def _process_stream_type_node_include(self, stream_type_node: _MapNode) -> _MapNode: + def process_children_include(stream_type_node: _MapNode): prop_name = 'event-types' if prop_name in stream_type_node: @@ -1217,7 +1261,7 @@ class _Parser(barectf_config_parse_common._Parser): # Processes the inclusions of the clock type node `clk_type_node`, # returning the effective node. - def _process_clk_type_node_include(self, clk_type_node): + def _process_clk_type_node_include(self, clk_type_node: _MapNode) -> _MapNode: # Make sure the clock type node is valid for the inclusion # processing stage. self._schema_validator.validate(clk_type_node, '3/config/clock-type-pre-include') @@ -1227,8 +1271,8 @@ class _Parser(barectf_config_parse_common._Parser): # Processes the inclusions of the trace type node `trace_type_node`, # returning the effective node. - def _process_trace_type_node_include(self, trace_type_node): - def process_children_include(trace_type_node): + def _process_trace_type_node_include(self, trace_type_node: _MapNode) -> _MapNode: + def process_children_include(trace_type_node: _MapNode): prop_name = 'clock-types' if prop_name in trace_type_node: @@ -1255,8 +1299,8 @@ class _Parser(barectf_config_parse_common._Parser): # Processes the inclusions of the trace node `trace_node`, returning # the effective node. - def _process_trace_node_include(self, trace_node): - def process_children_include(trace_node): + def _process_trace_node_include(self, trace_node: _MapNode) -> _MapNode: + def process_children_include(trace_node: _MapNode): prop_name = 'type' trace_node[prop_name] = self._process_trace_type_node_include(trace_node[prop_name]) @@ -1294,13 +1338,13 @@ class _Parser(barectf_config_parse_common._Parser): # # First, make sure the configuration node itself is valid for # the inclusion processing stage. - self._schema_validator.validate(self._root_node.config_node, '3/config/config-pre-include') + self._schema_validator.validate(self.config_node, '3/config/config-pre-include') # Process trace node inclusions. # # self._process_trace_node_include() returns a new (or the same) # trace node without any `$include` property in it, recursively. - self._root_node.config_node['trace'] = self._process_trace_node_include(self._root_node.config_node['trace']) + self.config_node['trace'] = self._process_trace_node_include(self.config_node['trace']) def _parse(self): # process configuration node inclusions @@ -1339,7 +1383,7 @@ class _Parser(barectf_config_parse_common._Parser): # At this point, the configuration node must be valid as an # effective configuration node. - self._schema_validator.validate(self._root_node.config_node, '3/config/config') + self._schema_validator.validate(self.config_node, '3/config/config') # Normalize properties. # @@ -1367,9 +1411,9 @@ class _Parser(barectf_config_parse_common._Parser): self._create_config() @property - def config(self): + def config(self) -> barectf_config.Configuration: return self._config @property - def config_node(self): - return self._root_node + def config_node(self) -> _MapNode: + return typing.cast(barectf_config_parse_common._ConfigNodeV3, self._root_node).config_node diff --git a/barectf/typing.py b/barectf/typing.py new file mode 100644 index 0000000..36445db --- /dev/null +++ b/barectf/typing.py @@ -0,0 +1,31 @@ +# The MIT License (MIT) +# +# Copyright (c) 2020 Philippe Proulx +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +import typing + +Index = typing.NewType('Index', int) +Count = typing.NewType('Count', int) +Id = typing.NewType('Id', int) +Alignment = typing.NewType('Alignment', int) +VersionNumber = typing.NewType('VersionNumber', int) +_OptStr = typing.Optional[str] -- 2.34.1