#
# Copyright (C) 2019 EfficiOS Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; only version 2
# of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
22 from utils
import TestOutputPortMessageIterator
23 from bt2
import clock_snapshot
as bt2_clock_snapshot
24 from bt2
import event
as bt2_event
25 from bt2
import event_class
as bt2_event_class
26 from bt2
import field
as bt2_field
27 from bt2
import packet
as bt2_packet
28 from bt2
import stream
as bt2_stream
29 from bt2
import stream_class
as bt2_stream_class
30 from bt2
import trace
as bt2_trace
31 from bt2
import trace_class
as bt2_trace_class
34 class AllMessagesTestCase(unittest
.TestCase
):
36 class MyIter(bt2
._UserMessageIterator
):
37 def __init__(self
, config
, self_port_output
):
39 self
._with
_stream
_msgs
_clock
_snapshots
= self_port_output
.user_data
.get(
40 'with_stream_msgs_clock_snapshots', False
44 if test_obj
._clock
_class
:
46 if self
._with
_stream
_msgs
_clock
_snapshots
:
47 msg
= self
._create
_stream
_beginning
_message
(
48 test_obj
._stream
, default_clock_snapshot
=self
._at
51 msg
= self
._create
_stream
_beginning
_message
(
54 test_obj
.assertIs(type(msg
), bt2
._StreamBeginningMessage
)
56 msg
= self
._create
_packet
_beginning
_message
(
57 test_obj
._packet
, self
._at
59 test_obj
.assertIs(type(msg
), bt2
._PacketBeginningMessage
)
61 msg
= self
._create
_event
_message
(
62 test_obj
._event
_class
, test_obj
._packet
, self
._at
64 test_obj
.assertIs(type(msg
), bt2
._EventMessage
)
66 msg
= self
._create
_message
_iterator
_inactivity
_message
(
67 test_obj
._clock
_class
, self
._at
70 msg
= self
._create
_discarded
_events
_message
(
71 test_obj
._stream
, 890, self
._at
, self
._at
73 test_obj
.assertIs(type(msg
), bt2
._DiscardedEventsMessage
)
75 msg
= self
._create
_packet
_end
_message
(
76 test_obj
._packet
, self
._at
78 test_obj
.assertIs(type(msg
), bt2
._PacketEndMessage
)
80 msg
= self
._create
_discarded
_packets
_message
(
81 test_obj
._stream
, 678, self
._at
, self
._at
83 test_obj
.assertIs(type(msg
), bt2
._DiscardedPacketsMessage
)
85 if self
._with
_stream
_msgs
_clock
_snapshots
:
86 msg
= self
._create
_stream
_end
_message
(
87 test_obj
._stream
, default_clock_snapshot
=self
._at
90 msg
= self
._create
_stream
_end
_message
(test_obj
._stream
)
91 test_obj
.assertIs(type(msg
), bt2
._StreamEndMessage
)
96 msg
= self
._create
_stream
_beginning
_message
(test_obj
._stream
)
98 msg
= self
._create
_packet
_beginning
_message
(test_obj
._packet
)
100 msg
= self
._create
_event
_message
(
101 test_obj
._event
_class
, test_obj
._packet
104 msg
= self
._create
_discarded
_events
_message
(
105 test_obj
._stream
, 890
108 msg
= self
._create
_packet
_end
_message
(test_obj
._packet
)
110 msg
= self
._create
_discarded
_packets
_message
(
111 test_obj
._stream
, 678
114 msg
= self
._create
_stream
_end
_message
(test_obj
._stream
)
121 class MySrc(bt2
._UserSourceComponent
, message_iterator_class
=MyIter
):
122 def __init__(self
, config
, params
, obj
):
123 self
._add
_output
_port
('out', params
)
125 with_cc
= bool(params
['with_cc'])
126 tc
= self
._create
_trace
_class
()
128 cc
= self
._create
_clock
_class
()
132 sc
= tc
.create_stream_class(
133 default_clock_class
=cc
,
134 supports_packets
=True,
135 packets_have_beginning_default_clock_snapshot
=with_cc
,
136 packets_have_end_default_clock_snapshot
=with_cc
,
137 supports_discarded_events
=True,
138 discarded_events_have_default_clock_snapshots
=with_cc
,
139 supports_discarded_packets
=True,
140 discarded_packets_have_default_clock_snapshots
=with_cc
,
143 # Create payload field class
144 my_int_fc
= tc
.create_signed_integer_field_class(32)
145 payload_fc
= tc
.create_structure_field_class()
146 payload_fc
+= [('my_int', my_int_fc
)]
148 # Create specific context field class
149 my_int_fc
= tc
.create_signed_integer_field_class(32)
150 specific_fc
= tc
.create_structure_field_class()
151 specific_fc
+= [('my_int', my_int_fc
)]
153 ec
= sc
.create_event_class(
155 payload_field_class
=payload_fc
,
156 specific_context_field_class
=specific_fc
,
160 stream
= trace
.create_stream(sc
)
161 packet
= stream
.create_packet()
163 test_obj
._trace
= trace
164 test_obj
._stream
= stream
165 test_obj
._packet
= packet
166 test_obj
._event
_class
= ec
167 test_obj
._clock
_class
= cc
170 self
._graph
= bt2
.Graph()
174 def test_all_msg_with_cc(self
):
175 params
= {'with_cc': True}
176 self
._src
_comp
= self
._graph
.add_component(self
._src
, 'my_source', params
)
177 self
._msg
_iter
= TestOutputPortMessageIterator(
178 self
._graph
, self
._src
_comp
.output_ports
['out']
181 for i
, msg
in enumerate(self
._msg
_iter
):
183 self
.assertIs(type(msg
), bt2
._StreamBeginningMessageConst
)
184 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
185 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
186 self
.assertIsInstance(
187 msg
.default_clock_snapshot
, bt2
._UnknownClockSnapshot
190 self
.assertIs(type(msg
), bt2
._PacketBeginningMessageConst
)
191 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
193 type(msg
.default_clock_snapshot
),
194 bt2_clock_snapshot
._ClockSnapshotConst
,
196 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
197 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
199 self
.assertIs(type(msg
), bt2
._EventMessageConst
)
200 self
.assertIs(type(msg
.event
), bt2_event
._EventConst
)
202 type(msg
.default_clock_snapshot
),
203 bt2_clock_snapshot
._ClockSnapshotConst
,
206 type(msg
.event
.payload_field
), bt2_field
._StructureFieldConst
209 type(msg
.event
.payload_field
['my_int']),
210 bt2_field
._SignedIntegerFieldConst
,
213 self
.assertEqual(msg
.event
.cls
.addr
, self
._event
_class
.addr
)
214 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
216 self
.assertIs(type(msg
), bt2
._MessageIteratorInactivityMessageConst
)
218 type(msg
.clock_snapshot
), bt2_clock_snapshot
._ClockSnapshotConst
220 self
.assertEqual(msg
.clock_snapshot
.value
, i
)
222 self
.assertIs(type(msg
), bt2
._DiscardedEventsMessageConst
)
223 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
224 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
226 type(msg
.beginning_default_clock_snapshot
),
227 bt2_clock_snapshot
._ClockSnapshotConst
,
230 type(msg
.end_default_clock_snapshot
),
231 bt2_clock_snapshot
._ClockSnapshotConst
,
234 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
235 self
.assertEqual(msg
.count
, 890)
237 msg
.stream
.cls
.default_clock_class
.addr
, self
._clock
_class
.addr
239 self
.assertEqual(msg
.beginning_default_clock_snapshot
.value
, i
)
240 self
.assertEqual(msg
.end_default_clock_snapshot
.value
, i
)
242 self
.assertIs(type(msg
), bt2
._PacketEndMessageConst
)
243 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
245 type(msg
.default_clock_snapshot
),
246 bt2_clock_snapshot
._ClockSnapshotConst
,
248 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
249 self
.assertEqual(msg
.default_clock_snapshot
.value
, i
)
251 self
.assertIs(type(msg
), bt2
._DiscardedPacketsMessageConst
)
252 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
253 self
.assertIs(type(msg
.stream
.trace
), bt2_trace
._TraceConst
)
255 type(msg
.stream
.trace
.cls
), bt2_trace_class
._TraceClassConst
258 type(msg
.beginning_default_clock_snapshot
),
259 bt2_clock_snapshot
._ClockSnapshotConst
,
262 type(msg
.end_default_clock_snapshot
),
263 bt2_clock_snapshot
._ClockSnapshotConst
,
265 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
266 self
.assertEqual(msg
.count
, 678)
268 msg
.stream
.cls
.default_clock_class
.addr
, self
._clock
_class
.addr
270 self
.assertEqual(msg
.beginning_default_clock_snapshot
.value
, i
)
271 self
.assertEqual(msg
.end_default_clock_snapshot
.value
, i
)
273 self
.assertIs(type(msg
), bt2
._StreamEndMessageConst
)
274 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
275 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
277 type(msg
.default_clock_snapshot
), bt2
._UnknownClockSnapshot
282 def test_all_msg_without_cc(self
):
283 params
= {'with_cc': False}
284 self
._src
_comp
= self
._graph
.add_component(self
._src
, 'my_source', params
)
285 self
._msg
_iter
= TestOutputPortMessageIterator(
286 self
._graph
, self
._src
_comp
.output_ports
['out']
289 for i
, msg
in enumerate(self
._msg
_iter
):
291 self
.assertIsInstance(msg
, bt2
._StreamBeginningMessageConst
)
292 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
293 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
294 with self
.assertRaisesRegex(
295 ValueError, 'stream class has no default clock class'
297 msg
.default_clock_snapshot
299 self
.assertIsInstance(msg
, bt2
._PacketBeginningMessageConst
)
300 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
301 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
303 self
.assertIsInstance(msg
, bt2
._EventMessageConst
)
304 self
.assertIs(type(msg
.event
), bt2_event
._EventConst
)
305 self
.assertIs(type(msg
.event
.cls
), bt2_event_class
._EventClassConst
)
306 self
.assertEqual(msg
.event
.cls
.addr
, self
._event
_class
.addr
)
307 with self
.assertRaisesRegex(
308 ValueError, 'stream class has no default clock class'
310 msg
.default_clock_snapshot
312 self
.assertIsInstance(msg
, bt2
._DiscardedEventsMessageConst
)
313 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
314 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
315 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
316 self
.assertEqual(msg
.count
, 890)
317 self
.assertIsNone(msg
.stream
.cls
.default_clock_class
)
318 with self
.assertRaisesRegex(
320 'such a message has no clock snapshots for this stream class',
322 msg
.beginning_default_clock_snapshot
323 with self
.assertRaisesRegex(
325 'such a message has no clock snapshots for this stream class',
327 msg
.end_default_clock_snapshot
329 self
.assertIsInstance(msg
, bt2
._PacketEndMessageConst
)
330 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
331 self
.assertIs(type(msg
.packet
), bt2_packet
._PacketConst
)
333 self
.assertIsInstance(msg
, bt2
._DiscardedPacketsMessageConst
)
334 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
335 self
.assertIs(type(msg
.stream
.cls
), bt2_stream_class
._StreamClassConst
)
337 type(msg
.stream
.cls
.trace_class
), bt2_trace_class
._TraceClassConst
339 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
340 self
.assertEqual(msg
.count
, 678)
341 self
.assertIsNone(msg
.stream
.cls
.default_clock_class
)
342 with self
.assertRaisesRegex(
344 'such a message has no clock snapshots for this stream class',
346 msg
.beginning_default_clock_snapshot
347 with self
.assertRaisesRegex(
349 'such a message has no clock snapshots for this stream class',
351 msg
.end_default_clock_snapshot
353 self
.assertIsInstance(msg
, bt2
._StreamEndMessageConst
)
354 self
.assertIs(type(msg
.stream
), bt2_stream
._StreamConst
)
355 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
356 with self
.assertRaisesRegex(
357 ValueError, 'stream class has no default clock class'
359 msg
.default_clock_snapshot
# Stream beginning/end messages created with an explicit default clock
# snapshot must expose that snapshot to the consumer.
def test_msg_stream_with_clock_snapshots(self):
    params = {'with_cc': True, 'with_stream_msgs_clock_snapshots': True}

    self._src_comp = self._graph.add_component(self._src, 'my_source', params)
    self._msg_iter = TestOutputPortMessageIterator(
        self._graph, self._src_comp.output_ports['out']
    )
    msgs = list(self._msg_iter)

    # NOTE(review): the call opening each of the two `type(...)` checks
    # below was lost when this file was extracted; `self.assertIs(` is
    # restored by analogy with the single-line `assertIs(type(...), ...)`
    # checks used throughout this file -- confirm against the repository.

    # The first message of the sequence is the stream beginning message.
    msg_stream_beg = msgs[0]
    self.assertIsInstance(msg_stream_beg, bt2._StreamBeginningMessageConst)
    self.assertIs(
        type(msg_stream_beg.default_clock_snapshot),
        bt2_clock_snapshot._ClockSnapshotConst,
    )
    self.assertEqual(msg_stream_beg.default_clock_snapshot.value, 0)

    # The message at index 7 is the stream end message.
    msg_stream_end = msgs[7]
    self.assertIsInstance(msg_stream_end, bt2._StreamEndMessageConst)
    self.assertIs(
        type(msg_stream_end.default_clock_snapshot),
        bt2_clock_snapshot._ClockSnapshotConst,
    )
    self.assertEqual(msg_stream_end.default_clock_snapshot.value, 7)
# The stream of a stream beginning message obtained through
# `utils.get_stream_beginning_message()` must be exactly a
# `bt2_stream._Stream`.
def test_stream_beg_msg(self):
    msg = utils.get_stream_beginning_message()
    self.assertIs(type(msg.stream), bt2_stream._Stream)
# The stream of a stream end message obtained through
# `utils.get_stream_end_message()` must be exactly a `bt2_stream._Stream`.
def test_stream_end_msg(self):
    msg = utils.get_stream_end_message()
    self.assertIs(type(msg.stream), bt2_stream._Stream)
# The packet of a packet beginning message obtained through
# `utils.get_packet_beginning_message()` must be exactly a
# `bt2_packet._Packet`.
def test_packet_beg_msg(self):
    msg = utils.get_packet_beginning_message()
    self.assertIs(type(msg.packet), bt2_packet._Packet)
# The packet of a packet end message obtained through
# `utils.get_packet_end_message()` must be exactly a `bt2_packet._Packet`.
def test_packet_end_msg(self):
    msg = utils.get_packet_end_message()
    self.assertIs(type(msg.packet), bt2_packet._Packet)
# The event of an event message obtained through
# `utils.get_event_message()` must be exactly a `bt2_event._Event`.
def test_event_msg(self):
    msg = utils.get_event_message()
    self.assertIs(type(msg.event), bt2_event._Event)
class CreateDiscardedEventMessageTestCase(unittest.TestCase):
    """Tests for `_create_discarded_events_message()`.

    Every test hands two callbacks to `utils.run_in_message_iterator_next()`:
    `create_stream_class(tc, cc)`, which creates the stream class under
    test, and `msg_iter_next(msg_iter, stream)`, which calls the message
    creation method and returns a value that the test then checks.

    NOTE(review): `utils.run_in_message_iterator_next()` is defined outside
    this file; it presumably runs `msg_iter_next` from a message iterator's
    "next" method and returns that callback's result -- confirm.

    NOTE(review): this class was rebuilt from a lossy extraction in which
    closing delimiters and a few short lines were missing.  Restored lines
    which are not purely syntactic are flagged individually below.
    """

    # Most basic case.
    def test_create(self):
        def create_stream_class(tc, cc):
            return tc.create_stream_class(supports_discarded_events=True)

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_events_message(stream)

        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
        # Broken at the moment.
        # self.assertIs(msg.count, None)

    # With event count.
    def test_create_with_count(self):
        def create_stream_class(tc, cc):
            return tc.create_stream_class(supports_discarded_events=True)

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_events_message(stream, count=242)

        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
        # Broken at the moment.
        # self.assertEqual(msg.count, 242)

    # With clock snapshots.
    def test_create_with_clock_snapshots(self):
        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                default_clock_class=cc,
                supports_discarded_events=True,
                discarded_events_have_default_clock_snapshots=True,
            )

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_events_message(
                stream, beg_clock_snapshot=10, end_clock_snapshot=20
            )

        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertIs(type(msg), bt2._DiscardedEventsMessage)
        # Broken at the moment.
        # self.assertEqual(msg.beginning_default_clock_snapshot, 10);
        # self.assertEqual(msg.end_default_clock_snapshot, 20);

    # Trying to create when the stream does not support discarded events.
    def test_create_unsupported_raises(self):
        def create_stream_class(tc, cc):
            return tc.create_stream_class()

        def msg_iter_next(msg_iter, stream):
            with self.assertRaisesRegex(
                ValueError, 'stream class does not support discarded events'
            ):
                msg_iter._create_discarded_events_message(stream)

            # NOTE(review): this return was lost in extraction; 123 is
            # restored from the `assertEqual(res, 123)` check below.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)

    # Trying to create with clock snapshots when the stream does not support
    # them.
    def test_create_unsupported_clock_snapshots_raises(self):
        def create_stream_class(tc, cc):
            return tc.create_stream_class(supports_discarded_events=True)

        def msg_iter_next(msg_iter, stream):
            # NOTE(review): the expected exception type was lost in
            # extraction; `ValueError` mirrors
            # test_create_unsupported_raises() -- confirm.
            with self.assertRaisesRegex(
                ValueError,
                'discarded events have no default clock snapshots for this stream class',
            ):
                msg_iter._create_discarded_events_message(
                    stream, beg_clock_snapshot=10, end_clock_snapshot=20
                )

            # NOTE(review): this return was lost in extraction; 123 is
            # restored from the `assertEqual(res, 123)` check below.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)

    # Trying to create without clock snapshots when the stream requires them.
    def test_create_missing_clock_snapshots_raises(self):
        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                default_clock_class=cc,
                supports_discarded_events=True,
                discarded_events_have_default_clock_snapshots=True,
            )

        def msg_iter_next(msg_iter, stream):
            # NOTE(review): the expected exception type was lost in
            # extraction; `ValueError` mirrors
            # test_create_unsupported_raises() -- confirm.
            with self.assertRaisesRegex(
                ValueError,
                'discarded events have default clock snapshots for this stream class',
            ):
                msg_iter._create_discarded_events_message(stream)

            # NOTE(review): this return was lost in extraction; 123 is
            # restored from the `assertEqual(res, 123)` check below.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)
class CreateDiscardedPacketMessageTestCase(unittest.TestCase):
    """Tests for `_create_discarded_packets_message()`.

    Every test hands two callbacks to `utils.run_in_message_iterator_next()`:
    `create_stream_class(tc, cc)`, which creates the stream class under
    test, and `msg_iter_next(msg_iter, stream)`, which calls the message
    creation method and returns a value that the test then checks.

    NOTE(review): `utils.run_in_message_iterator_next()` is defined outside
    this file; it presumably runs `msg_iter_next` from a message iterator's
    "next" method and returns that callback's result -- confirm.

    NOTE(review): this class was rebuilt from a lossy extraction in which
    closing delimiters and a few short lines were missing.  Restored lines
    which are not purely syntactic are flagged individually below.
    """

    # Most basic case.
    def test_create(self):
        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                supports_packets=True, supports_discarded_packets=True
            )

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_packets_message(stream)

        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
        self.assertIs(msg.count, None)

    # With packet count.
    def test_create_with_count(self):
        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                supports_packets=True, supports_discarded_packets=True
            )

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_packets_message(stream, count=242)

        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
        self.assertEqual(msg.count, 242)

    # With clock snapshots.
    def test_create_with_clock_snapshots(self):
        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                default_clock_class=cc,
                supports_packets=True,
                supports_discarded_packets=True,
                discarded_packets_have_default_clock_snapshots=True,
            )

        def msg_iter_next(msg_iter, stream):
            return msg_iter._create_discarded_packets_message(
                stream, beg_clock_snapshot=10, end_clock_snapshot=20
            )

        msg = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertIs(type(msg), bt2._DiscardedPacketsMessage)
        self.assertEqual(msg.beginning_default_clock_snapshot, 10)
        self.assertEqual(msg.end_default_clock_snapshot, 20)

    # Trying to create when the stream does not support discarded packets.
    def test_create_unsupported_raises(self):
        def create_stream_class(tc, cc):
            return tc.create_stream_class(supports_packets=True)

        def msg_iter_next(msg_iter, stream):
            with self.assertRaisesRegex(
                ValueError, 'stream class does not support discarded packets'
            ):
                msg_iter._create_discarded_packets_message(stream)

            # NOTE(review): this return was lost in extraction; 123 is
            # restored from the `assertEqual(res, 123)` check below.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)

    # Trying to create with clock snapshots when the stream does not support
    # them.
    def test_create_unsupported_clock_snapshots_raises(self):
        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                supports_packets=True, supports_discarded_packets=True
            )

        def msg_iter_next(msg_iter, stream):
            # NOTE(review): the expected exception type was lost in
            # extraction; `ValueError` mirrors
            # test_create_unsupported_raises() -- confirm.
            with self.assertRaisesRegex(
                ValueError,
                'discarded packets have no default clock snapshots for this stream class',
            ):
                msg_iter._create_discarded_packets_message(
                    stream, beg_clock_snapshot=10, end_clock_snapshot=20
                )

            # NOTE(review): this return was lost in extraction; 123 is
            # restored from the `assertEqual(res, 123)` check below.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)

    # Trying to create without clock snapshots when the stream requires them.
    def test_create_missing_clock_snapshots_raises(self):
        def create_stream_class(tc, cc):
            return tc.create_stream_class(
                default_clock_class=cc,
                supports_packets=True,
                supports_discarded_packets=True,
                discarded_packets_have_default_clock_snapshots=True,
            )

        def msg_iter_next(msg_iter, stream):
            # NOTE(review): the expected exception type was lost in
            # extraction; `ValueError` mirrors
            # test_create_unsupported_raises() -- confirm.
            with self.assertRaisesRegex(
                ValueError,
                'discarded packets have default clock snapshots for this stream class',
            ):
                msg_iter._create_discarded_packets_message(stream)

            # NOTE(review): this return was lost in extraction; 123 is
            # restored from the `assertEqual(res, 123)` check below.
            return 123

        res = utils.run_in_message_iterator_next(create_stream_class, msg_iter_next)
        self.assertEqual(res, 123)
624 if __name__
== '__main__':