# The MIT License (MIT)
#
# Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from bt2 import native_bt, object, utils
import bt2.clock_class_priority_map
import bt2.clock_value
import bt2.event
import bt2.packet
import bt2.stream
import copy
import bt2
def _create_from_ptr(ptr):
    """Wrap a native notification pointer in the matching Python class.

    The native type of `ptr` is read with
    `native_bt.notification_get_type()` and looked up in
    `_NOTIF_TYPE_TO_CLS`; the class found there builds the returned object
    through its own `_create_from_ptr()`.

    Raises:
        bt2.Error: if the native type has no entry in `_NOTIF_TYPE_TO_CLS`.
    """
    notif_type = native_bt.notification_get_type(ptr)

    if notif_type not in _NOTIF_TYPE_TO_CLS:
        raise bt2.Error('unknown notification type: {}'.format(notif_type))

    return _NOTIF_TYPE_TO_CLS[notif_type]._create_from_ptr(ptr)
43 def _notif_types_from_notif_classes(notification_types
):
44 if notification_types
is None:
47 for notif_cls
in notification_types
:
48 if notif_cls
not in _NOTIF_TYPE_TO_CLS
.values():
49 raise ValueError("'{}' is not a notification class".format(notif_cls
))
51 notif_types
= [notif_cls
._TYPE
for notif_cls
in notification_types
]
class _Notification(object._Object):
    """Common base class of every notification class in this module."""
class _CopyableNotification(_Notification):
    """Base class adding `copy.copy()` / `copy.deepcopy()` support.

    Both operations delegate to `self._copy(copy_func)`, which this class
    does not define: subclasses relying on these methods must provide it.
    """

    def __copy__(self):
        # Shallow copy: properties are passed through unchanged.
        return self._copy(lambda obj: obj)

    def __deepcopy__(self, memo):
        cpy = self._copy(copy.deepcopy)

        # Register the copy so that other references to this object found
        # during the same deep-copy operation resolve to the same copy.
        memo[id(self)] = cpy
        return cpy
class EventNotification(_CopyableNotification):
    """Notification of native type `NOTIFICATION_TYPE_EVENT`.

    Exposes the `event` and `clock_class_priority_map` properties of the
    underlying native notification.
    """

    _TYPE = native_bt.NOTIFICATION_TYPE_EVENT

    def __init__(self, event, cc_prio_map=None):
        """Create the native event notification object.

        Args:
            event: a `bt2.event._Event` object.
            cc_prio_map: optional
                `bt2.clock_class_priority_map.ClockClassPriorityMap`; when
                omitted, `None` is handed to the native creation function.

        Raises:
            bt2.CreationError: if the native creation function returns
                `None`.
        """
        utils._check_type(event, bt2.event._Event)

        if cc_prio_map is not None:
            utils._check_type(cc_prio_map,
                              bt2.clock_class_priority_map.ClockClassPriorityMap)
            cc_prio_map_ptr = cc_prio_map._ptr
        else:
            cc_prio_map_ptr = None

        ptr = native_bt.notification_event_create(event._ptr, cc_prio_map_ptr)

        if ptr is None:
            raise bt2.CreationError('cannot create event notification object')

        super().__init__(ptr)

    @property
    def event(self):
        """Event object wrapped from the native notification's event."""
        event_ptr = native_bt.notification_event_get_event(self._ptr)
        assert event_ptr
        return bt2.event._create_from_ptr(event_ptr)

    @property
    def clock_class_priority_map(self):
        """`ClockClassPriorityMap` wrapped from the native notification."""
        cc_prio_map_ptr = native_bt.notification_event_get_clock_class_priority_map(self._ptr)
        assert cc_prio_map_ptr
        return bt2.clock_class_priority_map.ClockClassPriorityMap._create_from_ptr(cc_prio_map_ptr)

    def __eq__(self, other):
        """Compare by type, then by address, then by property values."""
        if type(other) is not type(self):
            return False

        # Same native object: no need to compare the properties.
        if self.addr == other.addr:
            return True

        self_props = (
            self.event,
            self.clock_class_priority_map,
        )
        other_props = (
            other.event,
            other.clock_class_priority_map,
        )
        return self_props == other_props

    def _copy(self, copy_func):
        # We can always use references here because those properties are
        # frozen anyway if they are part of a notification. Since the
        # user cannot modify them after copying the notification, it's
        # useless to copy/deep-copy them.
        return EventNotification(self.event, self.clock_class_priority_map)
class PacketBeginningNotification(_CopyableNotification):
    """Notification of native type `NOTIFICATION_TYPE_PACKET_BEGIN`."""

    _TYPE = native_bt.NOTIFICATION_TYPE_PACKET_BEGIN

    def __init__(self, packet):
        """Create the native packet beginning notification object.

        Args:
            packet: a `bt2.packet._Packet` object.

        Raises:
            bt2.CreationError: if the native creation function returns
                `None`.
        """
        utils._check_type(packet, bt2.packet._Packet)
        ptr = native_bt.notification_packet_begin_create(packet._ptr)

        if ptr is None:
            raise bt2.CreationError('cannot create packet beginning notification object')

        super().__init__(ptr)

    @property
    def packet(self):
        """Packet object wrapped from the native notification's packet."""
        packet_ptr = native_bt.notification_packet_begin_get_packet(self._ptr)
        assert packet_ptr
        return bt2.packet._Packet._create_from_ptr(packet_ptr)

    def __eq__(self, other):
        """Compare by type, then by address, then by packet."""
        if type(other) is not type(self):
            return False

        # Same native object: no need to compare the packets.
        if self.addr == other.addr:
            return True

        return self.packet == other.packet

    def _copy(self, copy_func):
        # We can always use references here because those properties are
        # frozen anyway if they are part of a notification. Since the
        # user cannot modify them after copying the notification, it's
        # useless to copy/deep-copy them.
        return PacketBeginningNotification(self.packet)
class PacketEndNotification(_CopyableNotification):
    """Notification of native type `NOTIFICATION_TYPE_PACKET_END`."""

    _TYPE = native_bt.NOTIFICATION_TYPE_PACKET_END

    def __init__(self, packet):
        """Create the native packet end notification object.

        Args:
            packet: a `bt2.packet._Packet` object.

        Raises:
            bt2.CreationError: if the native creation function returns
                `None`.
        """
        utils._check_type(packet, bt2.packet._Packet)
        ptr = native_bt.notification_packet_end_create(packet._ptr)

        if ptr is None:
            raise bt2.CreationError('cannot create packet end notification object')

        super().__init__(ptr)

    @property
    def packet(self):
        """Packet object wrapped from the native notification's packet."""
        packet_ptr = native_bt.notification_packet_end_get_packet(self._ptr)
        assert packet_ptr
        return bt2.packet._Packet._create_from_ptr(packet_ptr)

    def __eq__(self, other):
        """Compare by type, then by address, then by packet."""
        if type(other) is not type(self):
            return False

        # Same native object: no need to compare the packets.
        if self.addr == other.addr:
            return True

        return self.packet == other.packet

    def _copy(self, copy_func):
        # We can always use references here because those properties are
        # frozen anyway if they are part of a notification. Since the
        # user cannot modify them after copying the notification, it's
        # useless to copy/deep-copy them.
        return PacketEndNotification(self.packet)
class StreamBeginningNotification(_CopyableNotification):
    """Notification of native type `NOTIFICATION_TYPE_STREAM_BEGIN`."""

    _TYPE = native_bt.NOTIFICATION_TYPE_STREAM_BEGIN

    def __init__(self, stream):
        """Create the native stream beginning notification object.

        Args:
            stream: a `bt2.stream._Stream` object.

        Raises:
            bt2.CreationError: if the native creation function returns
                `None`.
        """
        utils._check_type(stream, bt2.stream._Stream)
        ptr = native_bt.notification_stream_begin_create(stream._ptr)

        if ptr is None:
            raise bt2.CreationError('cannot create stream beginning notification object')

        super().__init__(ptr)

    @property
    def stream(self):
        """Stream object wrapped from the native notification's stream."""
        stream_ptr = native_bt.notification_stream_begin_get_stream(self._ptr)
        assert stream_ptr
        return bt2.stream._create_from_ptr(stream_ptr)

    def __eq__(self, other):
        """Compare by type, then by address, then by stream."""
        if type(other) is not type(self):
            return False

        # Same native object: no need to compare the streams.
        if self.addr == other.addr:
            return True

        return self.stream == other.stream

    def _copy(self, copy_func):
        # We can always use references here because those properties are
        # frozen anyway if they are part of a notification. Since the
        # user cannot modify them after copying the notification, it's
        # useless to copy/deep-copy them.
        return StreamBeginningNotification(self.stream)
class StreamEndNotification(_CopyableNotification):
    """Notification of native type `NOTIFICATION_TYPE_STREAM_END`."""

    _TYPE = native_bt.NOTIFICATION_TYPE_STREAM_END

    def __init__(self, stream):
        """Create the native stream end notification object.

        Args:
            stream: a `bt2.stream._Stream` object.

        Raises:
            bt2.CreationError: if the native creation function returns
                `None`.
        """
        utils._check_type(stream, bt2.stream._Stream)
        ptr = native_bt.notification_stream_end_create(stream._ptr)

        if ptr is None:
            raise bt2.CreationError('cannot create stream end notification object')

        super().__init__(ptr)

    @property
    def stream(self):
        """Stream object wrapped from the native notification's stream."""
        stream_ptr = native_bt.notification_stream_end_get_stream(self._ptr)
        assert stream_ptr
        return bt2.stream._create_from_ptr(stream_ptr)

    def __eq__(self, other):
        """Compare by type, then by address, then by stream."""
        if type(other) is not type(self):
            return False

        # Same native object: no need to compare the streams.
        if self.addr == other.addr:
            return True

        return self.stream == other.stream

    def _copy(self, copy_func):
        # We can always use references here because those properties are
        # frozen anyway if they are part of a notification. Since the
        # user cannot modify them after copying the notification, it's
        # useless to copy/deep-copy them.
        return StreamEndNotification(self.stream)
class InactivityNotification(_CopyableNotification):
    """Notification of native type `NOTIFICATION_TYPE_INACTIVITY`.

    Holds a clock class priority map and, optionally, one clock value per
    clock class of that map. Copying is implemented directly by
    `__copy__()` and `__deepcopy__()` rather than through `_copy()`.
    """

    _TYPE = native_bt.NOTIFICATION_TYPE_INACTIVITY

    def __init__(self, cc_prio_map=None):
        """Create the native inactivity notification object.

        Args:
            cc_prio_map: optional
                `bt2.clock_class_priority_map.ClockClassPriorityMap`; when
                omitted, `None` is handed to the native creation function.

        Raises:
            bt2.CreationError: if the native creation function returns
                `None`.
        """
        if cc_prio_map is not None:
            utils._check_type(cc_prio_map,
                              bt2.clock_class_priority_map.ClockClassPriorityMap)
            cc_prio_map_ptr = cc_prio_map._ptr
        else:
            cc_prio_map_ptr = None

        ptr = native_bt.notification_inactivity_create(cc_prio_map_ptr)

        if ptr is None:
            raise bt2.CreationError('cannot create inactivity notification object')

        super().__init__(ptr)

    @property
    def clock_class_priority_map(self):
        """`ClockClassPriorityMap` wrapped from the native notification."""
        cc_prio_map_ptr = native_bt.notification_inactivity_get_clock_class_priority_map(self._ptr)
        assert cc_prio_map_ptr
        return bt2.clock_class_priority_map.ClockClassPriorityMap._create_from_ptr(cc_prio_map_ptr)

    def clock_value(self, clock_class):
        """Return the clock value set for `clock_class`, or `None`.

        Args:
            clock_class: a `bt2.ClockClass` object.
        """
        utils._check_type(clock_class, bt2.ClockClass)
        # NOTE(review): second argument assumed to be the clock class's
        # native pointer, as in the module's other native calls; confirm.
        clock_value_ptr = native_bt.notification_inactivity_get_clock_value(
            self._ptr, clock_class._ptr)

        if clock_value_ptr is None:
            return None

        return bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)

    def add_clock_value(self, clock_value):
        """Set `clock_value` (a `bt2.clock_value._ClockValue`) on this object.

        The native return code is passed to `utils._handle_ret()`.
        """
        utils._check_type(clock_value, bt2.clock_value._ClockValue)
        # NOTE(review): second argument assumed to be the clock value's
        # native pointer; confirm.
        ret = native_bt.notification_inactivity_set_clock_value(
            self._ptr, clock_value._ptr)
        utils._handle_ret(ret, "cannot set inactivity notification object's clock value")

    def _get_clock_values(self):
        """Map each clock class of the priority map to its clock value.

        Clock classes for which `clock_value()` returns `None` are left out.
        """
        clock_values = {}

        for clock_class in self.clock_class_priority_map:
            clock_value = self.clock_value(clock_class)

            if clock_value is None:
                continue

            clock_values[clock_class] = clock_value

        return clock_values

    def __eq__(self, other):
        """Compare by type, then by address, then by map and clock values."""
        if type(other) is not type(self):
            return False

        # Same native object: no need to compare the properties.
        if self.addr == other.addr:
            return True

        self_props = (
            self.clock_class_priority_map,
            self._get_clock_values(),
        )
        other_props = (
            other.clock_class_priority_map,
            other._get_clock_values(),
        )
        return self_props == other_props

    def __copy__(self):
        """Shallow copy sharing the priority map and the clock values."""
        cpy = InactivityNotification(self.clock_class_priority_map)

        for clock_class in self.clock_class_priority_map:
            clock_value = self.clock_value(clock_class)

            if clock_value is None:
                continue

            cpy.add_clock_value(clock_value)

        return cpy

    def __deepcopy__(self, memo):
        """Deep copy with a deep-copied priority map and new clock values."""
        cc_prio_map_cpy = copy.deepcopy(self.clock_class_priority_map)
        cpy = InactivityNotification(cc_prio_map_cpy)

        # copy clock values
        for orig_clock_class in self.clock_class_priority_map:
            orig_clock_value = self.clock_value(orig_clock_class)

            if orig_clock_value is None:
                continue

            # find equivalent, copied clock class in CC priority map copy
            # (if none compares equal, the last iterated class is kept)
            for cpy_clock_class in cc_prio_map_cpy:
                if cpy_clock_class == orig_clock_class:
                    break

            # create copy of clock value from copied clock class
            clock_value_cpy = cpy_clock_class(orig_clock_value.cycles)

            # set copied clock value in notification copy
            cpy.add_clock_value(clock_value_cpy)

        memo[id(self)] = cpy
        return cpy
class _DiscardedElementsNotification(_Notification):
    """Shared equality logic for the discarded packets/events notifications.

    Subclasses are expected to provide the `count`, `stream`,
    `beginning_clock_value` and `end_clock_value` properties compared here.
    """

    def __eq__(self, other):
        """Compare by type, then by address, then by property values."""
        if type(other) is not type(self):
            return False

        # Same native object: no need to compare the properties.
        if self.addr == other.addr:
            return True

        self_props = (
            self.count,
            self.stream,
            self.beginning_clock_value,
            self.end_clock_value,
        )
        other_props = (
            other.count,
            other.stream,
            other.beginning_clock_value,
            other.end_clock_value,
        )
        return self_props == other_props
class _DiscardedPacketsNotification(_DiscardedElementsNotification):
    """Notification of native type `NOTIFICATION_TYPE_DISCARDED_PACKETS`."""

    _TYPE = native_bt.NOTIFICATION_TYPE_DISCARDED_PACKETS

    @property
    def count(self):
        """Count reported by the native discarded packets notification."""
        count = native_bt.notification_discarded_packets_get_count(self._ptr)
        # NOTE(review): sanity check assumed (non-negative count); confirm.
        assert count >= 0
        return count

    @property
    def stream(self):
        """Stream object wrapped from the native notification's stream."""
        stream_ptr = native_bt.notification_discarded_packets_get_stream(self._ptr)
        assert stream_ptr
        return bt2.stream._create_from_ptr(stream_ptr)

    @property
    def beginning_clock_value(self):
        """Beginning clock value, or `None` when the native one is unset."""
        clock_value_ptr = native_bt.notification_discarded_packets_get_begin_clock_value(self._ptr)

        if clock_value_ptr is None:
            return None

        return bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)

    @property
    def end_clock_value(self):
        """End clock value, or `None` when the native one is unset."""
        clock_value_ptr = native_bt.notification_discarded_packets_get_end_clock_value(self._ptr)

        if clock_value_ptr is None:
            return None

        return bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
class _DiscardedEventsNotification(_DiscardedElementsNotification):
    """Notification of native type `NOTIFICATION_TYPE_DISCARDED_EVENTS`."""

    _TYPE = native_bt.NOTIFICATION_TYPE_DISCARDED_EVENTS

    @property
    def count(self):
        """Count reported by the native discarded events notification."""
        count = native_bt.notification_discarded_events_get_count(self._ptr)
        # NOTE(review): sanity check assumed (non-negative count); confirm.
        assert count >= 0
        return count

    @property
    def stream(self):
        """Stream object wrapped from the native notification's stream."""
        stream_ptr = native_bt.notification_discarded_events_get_stream(self._ptr)
        assert stream_ptr
        return bt2.stream._create_from_ptr(stream_ptr)

    @property
    def beginning_clock_value(self):
        """Beginning clock value, or `None` when the native one is unset."""
        clock_value_ptr = native_bt.notification_discarded_events_get_begin_clock_value(self._ptr)

        if clock_value_ptr is None:
            return None

        return bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)

    @property
    def end_clock_value(self):
        """End clock value, or `None` when the native one is unset."""
        clock_value_ptr = native_bt.notification_discarded_events_get_end_clock_value(self._ptr)

        if clock_value_ptr is None:
            return None

        return bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
# Native notification type -> Python notification class; used by
# _create_from_ptr() and _notif_types_from_notif_classes().
_NOTIF_TYPE_TO_CLS = {
    native_bt.NOTIFICATION_TYPE_EVENT: EventNotification,
    native_bt.NOTIFICATION_TYPE_PACKET_BEGIN: PacketBeginningNotification,
    native_bt.NOTIFICATION_TYPE_PACKET_END: PacketEndNotification,
    native_bt.NOTIFICATION_TYPE_STREAM_BEGIN: StreamBeginningNotification,
    native_bt.NOTIFICATION_TYPE_STREAM_END: StreamEndNotification,
    native_bt.NOTIFICATION_TYPE_INACTIVITY: InactivityNotification,
    native_bt.NOTIFICATION_TYPE_DISCARDED_PACKETS: _DiscardedPacketsNotification,
    native_bt.NOTIFICATION_TYPE_DISCARDED_EVENTS: _DiscardedEventsNotification,
}