# Copyright (C) 2019 EfficiOS Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; only version 2
# of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# Base directories come from the environment. Both variables are required:
# a missing one raises `KeyError` as soon as this module is imported.
_BT_TESTS_DATADIR = os.environ['BT_TESTS_DATADIR']
_BT_CTF_TRACES_PATH = os.environ['BT_CTF_TRACES_PATH']

# Trace directories located under `_BT_CTF_TRACES_PATH`.
_3EVENTS_INTERSECT_TRACE_PATH = os.path.join(
    _BT_CTF_TRACES_PATH, 'intersection', '3eventsintersect'
)
_NOINTERSECT_TRACE_PATH = os.path.join(
    _BT_CTF_TRACES_PATH, 'intersection', 'nointersect'
)
_SEQUENCE_TRACE_PATH = os.path.join(_BT_CTF_TRACES_PATH, 'succeed', 'sequence')

# Directories located under `_BT_TESTS_DATADIR`, used by the automatic
# source discovery test cases (as plugin path and as input directories).
_AUTO_SOURCE_DISCOVERY_GROUPING_PATH = os.path.join(
    _BT_TESTS_DATADIR, 'auto-source-discovery', 'grouping'
)
_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH = os.path.join(
    _BT_TESTS_DATADIR, 'auto-source-discovery', 'params-log-level'
)
class ComponentSpecTestCase(unittest.TestCase):
    """Check how `bt2.ComponentSpec` handles its constructor arguments."""

    def test_create_good_no_params(self):
        # Plugin and component class names alone are enough.
        bt2.ComponentSpec('plugin', 'compcls')

    def test_create_good_with_params(self):
        # Parameters given as a dict.
        bt2.ComponentSpec('plugin', 'compcls', {'salut': 23})

    def test_create_good_with_path_params(self):
        # Parameters given as a plain string: the string is expected to
        # become the only element of the `inputs` parameter.
        spec = bt2.ComponentSpec('plugin', 'compcls', 'a path')
        self.assertEqual(spec.params['inputs'], ['a path'])

    def test_create_wrong_plugin_name_type(self):
        with self.assertRaises(TypeError):
            bt2.ComponentSpec(23, 'compcls')

    def test_create_wrong_component_class_name_type(self):
        with self.assertRaises(TypeError):
            bt2.ComponentSpec('plugin', 190)

    def test_create_wrong_params_type(self):
        # A `datetime` object is not an acceptable parameters value.
        with self.assertRaises(TypeError):
            bt2.ComponentSpec('dwdw', 'compcls', datetime.datetime.now())
# Return a map, msg type -> number of messages of this type.
def _count_msgs_by_type(msgs):
    """Count the items of `msgs` by exact type.

    `msgs` may be any iterable (a list of messages or a message iterator);
    it is walked exactly once.

    Returns a plain `dict` mapping each encountered type to the number of
    items of that type. Types that never occur are absent from the result,
    so looking one up raises `KeyError`.
    """
    counts = {}

    for msg in msgs:
        msg_type = type(msg)
        counts[msg_type] = counts.get(msg_type, 0) + 1

    return counts
class TraceCollectionMessageIteratorTestCase(unittest.TestCase):
    """Exercise the creation and iteration of
    `bt2.TraceCollectionMessageIterator`.
    """

    def test_create_wrong_stream_intersection_mode_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(TypeError):
            bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=23)

    def test_create_wrong_begin_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(TypeError):
            bt2.TraceCollectionMessageIterator(specs, begin='hi')

    def test_create_wrong_end_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        # This test targets the `end` parameter (it previously passed
        # `begin`, duplicating test_create_wrong_begin_type()).
        with self.assertRaises(TypeError):
            bt2.TraceCollectionMessageIterator(specs, end='lel')

    def test_create_no_such_plugin(self):
        specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(ValueError):
            bt2.TraceCollectionMessageIterator(specs)

    def test_create_begin_s(self):
        # `begin` given as a number.
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionMessageIterator(specs, begin=19457.918232)

    def test_create_end_s(self):
        # `end` given as a number.
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionMessageIterator(specs, end=123.12312)

    def test_create_begin_datetime(self):
        # `begin` given as a `datetime` object.
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionMessageIterator(specs, begin=datetime.datetime.now())

    def test_create_end_datetime(self):
        # `end` given as a `datetime` object.
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionMessageIterator(specs, end=datetime.datetime.now())

    def test_iter_no_intersection(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        msg_iter = bt2.TraceCollectionMessageIterator(specs)
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 28)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 8)

    # Same as the above, but we pass a single spec instead of a spec list.
    def test_iter_specs_not_list(self):
        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        msg_iter = bt2.TraceCollectionMessageIterator(spec)
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 28)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 8)

    def test_iter_custom_filter(self):
        # A trimmer filter spec is passed as the second positional
        # argument; fewer event messages are expected than without it.
        src_spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        flt_spec = bt2.ComponentSpec('utils', 'trimmer', {'end': '13515309.000000075'})
        msg_iter = bt2.TraceCollectionMessageIterator(src_spec, flt_spec)
        hist = _count_msgs_by_type(msg_iter)
        self.assertEqual(hist[bt2._EventMessage], 5)

    def test_iter_intersection(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        msg_iter = bt2.TraceCollectionMessageIterator(
            specs, stream_intersection_mode=True
        )
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 15)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 3)

    def test_iter_intersection_no_inputs_param(self):
        # The source spec has no `inputs` parameter: creating the iterator
        # in stream intersection mode is expected to fail.
        specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})]

        with self.assertRaises(ValueError):
            bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True)

    def test_iter_no_intersection_two_traces(self):
        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)

        # The same spec twice: the expected totals below are exactly twice
        # those of test_iter_no_intersection().
        specs = [spec, spec]
        msg_iter = bt2.TraceCollectionMessageIterator(specs)
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 56)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 16)

    def test_iter_no_intersection_begin(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        msg_iter = bt2.TraceCollectionMessageIterator(specs, begin=13515309.000000023)
        hist = _count_msgs_by_type(msg_iter)
        self.assertEqual(hist[bt2._EventMessage], 6)

    def test_iter_no_intersection_end(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        msg_iter = bt2.TraceCollectionMessageIterator(specs, end=13515309.000000075)
        hist = _count_msgs_by_type(msg_iter)
        self.assertEqual(hist[bt2._EventMessage], 5)

    def test_iter_auto_source_component_spec(self):
        specs = [bt2.AutoSourceComponentSpec(_3EVENTS_INTERSECT_TRACE_PATH)]
        msg_iter = bt2.TraceCollectionMessageIterator(specs)
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 28)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 8)

    def test_iter_auto_source_component_spec_list_of_strings(self):
        # A list of plain path strings instead of spec objects.
        msg_iter = bt2.TraceCollectionMessageIterator([_3EVENTS_INTERSECT_TRACE_PATH])
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 28)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 8)

    def test_iter_auto_source_component_spec_string(self):
        # A single plain path string instead of a spec object.
        msg_iter = bt2.TraceCollectionMessageIterator(_3EVENTS_INTERSECT_TRACE_PATH)
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 28)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 8)

    def test_iter_mixed_inputs(self):
        # A plain string, an automatic source spec and an explicit
        # component spec, all in the same list.
        msg_iter = bt2.TraceCollectionMessageIterator(
            [
                _3EVENTS_INTERSECT_TRACE_PATH,
                bt2.AutoSourceComponentSpec(_SEQUENCE_TRACE_PATH),
                bt2.ComponentSpec('ctf', 'fs', _NOINTERSECT_TRACE_PATH),
            ]
        )
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 76)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 24)
class _TestAutoDiscoverSourceComponentSpecs(unittest.TestCase):
    """Base test case which extends `BABELTRACE_PLUGIN_PATH` for each test.

    Subclasses must define a `_plugin_path` class attribute: the directory
    to append to the `BABELTRACE_PLUGIN_PATH` environment variable while a
    test runs.
    """

    def setUp(self):
        # Remember the current value (`None` when the variable is unset) so
        # that tearDown() can put the environment back exactly as it was.
        saved = os.environ.get('BABELTRACE_PLUGIN_PATH')
        self._saved_babeltrace_plugin_path = saved

        if saved is None:
            os.environ['BABELTRACE_PLUGIN_PATH'] = self._plugin_path
        else:
            os.environ['BABELTRACE_PLUGIN_PATH'] = (
                saved + os.pathsep + self._plugin_path
            )

    def tearDown(self):
        saved = self._saved_babeltrace_plugin_path

        if saved is None:
            # The variable did not exist before setUp(): remove it again.
            os.environ.pop('BABELTRACE_PLUGIN_PATH', None)
        else:
            os.environ['BABELTRACE_PLUGIN_PATH'] = saved
class TestAutoDiscoverSourceComponentSpecsGrouping(
    _TestAutoDiscoverSourceComponentSpecs
):
    """Check which streams result from a mix of automatic source specs.

    The test plugins are looked up in
    `_AUTO_SOURCE_DISCOVERY_GROUPING_PATH`.
    """

    _plugin_path = _AUTO_SOURCE_DISCOVERY_GROUPING_PATH

    def test_grouping(self):
        specs = [
            bt2.AutoSourceComponentSpec('ABCDE'),
            bt2.AutoSourceComponentSpec(_AUTO_SOURCE_DISCOVERY_GROUPING_PATH),
            bt2.AutoSourceComponentSpec('does-not-exist'),
        ]
        it = bt2.TraceCollectionMessageIterator(specs)
        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage]

        self.assertEqual(len(msgs), 8)

        # Compare all the names at once so that a failure reports every
        # mismatching stream, not only the first one.
        self.assertEqual(
            [msg.stream.name for msg in msgs],
            [
                'TestSourceABCDE: ABCDE',
                'TestSourceExt: aaa1, aaa2, aaa3',
                'TestSourceExt: bbb1, bbb2',
                'TestSourceExt: ccc1',
                'TestSourceExt: ccc2',
                'TestSourceExt: ccc3',
                'TestSourceExt: ccc4',
                'TestSourceSomeDir: some-dir',
            ],
        )
class TestAutoDiscoverSourceComponentSpecsParamsObjLogLevel(
    _TestAutoDiscoverSourceComponentSpecs
):
    """Check how the `params`, `obj` and `logging_level` arguments of
    `bt2.AutoSourceComponentSpec` reach the resulting source components.

    Every test expects exactly two stream beginning messages, whose stream
    names start with `TestSourceA: ` and `TestSourceB: ` followed by the
    value under test. The `what` parameter selects which value is reported
    (`test-params`, `python-obj` or `log-level`).
    """

    _plugin_path = _AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH

    # Input directories given to the automatic source specs.
    _dir_a = os.path.join(_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH, 'dir-a')
    _dir_b = os.path.join(_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH, 'dir-b')
    _dir_ab = os.path.join(_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH, 'dir-ab')

    def _collect_stream_beginning_msgs(self, specs):
        """Iterate over `specs` and return the stream beginning messages.

        Asserts that there are exactly two such messages.
        """
        it = bt2.TraceCollectionMessageIterator(specs)
        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage]

        self.assertEqual(len(msgs), 2)

        return msgs

    def _test_two_comps_from_one_spec(self, params, obj=None, logging_level=None):
        """Use a single spec for `dir-ab`; return the two stream beginning
        messages.
        """
        specs = [
            bt2.AutoSourceComponentSpec(
                self._dir_ab, params=params, obj=obj, logging_level=logging_level
            )
        ]

        return self._collect_stream_beginning_msgs(specs)

    def test_params_two_comps_from_one_spec(self):
        msgs = self._test_two_comps_from_one_spec(
            params={'test-allo': 'madame', 'what': 'test-params'}
        )

        self.assertEqual(msgs[0].stream.name, "TestSourceA: ('test-allo', 'madame')")
        self.assertEqual(msgs[1].stream.name, "TestSourceB: ('test-allo', 'madame')")

    def test_obj_two_comps_from_one_spec(self):
        msgs = self._test_two_comps_from_one_spec(
            params={'what': 'python-obj'}, obj='deore'
        )

        self.assertEqual(msgs[0].stream.name, "TestSourceA: deore")
        self.assertEqual(msgs[1].stream.name, "TestSourceB: deore")

    def test_log_level_two_comps_from_one_spec(self):
        msgs = self._test_two_comps_from_one_spec(
            params={'what': 'log-level'}, logging_level=bt2.LoggingLevel.DEBUG
        )

        self.assertEqual(
            msgs[0].stream.name, "TestSourceA: {}".format(bt2.LoggingLevel.DEBUG)
        )
        self.assertEqual(
            msgs[1].stream.name, "TestSourceB: {}".format(bt2.LoggingLevel.DEBUG)
        )

    def _test_two_comps_from_two_specs(
        self,
        params_a,
        params_b,
        obj_a=None,
        obj_b=None,
        logging_level_a=None,
        logging_level_b=None,
    ):
        """Use one spec for `dir-a` and one for `dir-b`; return the two
        stream beginning messages.
        """
        specs = [
            bt2.AutoSourceComponentSpec(
                self._dir_a, params=params_a, obj=obj_a, logging_level=logging_level_a
            ),
            bt2.AutoSourceComponentSpec(
                self._dir_b, params=params_b, obj=obj_b, logging_level=logging_level_b
            ),
        ]

        return self._collect_stream_beginning_msgs(specs)

    def test_params_two_comps_from_two_specs(self):
        msgs = self._test_two_comps_from_two_specs(
            params_a={'test-allo': 'madame', 'what': 'test-params'},
            params_b={'test-bonjour': 'monsieur', 'what': 'test-params'},
        )

        self.assertEqual(msgs[0].stream.name, "TestSourceA: ('test-allo', 'madame')")
        self.assertEqual(
            msgs[1].stream.name, "TestSourceB: ('test-bonjour', 'monsieur')"
        )

    def test_obj_two_comps_from_two_specs(self):
        msgs = self._test_two_comps_from_two_specs(
            params_a={'what': 'python-obj'},
            params_b={'what': 'python-obj'},
            obj_a='deore',
            obj_b='alivio',
        )

        self.assertEqual(msgs[0].stream.name, "TestSourceA: deore")
        self.assertEqual(msgs[1].stream.name, "TestSourceB: alivio")

    def test_log_level_two_comps_from_two_specs(self):
        msgs = self._test_two_comps_from_two_specs(
            params_a={'what': 'log-level'},
            params_b={'what': 'log-level'},
            logging_level_a=bt2.LoggingLevel.DEBUG,
            logging_level_b=bt2.LoggingLevel.TRACE,
        )

        self.assertEqual(
            msgs[0].stream.name, "TestSourceA: {}".format(bt2.LoggingLevel.DEBUG)
        )
        self.assertEqual(
            msgs[1].stream.name, "TestSourceB: {}".format(bt2.LoggingLevel.TRACE)
        )

    def _test_one_comp_from_one_spec_one_comp_from_both_1(
        self,
        params_a,
        params_ab,
        obj_a=None,
        obj_ab=None,
        logging_level_a=None,
        logging_level_ab=None,
    ):
        """Use a spec for `dir-a` followed by a spec for `dir-ab`; return
        the two stream beginning messages.
        """
        specs = [
            bt2.AutoSourceComponentSpec(
                self._dir_a, params=params_a, obj=obj_a, logging_level=logging_level_a
            ),
            bt2.AutoSourceComponentSpec(
                self._dir_ab,
                params=params_ab,
                obj=obj_ab,
                logging_level=logging_level_ab,
            ),
        ]

        return self._collect_stream_beginning_msgs(specs)

    def test_params_one_comp_from_one_spec_one_comp_from_both_1(self):
        msgs = self._test_one_comp_from_one_spec_one_comp_from_both_1(
            params_a={'test-allo': 'madame', 'what': 'test-params'},
            params_ab={'test-bonjour': 'monsieur', 'what': 'test-params'},
        )

        # Source A is expected to report the parameters of both specs.
        self.assertEqual(
            msgs[0].stream.name,
            "TestSourceA: ('test-allo', 'madame'), ('test-bonjour', 'monsieur')",
        )
        self.assertEqual(
            msgs[1].stream.name, "TestSourceB: ('test-bonjour', 'monsieur')"
        )

    def test_obj_one_comp_from_one_spec_one_comp_from_both_1(self):
        msgs = self._test_one_comp_from_one_spec_one_comp_from_both_1(
            params_a={'what': 'python-obj'},
            params_ab={'what': 'python-obj'},
            obj_a='deore',
            obj_ab='alivio',
        )

        # The object of the later (`dir-ab`) spec is expected for both.
        self.assertEqual(msgs[0].stream.name, "TestSourceA: alivio")
        self.assertEqual(msgs[1].stream.name, "TestSourceB: alivio")

    def test_log_level_one_comp_from_one_spec_one_comp_from_both_1(self):
        msgs = self._test_one_comp_from_one_spec_one_comp_from_both_1(
            params_a={'what': 'log-level'},
            params_ab={'what': 'log-level'},
            logging_level_a=bt2.LoggingLevel.DEBUG,
            logging_level_ab=bt2.LoggingLevel.TRACE,
        )

        # The logging level of the later (`dir-ab`) spec is expected for
        # both.
        self.assertEqual(
            msgs[0].stream.name, "TestSourceA: {}".format(bt2.LoggingLevel.TRACE)
        )
        self.assertEqual(
            msgs[1].stream.name, "TestSourceB: {}".format(bt2.LoggingLevel.TRACE)
        )

    def _test_one_comp_from_one_spec_one_comp_from_both_2(
        self,
        params_ab,
        params_a,
        obj_ab=None,
        obj_a=None,
        logging_level_ab=None,
        logging_level_a=None,
    ):
        """Use a spec for `dir-ab` followed by a spec for `dir-a`; return
        the two stream beginning messages.
        """
        specs = [
            bt2.AutoSourceComponentSpec(
                self._dir_ab,
                params=params_ab,
                obj=obj_ab,
                logging_level=logging_level_ab,
            ),
            bt2.AutoSourceComponentSpec(
                self._dir_a, params=params_a, obj=obj_a, logging_level=logging_level_a
            ),
        ]

        return self._collect_stream_beginning_msgs(specs)

    def test_params_one_comp_from_one_spec_one_comp_from_both_2(self):
        msgs = self._test_one_comp_from_one_spec_one_comp_from_both_2(
            params_ab={
                'test-bonjour': 'madame',
                'test-salut': 'les amis',
                'what': 'test-params',
            },
            params_a={'test-bonjour': 'monsieur', 'what': 'test-params'},
        )

        # For source A, the `test-bonjour` value of the later (`dir-a`)
        # spec is expected, next to `test-salut` from the `dir-ab` spec.
        self.assertEqual(
            msgs[0].stream.name,
            "TestSourceA: ('test-bonjour', 'monsieur'), ('test-salut', 'les amis')",
        )
        self.assertEqual(
            msgs[1].stream.name,
            "TestSourceB: ('test-bonjour', 'madame'), ('test-salut', 'les amis')",
        )

    def test_obj_one_comp_from_one_spec_one_comp_from_both_2(self):
        msgs = self._test_one_comp_from_one_spec_one_comp_from_both_2(
            params_ab={'what': 'python-obj'},
            params_a={'what': 'python-obj'},
            obj_ab='deore',
            obj_a='alivio',
        )

        self.assertEqual(msgs[0].stream.name, "TestSourceA: alivio")
        self.assertEqual(msgs[1].stream.name, "TestSourceB: deore")

    def test_log_level_one_comp_from_one_spec_one_comp_from_both_2(self):
        msgs = self._test_one_comp_from_one_spec_one_comp_from_both_2(
            params_ab={'what': 'log-level'},
            params_a={'what': 'log-level'},
            logging_level_ab=bt2.LoggingLevel.DEBUG,
            logging_level_a=bt2.LoggingLevel.TRACE,
        )

        self.assertEqual(
            msgs[0].stream.name, "TestSourceA: {}".format(bt2.LoggingLevel.TRACE)
        )
        self.assertEqual(
            msgs[1].stream.name, "TestSourceB: {}".format(bt2.LoggingLevel.DEBUG)
        )

    def test_obj_override_with_none(self):
        # An explicit `obj=None` in the later spec is expected to replace
        # the object of the earlier spec for source A.
        specs = [
            bt2.AutoSourceComponentSpec(
                self._dir_ab, params={'what': 'python-obj'}, obj='deore'
            ),
            bt2.AutoSourceComponentSpec(
                self._dir_a, params={'what': 'python-obj'}, obj=None
            ),
        ]

        msgs = self._collect_stream_beginning_msgs(specs)

        self.assertEqual(msgs[0].stream.name, "TestSourceA: None")
        self.assertEqual(msgs[1].stream.name, "TestSourceB: deore")

    def test_obj_no_override_with_no_obj(self):
        # The later spec has no `obj` argument at all: source A is expected
        # to keep the object of the earlier spec.
        specs = [
            bt2.AutoSourceComponentSpec(
                self._dir_ab, params={'what': 'python-obj'}, obj='deore'
            ),
            bt2.AutoSourceComponentSpec(self._dir_a, params={'what': 'python-obj'}),
        ]

        msgs = self._collect_stream_beginning_msgs(specs)

        self.assertEqual(msgs[0].stream.name, "TestSourceA: deore")
        self.assertEqual(msgs[1].stream.name, "TestSourceB: deore")
535 if __name__
== '__main__':