# Base path for the test traces, taken from the TEST_CTF_TRACES_PATH
# environment variable.  The subscript lookup raises KeyError at import time
# when the variable is not set.
_TEST_CTF_TRACES_PATH = os.environ['TEST_CTF_TRACES_PATH']
11 _3EVENTS_INTERSECT_TRACE_PATH
= os
.path
.join(_TEST_CTF_TRACES_PATH
,
@unittest.skip("this is broken")
class ComponentSpecTestCase(unittest.TestCase):
    """Construction checks for ``bt2.ComponentSpec``.

    The whole case is skipped through ``unittest.skip`` with the reason
    "this is broken".
    """

    def test_create_good_no_params(self):
        """A plugin name and a component class name are enough."""
        bt2.ComponentSpec('plugin', 'compcls')

    def test_create_good_with_params(self):
        """A dict is accepted as the third (parameters) argument."""
        bt2.ComponentSpec('plugin', 'compcls', {'salut': 23})

    def test_create_good_with_path_params(self):
        """A string third argument is readable back as ``params['path']``."""
        path_spec = bt2.ComponentSpec('plugin', 'compcls', 'a path')
        self.assertEqual(path_spec.params['path'], 'a path')

    def test_create_wrong_plugin_name_type(self):
        """An integer plugin name must raise TypeError."""
        self.assertRaises(TypeError, bt2.ComponentSpec, 23, 'compcls')

    def test_create_wrong_component_class_name_type(self):
        """An integer component class name must raise TypeError."""
        self.assertRaises(TypeError, bt2.ComponentSpec, 'plugin', 190)

    def test_create_wrong_params_type(self):
        """A ``datetime`` object as parameters must raise TypeError."""
        bad_params = datetime.datetime.now()
        self.assertRaises(TypeError, bt2.ComponentSpec,
                          'dwdw', 'compcls', bad_params)
@unittest.skip("this is broken")
class TraceCollectionNotificationIteratorTestCase(unittest.TestCase):
    """Tests for ``bt2.TraceCollectionNotificationIterator``.

    Most tests build their source specification from the ``ctf``/``fs``
    component class pointed at ``_3EVENTS_INTERSECT_TRACE_PATH``.  The
    whole case is skipped through ``unittest.skip`` with the reason
    "this is broken".
    """

    def test_create_wrong_stream_intersection_mode_type(self):
        """An integer ``stream_intersection_mode`` must raise TypeError."""
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(
                specs, stream_intersection_mode=23)

    def test_create_wrong_begin_type(self):
        """A string ``begin`` must raise TypeError."""
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(specs, begin='hi')

    def test_create_wrong_end_type(self):
        """A string ``end`` must raise TypeError."""
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(TypeError):
            # This must exercise ``end``; passing ``begin`` here would only
            # repeat test_create_wrong_begin_type.
            bt2.TraceCollectionNotificationIterator(specs, end='lel')

    def test_create_no_such_plugin(self):
        """The plugin/component class pair '77'/'101' must raise bt2.Error."""
        specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(specs)

    def test_create_begin_s(self):
        """A float ``begin`` is accepted at construction."""
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, begin=19457.918232)

    def test_create_end_s(self):
        """A float ``end`` is accepted at construction."""
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, end=123.12312)

    def test_create_begin_datetime(self):
        """A ``datetime.datetime`` ``begin`` is accepted at construction."""
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(
            specs, begin=datetime.datetime.now())

    def test_create_end_datetime(self):
        """A ``datetime.datetime`` ``end`` is accepted at construction."""
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(
            specs, end=datetime.datetime.now())

    def test_iter_no_intersection(self):
        """Iterating with default options must yield 28 notifications."""
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(specs)
        self.assertEqual(len(list(notif_iter)), 28)

    def test_iter_no_intersection_subscribe(self):
        """Subscribing to event notifications only must yield 8 of them."""
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 8)

    def test_iter_specs_not_list(self):
        """A single spec, not wrapped in a list, must be accepted."""
        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        notif_iter = bt2.TraceCollectionNotificationIterator(
            spec, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 8)

    def test_iter_custom_filter(self):
        """A ``utils``/``trimmer`` filter spec with an ``end`` parameter must
        leave 5 event notifications.
        """
        src_spec = bt2.ComponentSpec('ctf', 'fs',
                                     _3EVENTS_INTERSECT_TRACE_PATH)
        flt_spec = bt2.ComponentSpec('utils', 'trimmer', {
            'end': 13515309000000075,
        })
        notif_iter = bt2.TraceCollectionNotificationIterator(
            src_spec, flt_spec, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 5)

    def test_iter_intersection(self):
        """Stream intersection mode must yield 15 notifications."""
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, stream_intersection_mode=True)
        self.assertEqual(len(list(notif_iter)), 15)

    def test_iter_intersection_subscribe(self):
        """Stream intersection mode must yield 3 event notifications."""
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, stream_intersection_mode=True,
            notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 3)

    def test_iter_intersection_no_path_param(self):
        """Stream intersection mode with a spec that has no ``path``
        parameter must raise bt2.Error.
        """
        specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(
                specs, stream_intersection_mode=True,
                notification_types=[bt2.EventNotification])

    def test_iter_no_intersection_two_traces(self):
        """Listing the same source spec twice must yield 56 notifications."""
        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        # Same source twice: the expected count is twice the 28 asserted in
        # test_iter_no_intersection.
        specs = [spec, spec]
        notif_iter = bt2.TraceCollectionNotificationIterator(specs)
        self.assertEqual(len(list(notif_iter)), 56)

    def test_iter_no_intersection_begin(self):
        """A float ``begin`` bound must leave 6 event notifications."""
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification],
            begin=13515309.000000023)
        self.assertEqual(len(list(notif_iter)), 6)

    def test_iter_no_intersection_end(self):
        """A float ``end`` bound must leave 5 event notifications."""
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification],
            end=13515309.000000075)
        self.assertEqual(len(list(notif_iter)), 5)