1 # SPDX-License-Identifier: MIT
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
5 from bt2
import object as bt2_object
6 from bt2
import native_bt
10 from bt2
import connection
as bt2_connection
15 def _create_from_const_ptr_and_get_ref(ptr
, port_type
):
16 cls
= _PORT_TYPE_TO_PYCLS
.get(port_type
, None)
19 raise TypeError("unknown port type: {}".format(port_type
))
21 return cls
._create
_from
_ptr
_and
_get
_ref
(ptr
)
24 def _create_self_from_ptr_and_get_ref(ptr
, port_type
):
25 cls
= _PORT_TYPE_TO_USER_PYCLS
.get(port_type
, None)
28 raise TypeError("unknown port type: {}".format(port_type
))
30 return cls
._create
_from
_ptr
_and
_get
_ref
(ptr
)
33 class _PortConst(bt2_object
._SharedObject
):
35 def _get_ref(cls
, ptr
):
36 ptr
= cls
._as
_port
_ptr
(ptr
)
37 return native_bt
.port_get_ref(ptr
)
40 def _put_ref(cls
, ptr
):
41 ptr
= cls
._as
_port
_ptr
(ptr
)
42 return native_bt
.port_put_ref(ptr
)
46 ptr
= self
._as
_port
_ptr
(self
._ptr
)
47 name
= native_bt
.port_get_name(ptr
)
48 assert name
is not None
53 ptr
= self
._as
_port
_ptr
(self
._ptr
)
54 conn_ptr
= native_bt
.port_borrow_connection_const(ptr
)
59 return _bt2_connection()._ConnectionConst
._create
_from
_ptr
_and
_get
_ref
(conn_ptr
)
62 def is_connected(self
):
63 return self
.connection
is not None
66 class _InputPortConst(_PortConst
):
67 _as_port_ptr
= staticmethod(native_bt
.port_input_as_port_const
)
70 class _OutputPortConst(_PortConst
):
71 _as_port_ptr
= staticmethod(native_bt
.port_output_as_port_const
)
74 class _UserComponentPort(_PortConst
):
76 def _as_port_ptr(cls
, ptr
):
77 ptr
= cls
._as
_self
_port
_ptr
(ptr
)
78 return native_bt
.self_component_port_as_port(ptr
)
82 ptr
= self
._as
_port
_ptr
(self
._ptr
)
83 conn_ptr
= native_bt
.port_borrow_connection_const(ptr
)
88 return _bt2_connection()._ConnectionConst
._create
_from
_ptr
_and
_get
_ref
(conn_ptr
)
92 ptr
= self
._as
_self
_port
_ptr
(self
._ptr
)
93 return native_bt
.self_component_port_get_data(ptr
)
96 class _UserComponentInputPort(_UserComponentPort
, _InputPortConst
):
97 _as_self_port_ptr
= staticmethod(
98 native_bt
.self_component_port_input_as_self_component_port
102 class _UserComponentOutputPort(_UserComponentPort
, _OutputPortConst
):
103 _as_self_port_ptr
= staticmethod(
104 native_bt
.self_component_port_output_as_self_component_port
108 _PORT_TYPE_TO_PYCLS
= {
109 native_bt
.PORT_TYPE_INPUT
: _InputPortConst
,
110 native_bt
.PORT_TYPE_OUTPUT
: _OutputPortConst
,
114 _PORT_TYPE_TO_USER_PYCLS
= {
115 native_bt
.PORT_TYPE_INPUT
: _UserComponentInputPort
,
116 native_bt
.PORT_TYPE_OUTPUT
: _UserComponentOutputPort
,