@unittest.skip("this is broken")
class ConnectionTestCase(unittest.TestCase):
    """Tests for the connection object returned by ``graph.connect_ports()``.

    Every test builds the same minimal graph: a source component with one
    output port named ``'out'`` connected to a sink component with one input
    port named ``'in'``.

    NOTE(review): the text this class was recovered from had each statement
    split over several lines and some lines missing.  The ``test_create`` and
    ``test_eq`` headers, the ``__next__``/``_consume`` stubs and the
    ``graph = bt2.Graph()`` lines were restored to make the class valid
    Python -- confirm them against the upstream file.
    """

    def test_create(self):
        """The connection is a ``_Connection`` but not a ``_PrivateConnection``."""
        class MyIter(bt2._UserNotificationIterator):
            # NOTE(review): restored stub; presumably never reached by the test.
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            # NOTE(review): restored stub; presumably never reached by the test.
            def _consume(self):
                raise bt2.Stop

        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        self.assertIsInstance(conn, bt2._Connection)
        self.assertNotIsInstance(conn, bt2._PrivateConnection)

    def test_is_ended_false(self):
        """``is_ended`` is false right after the ports are connected."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        self.assertFalse(conn.is_ended)

    def test_is_ended_true(self):
        """``is_ended`` is true once the source's output port is disconnected."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        src.output_ports['out'].disconnect()
        self.assertTrue(conn.is_ended)

    def test_downstream_port(self):
        """``downstream_port`` has the same address as the sink's input port."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        self.assertEqual(conn.downstream_port.addr,
                         sink.input_ports['in'].addr)

    def test_upstream_port(self):
        """``upstream_port`` has the same address as the source's output port."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        self.assertEqual(conn.upstream_port.addr,
                         src.output_ports['out'].addr)

    def test_eq(self):
        """A connection compares equal to itself."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        self.assertEqual(conn, conn)

    def test_eq_invalid(self):
        """A connection does not compare equal to an unrelated object."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        self.assertNotEqual(conn, 23)
@unittest.skip("this is broken")
class PrivateConnectionTestCase(unittest.TestCase):
    """Tests for the connection seen from a sink's ``_port_connected()`` hook.

    Every test builds the same minimal graph: a source component with one
    output port named ``'out'`` connected to a sink component with one input
    port named ``'in'``.  The sink's ``_port_connected()`` method reads
    ``port.connection`` and hands it (or one of its ports) back to the test
    through a ``nonlocal`` variable.

    NOTE(review): the text this class was recovered from had each statement
    split over several lines and some lines missing.  The ``test_eq`` header,
    the ``__next__``/``_consume`` stubs, the ``graph = bt2.Graph()`` lines and
    the ``nonlocal`` declarations with their initial ``None`` bindings were
    restored to make the class valid Python -- confirm them against the
    upstream file.
    """

    def test_create(self):
        """The captured connection is a ``_PrivateConnection`` equal to the public one."""
        class MyIter(bt2._UserNotificationIterator):
            # NOTE(review): restored stub; presumably never reached by the test.
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            # NOTE(review): restored stub; presumably never reached by the test.
            def _consume(self):
                raise bt2.Stop

            def _port_connected(self, port, other_port):
                # Without ``nonlocal`` the assignment would only create a
                # local and the assertions below could not see the value.
                nonlocal priv_conn
                priv_conn = port.connection

        priv_conn = None
        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        self.assertIsInstance(priv_conn, bt2._PrivateConnection)
        self.assertEqual(conn, priv_conn)

    def test_is_ended_false(self):
        """``is_ended`` is false on the captured connection after connecting."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _port_connected(self, port, other_port):
                nonlocal priv_conn
                priv_conn = port.connection

        priv_conn = None
        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        self.assertFalse(priv_conn.is_ended)

    def test_is_ended_true(self):
        """``is_ended`` is true once the sink's input port is disconnected."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _port_connected(self, port, other_port):
                nonlocal priv_conn
                priv_conn = port.connection

        priv_conn = None
        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        sink.input_ports['in'].disconnect()
        self.assertTrue(priv_conn.is_ended)

    def test_downstream_port(self):
        """The captured downstream port has the sink input port's address."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _port_connected(self, port, other_port):
                # Only the port is handed back to the test; the connection
                # itself stays local to this callback.
                nonlocal priv_port
                priv_conn = port.connection
                priv_port = priv_conn.downstream_port

        priv_port = None
        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        self.assertEqual(priv_port.addr, sink.input_ports['in'].addr)

    def test_upstream_port(self):
        """The captured upstream port has the source output port's address."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _port_connected(self, port, other_port):
                nonlocal priv_port
                priv_conn = port.connection
                priv_port = priv_conn.upstream_port

        priv_port = None
        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        self.assertEqual(priv_port.addr, src.output_ports['out'].addr)

    def test_eq(self):
        """The captured connection compares equal to the public connection."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _port_connected(self, port, other_port):
                nonlocal priv_conn
                priv_conn = port.connection

        priv_conn = None
        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        self.assertEqual(priv_conn, conn)

    def test_eq_invalid(self):
        """The captured connection does not compare equal to an unrelated object."""
        class MyIter(bt2._UserNotificationIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       notification_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                raise bt2.Stop

            def _port_connected(self, port, other_port):
                nonlocal priv_conn
                priv_conn = port.connection

        priv_conn = None
        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        self.assertNotEqual(priv_conn, 23)