Commit | Line | Data |
---|---|---|
85dcce24 PP |
1 | # The MIT License (MIT) |
2 | # | |
3 | # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com> | |
4 | # | |
5 | # Permission is hereby granted, free of charge, to any person obtaining a copy | |
6 | # of this software and associated documentation files (the "Software"), to deal | |
7 | # in the Software without restriction, including without limitation the rights | |
8 | # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
9 | # copies of the Software, and to permit persons to whom the Software is | |
10 | # furnished to do so, subject to the following conditions: | |
11 | # | |
12 | # The above copyright notice and this permission notice shall be included in | |
13 | # all copies or substantial portions of the Software. | |
14 | # | |
15 | # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
16 | # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
17 | # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
18 | # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
19 | # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
20 | # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
21 | # THE SOFTWARE. | |
22 | ||
23 | from bt2 import utils | |
24 | import bt2 | |
3d60267b | 25 | import itertools |
5602ef81 | 26 | import bt2.message_iterator |
85dcce24 | 27 | import datetime |
85dcce24 PP |
28 | from collections import namedtuple |
29 | import numbers | |
30 | ||
31 | ||
3d60267b PP |
32 | # a pair of component and ComponentSpec |
33 | _ComponentAndSpec = namedtuple('_ComponentAndSpec', ['comp', 'spec']) | |
85dcce24 PP |
34 | |
35 | ||
3d60267b | 36 | class ComponentSpec: |
85dcce24 PP |
37 | def __init__(self, plugin_name, component_class_name, params=None): |
38 | utils._check_str(plugin_name) | |
39 | utils._check_str(component_class_name) | |
40 | self._plugin_name = plugin_name | |
41 | self._component_class_name = component_class_name | |
42 | ||
43 | if type(params) is str: | |
907f2b70 | 44 | self._params = bt2.create_value({'paths': [params]}) |
85dcce24 PP |
45 | else: |
46 | self._params = bt2.create_value(params) | |
47 | ||
48 | @property | |
49 | def plugin_name(self): | |
50 | return self._plugin_name | |
51 | ||
52 | @property | |
53 | def component_class_name(self): | |
54 | return self._component_class_name | |
55 | ||
56 | @property | |
57 | def params(self): | |
58 | return self._params | |
59 | ||
60 | ||
61 | # datetime.datetime or integral to nanoseconds | |
62 | def _get_ns(obj): | |
63 | if obj is None: | |
64 | return | |
65 | ||
66 | if isinstance(obj, numbers.Real): | |
67 | # consider that it's already in seconds | |
68 | s = obj | |
69 | elif isinstance(obj, datetime.datetime): | |
70 | # s -> ns | |
71 | s = obj.timestamp() | |
72 | else: | |
73 | raise TypeError('"{}" is not an integral number or a datetime.datetime object'.format(obj)) | |
74 | ||
75 | return int(s * 1e9) | |
76 | ||
77 | ||
3d60267b PP |
78 | class _CompClsType: |
79 | SOURCE = 0 | |
80 | FILTER = 1 | |
81 | ||
82 | ||
5602ef81 | 83 | class TraceCollectionMessageIterator(bt2.message_iterator._MessageIterator): |
3d60267b | 84 | def __init__(self, source_component_specs, filter_component_specs=None, |
907f2b70 | 85 | stream_intersection_mode=False, begin=None, end=None): |
85dcce24 PP |
86 | utils._check_bool(stream_intersection_mode) |
87 | self._stream_intersection_mode = stream_intersection_mode | |
88 | self._begin_ns = _get_ns(begin) | |
89 | self._end_ns = _get_ns(end) | |
3d60267b PP |
90 | |
91 | if type(source_component_specs) is ComponentSpec: | |
92 | source_component_specs = [source_component_specs] | |
93 | ||
94 | if type(filter_component_specs) is ComponentSpec: | |
95 | filter_component_specs = [filter_component_specs] | |
96 | elif filter_component_specs is None: | |
97 | filter_component_specs = [] | |
98 | ||
85dcce24 | 99 | self._src_comp_specs = source_component_specs |
3d60267b | 100 | self._flt_comp_specs = filter_component_specs |
85dcce24 PP |
101 | self._next_suffix = 1 |
102 | self._connect_ports = False | |
103 | ||
3d60267b | 104 | # lists of _ComponentAndSpec |
85dcce24 | 105 | self._src_comps_and_specs = [] |
3d60267b | 106 | self._flt_comps_and_specs = [] |
85dcce24 | 107 | |
3d60267b PP |
108 | self._validate_component_specs(source_component_specs) |
109 | self._validate_component_specs(filter_component_specs) | |
85dcce24 PP |
110 | self._build_graph() |
111 | ||
3d60267b PP |
112 | def _validate_component_specs(self, comp_specs): |
113 | for comp_spec in comp_specs: | |
114 | if type(comp_spec) is not ComponentSpec: | |
115 | raise TypeError('"{}" object is not a ComponentSpec'.format(type(comp_spec))) | |
85dcce24 PP |
116 | |
117 | def __next__(self): | |
5602ef81 | 118 | return next(self._msg_iter) |
85dcce24 | 119 | |
907f2b70 | 120 | def _create_stream_intersection_trimmer(self, component, port): |
85dcce24 PP |
121 | # find the original parameters specified by the user to create |
122 | # this port's component to get the `path` parameter | |
123 | for src_comp_and_spec in self._src_comps_and_specs: | |
907f2b70 | 124 | if component == src_comp_and_spec.comp: |
85dcce24 PP |
125 | break |
126 | ||
127 | try: | |
907f2b70 SM |
128 | paths = src_comp_and_spec.spec.params['paths'] |
129 | except Exception as e: | |
130 | raise bt2.Error('all source components must be created with a "paths" parameter in stream intersection mode') from e | |
85dcce24 | 131 | |
907f2b70 | 132 | params = {'paths': paths} |
85dcce24 PP |
133 | |
134 | # query the port's component for the `trace-info` object which | |
135 | # contains the stream intersection range for each exposed | |
136 | # trace | |
137 | query_exec = bt2.QueryExecutor() | |
907f2b70 | 138 | trace_info_res = query_exec.query(src_comp_and_spec.comp.component_class, |
85dcce24 PP |
139 | 'trace-info', params) |
140 | begin = None | |
141 | end = None | |
142 | ||
a38d7650 | 143 | # find the trace info for this port's trace |
87b768aa PP |
144 | try: |
145 | for trace_info in trace_info_res: | |
a38d7650 SM |
146 | for stream in trace_info['streams']: |
147 | if stream['port-name'] == port.name: | |
148 | range_ns = trace_info['intersection-range-ns'] | |
149 | begin = range_ns['begin'] | |
150 | end = range_ns['end'] | |
151 | break | |
907f2b70 | 152 | except Exception: |
87b768aa | 153 | pass |
85dcce24 PP |
154 | |
155 | if begin is None or end is None: | |
156 | raise bt2.Error('cannot find stream intersection range for port "{}"'.format(port.name)) | |
157 | ||
907f2b70 | 158 | name = 'trimmer-{}-{}'.format(src_comp_and_spec.comp.name, port.name) |
85dcce24 PP |
159 | return self._create_trimmer(begin, end, name) |
160 | ||
161 | def _create_muxer(self): | |
162 | plugin = bt2.find_plugin('utils') | |
163 | ||
164 | if plugin is None: | |
165 | raise bt2.Error('cannot find "utils" plugin (needed for the muxer)') | |
166 | ||
167 | if 'muxer' not in plugin.filter_component_classes: | |
168 | raise bt2.Error('cannot find "muxer" filter component class in "utils" plugin') | |
169 | ||
170 | comp_cls = plugin.filter_component_classes['muxer'] | |
171 | return self._graph.add_component(comp_cls, 'muxer') | |
172 | ||
907f2b70 | 173 | def _create_trimmer(self, begin_ns, end_ns, name): |
85dcce24 PP |
174 | plugin = bt2.find_plugin('utils') |
175 | ||
176 | if plugin is None: | |
177 | raise bt2.Error('cannot find "utils" plugin (needed for the trimmer)') | |
178 | ||
179 | if 'trimmer' not in plugin.filter_component_classes: | |
180 | raise bt2.Error('cannot find "trimmer" filter component class in "utils" plugin') | |
181 | ||
182 | params = {} | |
183 | ||
907f2b70 SM |
184 | def ns_to_string(ns): |
185 | s_part = ns // 1000000000 | |
186 | ns_part = ns % 1000000000 | |
187 | return '{}.{:09d}'.format(s_part, ns_part) | |
85dcce24 | 188 | |
907f2b70 SM |
189 | if begin_ns is not None: |
190 | params['begin'] = ns_to_string(begin_ns) | |
191 | ||
192 | if end_ns is not None: | |
193 | params['end'] = ns_to_string(end_ns) | |
85dcce24 PP |
194 | |
195 | comp_cls = plugin.filter_component_classes['trimmer'] | |
196 | return self._graph.add_component(comp_cls, name, params) | |
197 | ||
3d60267b | 198 | def _get_unique_comp_name(self, comp_spec): |
85dcce24 PP |
199 | name = '{}-{}'.format(comp_spec.plugin_name, |
200 | comp_spec.component_class_name) | |
3d60267b PP |
201 | comps_and_specs = itertools.chain(self._src_comps_and_specs, |
202 | self._flt_comps_and_specs) | |
85dcce24 | 203 | |
3d60267b | 204 | if name in [comp_and_spec.comp.name for comp_and_spec in comps_and_specs]: |
85dcce24 PP |
205 | name += '-{}'.format(self._next_suffix) |
206 | self._next_suffix += 1 | |
207 | ||
208 | return name | |
209 | ||
3d60267b | 210 | def _create_comp(self, comp_spec, comp_cls_type): |
85dcce24 PP |
211 | plugin = bt2.find_plugin(comp_spec.plugin_name) |
212 | ||
213 | if plugin is None: | |
214 | raise bt2.Error('no such plugin: {}'.format(comp_spec.plugin_name)) | |
215 | ||
3d60267b PP |
216 | if comp_cls_type == _CompClsType.SOURCE: |
217 | comp_classes = plugin.source_component_classes | |
218 | else: | |
219 | comp_classes = plugin.filter_component_classes | |
220 | ||
221 | if comp_spec.component_class_name not in comp_classes: | |
222 | cc_type = 'source' if comp_cls_type == _CompClsType.SOURCE else 'filter' | |
223 | raise bt2.Error('no such {} component class in "{}" plugin: {}'.format(cc_type, | |
224 | comp_spec.plugin_name, | |
225 | comp_spec.component_class_name)) | |
85dcce24 | 226 | |
3d60267b PP |
227 | comp_cls = comp_classes[comp_spec.component_class_name] |
228 | name = self._get_unique_comp_name(comp_spec) | |
85dcce24 PP |
229 | comp = self._graph.add_component(comp_cls, name, comp_spec.params) |
230 | return comp | |
231 | ||
232 | def _get_free_muxer_input_port(self): | |
233 | for port in self._muxer_comp.input_ports.values(): | |
234 | if not port.is_connected: | |
235 | return port | |
236 | ||
907f2b70 | 237 | def _connect_src_comp_port(self, component, port): |
85dcce24 PP |
238 | # if this trace collection iterator is in stream intersection |
239 | # mode, we need this connection: | |
240 | # | |
241 | # port -> trimmer -> muxer | |
242 | # | |
243 | # otherwise, simply: | |
244 | # | |
245 | # port -> muxer | |
246 | if self._stream_intersection_mode: | |
907f2b70 | 247 | trimmer_comp = self._create_stream_intersection_trimmer(component, port) |
85dcce24 PP |
248 | self._graph.connect_ports(port, trimmer_comp.input_ports['in']) |
249 | port_to_muxer = trimmer_comp.output_ports['out'] | |
250 | else: | |
251 | port_to_muxer = port | |
252 | ||
253 | self._graph.connect_ports(port_to_muxer, self._get_free_muxer_input_port()) | |
254 | ||
907f2b70 | 255 | def _graph_port_added(self, component, port): |
85dcce24 PP |
256 | if not self._connect_ports: |
257 | return | |
258 | ||
907f2b70 | 259 | if type(port) is bt2.port._InputPort: |
85dcce24 PP |
260 | return |
261 | ||
907f2b70 | 262 | if component not in [comp.comp for comp in self._src_comps_and_specs]: |
85dcce24 PP |
263 | # do not care about non-source components (muxer, trimmer, etc.) |
264 | return | |
265 | ||
907f2b70 | 266 | self._connect_src_comp_port(component, port) |
85dcce24 PP |
267 | |
268 | def _build_graph(self): | |
269 | self._graph = bt2.Graph() | |
907f2b70 | 270 | self._graph.add_port_added_listener(self._graph_port_added) |
85dcce24 PP |
271 | self._muxer_comp = self._create_muxer() |
272 | ||
273 | if self._begin_ns is not None or self._end_ns is not None: | |
274 | trimmer_comp = self._create_trimmer(self._begin_ns, | |
275 | self._end_ns, 'trimmer') | |
276 | self._graph.connect_ports(self._muxer_comp.output_ports['out'], | |
277 | trimmer_comp.input_ports['in']) | |
5602ef81 | 278 | msg_iter_port = trimmer_comp.output_ports['out'] |
85dcce24 | 279 | else: |
5602ef81 | 280 | msg_iter_port = self._muxer_comp.output_ports['out'] |
85dcce24 | 281 | |
3d60267b PP |
282 | # create extra filter components (chained) |
283 | for comp_spec in self._flt_comp_specs: | |
284 | comp = self._create_comp(comp_spec, _CompClsType.FILTER) | |
285 | self._flt_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) | |
286 | ||
287 | # connect the extra filter chain | |
288 | for comp_and_spec in self._flt_comps_and_specs: | |
289 | in_port = list(comp_and_spec.comp.input_ports.values())[0] | |
290 | out_port = list(comp_and_spec.comp.output_ports.values())[0] | |
5602ef81 SM |
291 | self._graph.connect_ports(msg_iter_port, in_port) |
292 | msg_iter_port = out_port | |
3d60267b | 293 | |
85dcce24 PP |
294 | # Here we create the components, self._graph_port_added() is |
295 | # called when they add ports, but the callback returns early | |
296 | # because self._connect_ports is False. This is because the | |
297 | # self._graph_port_added() could not find the associated source | |
298 | # component specification in self._src_comps_and_specs because | |
299 | # it does not exist yet (it needs the created component to | |
300 | # exist). | |
301 | for comp_spec in self._src_comp_specs: | |
3d60267b PP |
302 | comp = self._create_comp(comp_spec, _CompClsType.SOURCE) |
303 | self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) | |
85dcce24 PP |
304 | |
305 | # Now we connect the ports which exist at this point. We allow | |
306 | # self._graph_port_added() to automatically connect _new_ ports. | |
307 | self._connect_ports = True | |
308 | ||
309 | for comp_and_spec in self._src_comps_and_specs: | |
310 | # Keep a separate list because comp_and_spec.output_ports | |
311 | # could change during the connection of one of its ports. | |
312 | # Any new port is handled by self._graph_port_added(). | |
313 | out_ports = [port for port in comp_and_spec.comp.output_ports.values()] | |
314 | ||
315 | for out_port in out_ports: | |
907f2b70 | 316 | if out_port.is_connected: |
85dcce24 PP |
317 | continue |
318 | ||
907f2b70 | 319 | self._connect_src_comp_port(comp_and_spec.comp, out_port) |
85dcce24 | 320 | |
5602ef81 | 321 | # create this trace collection iterator's message iterator |
907f2b70 | 322 | self._msg_iter = self._graph.create_output_port_message_iterator(msg_iter_port) |