1 # The MIT License (MIT)
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 from bt2
import native_bt
, object, utils
24 import bt2
.clock_class_priority_map
def _create_from_ptr(ptr):
    """Wrap a native notification pointer in its matching Python class.

    The native notification type of ``ptr`` is queried and used as the
    key into ``_NOTIF_TYPE_TO_CLS``; the class found there builds the
    wrapper through its own ``_create_from_ptr()``.

    Raises ``bt2.Error`` when the native type has no registered class.
    """
    type_id = native_bt.notification_get_type(ptr)

    if type_id in _NOTIF_TYPE_TO_CLS:
        return _NOTIF_TYPE_TO_CLS[type_id]._create_from_ptr(ptr)

    raise bt2.Error('unknown notification type: {}'.format(type_id))
42 class _Notification(object._Object
):
46 class _CopyableNotification(_Notification
):
48 return self
._copy
(lambda obj
: obj
)
50 def __deepcopy__(self
, memo
):
51 cpy
= self
._copy
(copy
.deepcopy
)
56 class EventNotification(_CopyableNotification
):
57 _TYPE
= native_bt
.NOTIFICATION_TYPE_EVENT
59 def __init__(self
, event
, cc_prio_map
=None):
60 utils
._check
_type
(event
, bt2
.event
._Event
)
62 if cc_prio_map
is not None:
63 utils
._check
_type
(cc_prio_map
, bt2
.clock_class_priority_map
.ClockClassPriorityMap
)
64 cc_prio_map_ptr
= cc_prio_map
._ptr
66 cc_prio_map_ptr
= None
68 ptr
= native_bt
.notification_event_create(event
._ptr
, cc_prio_map_ptr
)
71 raise bt2
.CreationError('cannot create event notification object')
77 event_ptr
= native_bt
.notification_event_get_event(self
._ptr
)
79 return bt2
.event
._create
_from
_ptr
(event_ptr
)
def clock_class_priority_map(self):
    """Return the clock class priority map of this event notification."""
    map_ptr = native_bt.notification_event_get_clock_class_priority_map(self._ptr)

    # The native getter is expected to always hand back a map pointer.
    assert map_ptr
    map_cls = bt2.clock_class_priority_map.ClockClassPriorityMap
    return map_cls._create_from_ptr(map_ptr)
87 def __eq__(self
, other
):
88 if type(other
) is not type(self
):
91 if self
.addr
== other
.addr
:
96 self
.clock_class_priority_map
,
100 other
.clock_class_priority_map
,
102 return self_props
== other_props
def _copy(self, copy_func):
    """Create a new EventNotification from this one's event and
    clock class priority map.

    ``copy_func`` is intentionally unused.
    """
    # Sharing references is enough here: properties that belong to a
    # notification are frozen, so the user cannot modify them after
    # the notification is copied, which makes copying or deep-copying
    # them pointless.
    source_event = self.event
    prio_map = self.clock_class_priority_map
    return EventNotification(source_event, prio_map)
112 class PacketBeginningNotification(_CopyableNotification
):
113 _TYPE
= native_bt
.NOTIFICATION_TYPE_PACKET_BEGIN
115 def __init__(self
, packet
):
116 utils
._check
_type
(packet
, bt2
.packet
._Packet
)
117 ptr
= native_bt
.notification_packet_begin_create(packet
._ptr
)
120 raise bt2
.CreationError('cannot create packet beginning notification object')
122 super().__init
__(ptr
)
126 packet_ptr
= native_bt
.notification_packet_begin_get_packet(self
._ptr
)
128 return bt2
.packet
._Packet
._create
_from
_ptr
(packet_ptr
)
130 def __eq__(self
, other
):
131 if type(other
) is not type(self
):
134 if self
.addr
== other
.addr
:
137 return self
.packet
== other
.packet
def _copy(self, copy_func):
    """Create a new PacketBeginningNotification for this one's packet.

    ``copy_func`` is intentionally unused.
    """
    # Sharing references is enough here: properties that belong to a
    # notification are frozen, so the user cannot modify them after
    # the notification is copied, which makes copying or deep-copying
    # them pointless.
    source_packet = self.packet
    return PacketBeginningNotification(source_packet)
147 class PacketEndNotification(_CopyableNotification
):
148 _TYPE
= native_bt
.NOTIFICATION_TYPE_PACKET_END
150 def __init__(self
, packet
):
151 utils
._check
_type
(packet
, bt2
.packet
._Packet
)
152 ptr
= native_bt
.notification_packet_end_create(packet
._ptr
)
155 raise bt2
.CreationError('cannot create packet end notification object')
157 super().__init
__(ptr
)
161 packet_ptr
= native_bt
.notification_packet_end_get_packet(self
._ptr
)
163 return bt2
.packet
._Packet
._create
_from
_ptr
(packet_ptr
)
165 def __eq__(self
, other
):
166 if type(other
) is not type(self
):
169 if self
.addr
== other
.addr
:
172 return self
.packet
== other
.packet
def _copy(self, copy_func):
    """Create a new PacketEndNotification for this one's packet.

    ``copy_func`` is intentionally unused.
    """
    # Sharing references is enough here: properties that belong to a
    # notification are frozen, so the user cannot modify them after
    # the notification is copied, which makes copying or deep-copying
    # them pointless.
    source_packet = self.packet
    return PacketEndNotification(source_packet)
182 class StreamBeginningNotification(_CopyableNotification
):
183 _TYPE
= native_bt
.NOTIFICATION_TYPE_STREAM_BEGIN
185 def __init__(self
, stream
):
186 utils
._check
_type
(stream
, bt2
.stream
._Stream
)
187 ptr
= native_bt
.notification_stream_begin_create(stream
._ptr
)
190 raise bt2
.CreationError('cannot create stream beginning notification object')
192 super().__init
__(ptr
)
196 stream_ptr
= native_bt
.notification_stream_begin_get_stream(self
._ptr
)
198 return bt2
.stream
._create
_from
_ptr
(stream_ptr
)
200 def __eq__(self
, other
):
201 if type(other
) is not type(self
):
204 if self
.addr
== other
.addr
:
207 return self
.stream
== other
.stream
def _copy(self, copy_func):
    """Create a new StreamBeginningNotification for this one's stream.

    ``copy_func`` is intentionally unused.
    """
    # Sharing references is enough here: properties that belong to a
    # notification are frozen, so the user cannot modify them after
    # the notification is copied, which makes copying or deep-copying
    # them pointless.
    source_stream = self.stream
    return StreamBeginningNotification(source_stream)
217 class StreamEndNotification(_CopyableNotification
):
218 _TYPE
= native_bt
.NOTIFICATION_TYPE_STREAM_END
220 def __init__(self
, stream
):
221 utils
._check
_type
(stream
, bt2
.stream
._Stream
)
222 ptr
= native_bt
.notification_stream_end_create(stream
._ptr
)
225 raise bt2
.CreationError('cannot create stream end notification object')
227 super().__init
__(ptr
)
231 stream_ptr
= native_bt
.notification_stream_end_get_stream(self
._ptr
)
233 return bt2
.stream
._create
_from
_ptr
(stream_ptr
)
235 def __eq__(self
, other
):
236 if type(other
) is not type(self
):
239 if self
.addr
== other
.addr
:
242 return self
.stream
== other
.stream
def _copy(self, copy_func):
    """Create a new StreamEndNotification for this one's stream.

    ``copy_func`` is intentionally unused.
    """
    # Sharing references is enough here: properties that belong to a
    # notification are frozen, so the user cannot modify them after
    # the notification is copied, which makes copying or deep-copying
    # them pointless.
    source_stream = self.stream
    return StreamEndNotification(source_stream)
252 class InactivityNotification(_CopyableNotification
):
253 _TYPE
= native_bt
.NOTIFICATION_TYPE_INACTIVITY
255 def __init__(self
, cc_prio_map
=None):
256 if cc_prio_map
is not None:
257 utils
._check
_type
(cc_prio_map
, bt2
.clock_class_priority_map
.ClockClassPriorityMap
)
258 cc_prio_map_ptr
= cc_prio_map
._ptr
260 cc_prio_map_ptr
= None
262 ptr
= native_bt
.notification_inactivity_create(cc_prio_map_ptr
)
265 raise bt2
.CreationError('cannot create inactivity notification object')
267 super().__init
__(ptr
)
def clock_class_priority_map(self):
    """Return the clock class priority map of this inactivity notification."""
    map_ptr = native_bt.notification_inactivity_get_clock_class_priority_map(self._ptr)

    # The native getter is expected to always hand back a map pointer.
    assert map_ptr
    map_cls = bt2.clock_class_priority_map.ClockClassPriorityMap
    return map_cls._create_from_ptr(map_ptr)
275 def clock_value(self
, clock_class
):
276 utils
._check
_type
(clock_class
, bt2
.ClockClass
)
277 clock_value_ptr
= native_bt
.notification_inactivity_get_clock_value(self
._ptr
,
280 if clock_value_ptr
is None:
283 clock_value
= bt2
.clock_class
._create
_clock
_value
_from
_ptr
(clock_value_ptr
)
286 def add_clock_value(self
, clock_value
):
287 utils
._check
_type
(clock_value
, bt2
.clock_class
._ClockValue
)
288 ret
= native_bt
.notification_inactivity_set_clock_value(self
._ptr
,
290 utils
._handle
_ret
(ret
, "cannot set inactivity notification object's clock value")
292 def _get_clock_values(self
):
295 for clock_class
in self
.clock_class_priority_map
:
296 clock_value
= self
.clock_value(clock_class
)
298 if clock_value
is None:
301 clock_values
[clock_class
] = clock_value
305 def __eq__(self
, other
):
306 if type(other
) is not type(self
):
309 if self
.addr
== other
.addr
:
313 self
.clock_class_priority_map
,
314 self
._get
_clock
_values
(),
317 other
.clock_class_priority_map
,
318 other
._get
_clock
_values
(),
320 return self_props
== other_props
323 cpy
= InactivityNotification(self
.clock_class_priority_map
)
325 for clock_class
in self
.clock_class_priority_map
:
326 clock_value
= self
.clock_value(clock_class
)
328 if clock_value
is None:
331 cpy
.add_clock_value(clock_value
)
335 def __deepcopy__(self
, memo
):
336 cc_prio_map_cpy
= copy
.deepcopy(self
.clock_class_priority_map
)
337 cpy
= InactivityNotification(cc_prio_map_cpy
)
340 for orig_clock_class
in self
.clock_class_priority_map
:
341 orig_clock_value
= self
.clock_value(orig_clock_class
)
343 if orig_clock_value
is None:
346 # find equivalent, copied clock class in CC priority map copy
347 for cpy_clock_class
in cc_prio_map_cpy
:
348 if cpy_clock_class
== orig_clock_class
:
351 # create copy of clock value from copied clock class
352 clock_value_cpy
= cpy_clock_class(orig_clock_value
.cycles
)
354 # set copied clock value in notification copy
355 cpy
.add_clock_value(clock_value_cpy
)
361 class _DiscardedElementsNotification(_Notification
):
362 def __eq__(self
, other
):
363 if type(other
) is not type(self
):
366 if self
.addr
== other
.addr
:
372 self
.beginning_clock_value
,
373 self
.end_clock_value
,
378 other
.beginning_clock_value
,
379 other
.end_clock_value
,
381 return self_props
== other_props
384 class _DiscardedPacketsNotification(_DiscardedElementsNotification
):
385 _TYPE
= native_bt
.NOTIFICATION_TYPE_DISCARDED_PACKETS
389 count
= native_bt
.notification_discarded_packets_get_count(self
._ptr
)
395 stream_ptr
= native_bt
.notification_discarded_packets_get_stream(self
._ptr
)
397 return bt2
.stream
._create
_from
_ptr
(stream_ptr
)
400 def beginning_clock_value(self
):
401 clock_value_ptr
= native_bt
.notification_discarded_packets_get_begin_clock_value(self
._ptr
)
403 if clock_value_ptr
is None:
406 clock_value
= bt2
.clock_class
._create
_clock
_value
_from
_ptr
(clock_value_ptr
)
410 def end_clock_value(self
):
411 clock_value_ptr
= native_bt
.notification_discarded_packets_get_end_clock_value(self
._ptr
)
413 if clock_value_ptr
is None:
416 clock_value
= bt2
.clock_class
._create
_clock
_value
_from
_ptr
(clock_value_ptr
)
420 class _DiscardedEventsNotification(_DiscardedElementsNotification
):
421 _TYPE
= native_bt
.NOTIFICATION_TYPE_DISCARDED_EVENTS
425 count
= native_bt
.notification_discarded_events_get_count(self
._ptr
)
431 stream_ptr
= native_bt
.notification_discarded_events_get_stream(self
._ptr
)
433 return bt2
.stream
._create
_from
_ptr
(stream_ptr
)
436 def beginning_clock_value(self
):
437 clock_value_ptr
= native_bt
.notification_discarded_events_get_begin_clock_value(self
._ptr
)
439 if clock_value_ptr
is None:
442 clock_value
= bt2
.clock_class
._create
_clock
_value
_from
_ptr
(clock_value_ptr
)
446 def end_clock_value(self
):
447 clock_value_ptr
= native_bt
.notification_discarded_events_get_end_clock_value(self
._ptr
)
449 if clock_value_ptr
is None:
452 clock_value
= bt2
.clock_class
._create
_clock
_value
_from
_ptr
(clock_value_ptr
)
456 _NOTIF_TYPE_TO_CLS
= {
457 native_bt
.NOTIFICATION_TYPE_EVENT
: EventNotification
,
458 native_bt
.NOTIFICATION_TYPE_PACKET_BEGIN
: PacketBeginningNotification
,
459 native_bt
.NOTIFICATION_TYPE_PACKET_END
: PacketEndNotification
,
460 native_bt
.NOTIFICATION_TYPE_STREAM_BEGIN
: StreamBeginningNotification
,
461 native_bt
.NOTIFICATION_TYPE_STREAM_END
: StreamEndNotification
,
462 native_bt
.NOTIFICATION_TYPE_INACTIVITY
: InactivityNotification
,
463 native_bt
.NOTIFICATION_TYPE_DISCARDED_PACKETS
: _DiscardedPacketsNotification
,
464 native_bt
.NOTIFICATION_TYPE_DISCARDED_EVENTS
: _DiscardedEventsNotification
,