# Root directory of the CTF test traces, read from the TEST_CTF_TRACES_PATH
# environment variable; the subscript lookup raises KeyError when it is unset.
_TEST_CTF_TRACES_PATH = os.environ['TEST_CTF_TRACES_PATH']
11 _3EVENTS_INTERSECT_TRACE_PATH
= os
.path
.join(_TEST_CTF_TRACES_PATH
,
class SourceComponentSpecTestCase(unittest.TestCase):
    """Construction tests for ``bt2.SourceComponentSpec``."""

    def test_create_good_no_params(self):
        # Smoke test: construction without parameters must not raise.
        source_spec = bt2.SourceComponentSpec('plugin', 'compcls')

    def test_create_good_with_params(self):
        # Smoke test: construction with a dict of parameters must not raise.
        source_spec = bt2.SourceComponentSpec(
            'plugin', 'compcls', {'salut': 23}
        )

    def test_create_good_with_path_params(self):
        # A plain string given as the parameters is expected to be readable
        # back through the 'path' key of the spec's params.
        source_spec = bt2.SourceComponentSpec('plugin', 'compcls', 'a path')
        self.assertEqual(source_spec.params['path'], 'a path')

    def test_create_wrong_plugin_name_type(self):
        # An integer plugin name is expected to be rejected with TypeError.
        with self.assertRaises(TypeError):
            bt2.SourceComponentSpec(23, 'compcls')

    def test_create_wrong_component_class_name_type(self):
        # An integer component class name is expected to be rejected with
        # TypeError.
        with self.assertRaises(TypeError):
            bt2.SourceComponentSpec('plugin', 190)

    def test_create_wrong_params_type(self):
        # A datetime object given as the parameters is expected to be
        # rejected with TypeError.
        with self.assertRaises(TypeError):
            bt2.SourceComponentSpec(
                'dwdw', 'compcls', datetime.datetime.now()
            )
class TraceCollectionNotificationIteratorTestCase(unittest.TestCase):
    """Tests for ``bt2.TraceCollectionNotificationIterator``.

    Unless stated otherwise, each test feeds the iterator a single
    ``ctf``/``fs`` source spec built from ``_3EVENTS_INTERSECT_TRACE_PATH``.
    """

    def test_create_wrong_stream_intersection_mode_type(self):
        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        # An integer stream_intersection_mode is expected to raise TypeError.
        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(
                specs, stream_intersection_mode=23
            )

    def test_create_wrong_begin_type(self):
        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        # A string `begin` is expected to raise TypeError.
        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(specs, begin='hi')

    def test_create_wrong_end_type(self):
        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        # A string `end` is expected to raise TypeError.  This test used to
        # pass `begin` here, which only duplicated the test above and never
        # exercised the `end` parameter.
        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(specs, end='lel')

    def test_create_no_such_plugin(self):
        # Plugin '77' / component class '101' are not expected to exist, so
        # creating the iterator should raise bt2.Error.
        specs = [bt2.SourceComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(specs)

    def test_create_begin_s(self):
        # Smoke test: a float `begin` must be accepted.
        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, begin=19457.918232)

    def test_create_end_s(self):
        # Smoke test: a float `end` must be accepted.
        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, end=123.12312)

    def test_create_begin_datetime(self):
        # Smoke test: a datetime `begin` must be accepted.
        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(
            specs, begin=datetime.datetime.now()
        )

    def test_create_end_datetime(self):
        # Smoke test: a datetime `end` must be accepted.
        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(
            specs, end=datetime.datetime.now()
        )

    def test_iter_no_intersection(self):
        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(specs)
        self.assertEqual(len(list(notif_iter)), 28)

    def test_iter_no_intersection_subscribe(self):
        # Restricting notification_types to bt2.EventNotification is
        # expected to leave 8 notifications.
        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification]
        )
        self.assertEqual(len(list(notif_iter)), 8)

    def test_iter_intersection(self):
        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, stream_intersection_mode=True
        )
        self.assertEqual(len(list(notif_iter)), 15)

    def test_iter_intersection_subscribe(self):
        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs,
            stream_intersection_mode=True,
            notification_types=[bt2.EventNotification],
        )
        self.assertEqual(len(list(notif_iter)), 3)

    def test_iter_intersection_no_path_param(self):
        # The spec's params hold only 'read-from-stdin'; going by the test
        # name, the absent path parameter is what should make stream
        # intersection mode fail with bt2.Error.
        specs = [bt2.SourceComponentSpec('text', 'dmesg', {'read-from-stdin': True})]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(
                specs,
                stream_intersection_mode=True,
                notification_types=[bt2.EventNotification],
            )

    def test_iter_no_intersection_two_traces(self):
        spec = bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        # Use the same spec twice: `specs` was previously never defined in
        # this test (NameError), and the expected count of 56 is twice the
        # 28 notifications of the single-trace test.
        specs = [spec, spec]
        notif_iter = bt2.TraceCollectionNotificationIterator(specs)
        self.assertEqual(len(list(notif_iter)), 56)

    def test_iter_no_intersection_begin(self):
        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs,
            notification_types=[bt2.EventNotification],
            begin=13515309.000000023,
        )
        self.assertEqual(len(list(notif_iter)), 6)

    def test_iter_no_intersection_end(self):
        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs,
            notification_types=[bt2.EventNotification],
            end=13515309.000000075,
        )
        self.assertEqual(len(list(notif_iter)), 5)