self.assertTrue(src_iter_initialized)
self.assertTrue(flt_iter_initialized)
+ # Test that creating a message iterator from a sink component on a
+ # non-connected inport port raises.
+ def test_create_from_sink_component_unconnected_port_raises(self):
+ class MySink(bt2._UserSinkComponent):
+ def __init__(comp_self, config, params, obj):
+ comp_self._input_port = comp_self._add_input_port('in')
+
+ def _user_graph_is_configured(comp_self):
+ with self.assertRaisesRegex(ValueError, 'input port is not connected'):
+ comp_self._create_message_iterator(comp_self._input_port)
+
+ nonlocal seen
+ seen = True
+
+ def _user_consume(self):
+ raise bt2.Stop
+
+ seen = False
+ graph = bt2.Graph()
+ graph.add_component(MySink, 'snk')
+ graph.run()
+ self.assertTrue(seen)
+
+ # Test that creating a message iterator from a message iteartor on a
+ # non-connected inport port raises.
+ def test_create_from_message_iterator_unconnected_port_raises(self):
+ class MyFilterIter(bt2._UserMessageIterator):
+ def __init__(iter_self, config, port):
+ input_port = iter_self._component._input_ports['in']
+
+ with self.assertRaisesRegex(ValueError, 'input port is not connected'):
+ iter_self._create_message_iterator(input_port)
+
+ nonlocal seen
+ seen = True
+
+ class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
+ def __init__(comp_self, config, params, obj):
+ comp_self._add_input_port('in')
+ comp_self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(comp_self, config, params, obj):
+ comp_self._input_port = comp_self._add_input_port('in')
+
+ def _user_graph_is_configured(comp_self):
+ comp_self._input_iter = comp_self._create_message_iterator(
+ comp_self._input_port
+ )
+
+ def _user_consume(self):
+ raise bt2.Stop
+
+ seen = False
+ graph = bt2.Graph()
+ flt = graph.add_component(MyFilter, 'flt')
+ snk = graph.add_component(MySink, 'snk')
+ graph.connect_ports(flt.output_ports['out'], snk.input_ports['in'])
+ graph.run()
+ self.assertTrue(seen)
+
def test_create_user_error(self):
# This tests both error handling by
# _UserSinkComponent._create_message_iterator