1 # The MIT License (MIT)
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 from bt2
import native_bt
, object, utils
24 import bt2
.clock_class_priority_map
def _create_from_ptr(ptr):
    """Wrap a native notification pointer in its Python notification class.

    The native type reported for ``ptr`` selects a class from
    ``_NOTIF_TYPE_TO_CLS``; that class's ``_create_from_ptr()`` builds
    the returned object.

    Raises ``bt2.Error`` when the native type has no registered class.
    """
    notif_type = native_bt.notification_get_type(ptr)
    notif_cls = _NOTIF_TYPE_TO_CLS.get(notif_type)

    if notif_cls is None:
        raise bt2.Error('unknown notification type: {}'.format(notif_type))

    return notif_cls._create_from_ptr(ptr)
def _notif_types_from_notif_classes(notification_types):
    """Translate notification classes into their native type codes.

    ``notification_types`` is an iterable of notification classes, or
    ``None``. ``None`` is passed through unchanged; otherwise the
    result is a list holding the ``_TYPE`` attribute of each class, in
    iteration order.

    Raises ``ValueError`` for the first item which is not one of the
    classes registered in ``_NOTIF_TYPE_TO_CLS``.
    """
    if notification_types is None:
        return None

    # Validate and convert in a single pass: walking the argument twice
    # would silently produce an empty list for a one-shot iterable such
    # as a generator, because the validation pass would exhaust it.
    notif_types = []

    for notif_cls in notification_types:
        if notif_cls not in _NOTIF_TYPE_TO_CLS.values():
            raise ValueError("'{}' is not a notification class".format(notif_cls))

        notif_types.append(notif_cls._TYPE)

    return notif_types
class _Notification(object._Object):
    """Common base class of the notification classes in this module."""
class _CopyableNotification(_Notification):
    """Notification which supports ``copy.copy()`` and ``copy.deepcopy()``.

    Both operations delegate to ``self._copy(copy_func)``, which
    subclasses are expected to provide and which returns the new
    notification.
    """

    def __copy__(self):
        # Shallow copy: properties are handed over untouched.
        def identity(obj):
            return obj

        return self._copy(identity)

    def __deepcopy__(self, memo):
        duplicate = self._copy(copy.deepcopy)

        # Register the copy so that other references to this object
        # met during the same deep copy resolve to it.
        memo[id(self)] = duplicate
        return duplicate
class EventNotification(_CopyableNotification):
    """Notification built from an event and an optional clock class
    priority map.
    """

    _TYPE = native_bt.NOTIFICATION_TYPE_EVENT

    def __init__(self, event, cc_prio_map=None):
        """Create the native event notification.

        ``event`` must be a ``bt2.event._Event``; ``cc_prio_map``, when
        given, must be a ``ClockClassPriorityMap``.

        Raises ``bt2.CreationError`` if the native object cannot be
        created.
        """
        utils._check_type(event, bt2.event._Event)

        if cc_prio_map is None:
            cc_prio_map_ptr = None
        else:
            utils._check_type(cc_prio_map, bt2.clock_class_priority_map.ClockClassPriorityMap)
            cc_prio_map_ptr = cc_prio_map._ptr

        ptr = native_bt.notification_event_create(event._ptr, cc_prio_map_ptr)

        if ptr is None:
            raise bt2.CreationError('cannot create event notification object')

        super().__init__(ptr)

    @property
    def event(self):
        """Event object of this notification."""
        event_ptr = native_bt.notification_event_get_event(self._ptr)
        assert event_ptr
        return bt2.event._create_from_ptr(event_ptr)

    @property
    def clock_class_priority_map(self):
        """Clock class priority map object of this notification."""
        cc_prio_map_ptr = native_bt.notification_event_get_clock_class_priority_map(self._ptr)
        assert cc_prio_map_ptr
        return bt2.clock_class_priority_map.ClockClassPriorityMap._create_from_ptr(cc_prio_map_ptr)

    def __eq__(self, other):
        """Compare by type, then by address, then by properties."""
        if type(other) is not type(self):
            return False

        if self.addr == other.addr:
            return True

        def props(notif):
            return (
                notif.event,
                notif.clock_class_priority_map,
            )

        return props(self) == props(other)

    def _copy(self, copy_func):
        # `copy_func` is deliberately unused: the properties of a
        # notification are frozen, so the user cannot modify them after
        # the copy and sharing references is enough.
        return EventNotification(self.event, self.clock_class_priority_map)
class PacketBeginningNotification(_CopyableNotification):
    """Notification built from a packet (native type
    ``NOTIFICATION_TYPE_PACKET_BEGIN``).
    """

    _TYPE = native_bt.NOTIFICATION_TYPE_PACKET_BEGIN

    def __init__(self, packet):
        """Create the native notification for ``packet``.

        ``packet`` must be a ``bt2.packet._Packet``.

        Raises ``bt2.CreationError`` if the native object cannot be
        created.
        """
        utils._check_type(packet, bt2.packet._Packet)
        ptr = native_bt.notification_packet_begin_create(packet._ptr)

        if ptr is None:
            raise bt2.CreationError('cannot create packet beginning notification object')

        super().__init__(ptr)

    @property
    def packet(self):
        """Packet object of this notification."""
        packet_ptr = native_bt.notification_packet_begin_get_packet(self._ptr)
        assert packet_ptr
        return bt2.packet._Packet._create_from_ptr(packet_ptr)

    def __eq__(self, other):
        """Compare by type, then by address, then by packet."""
        if type(other) is not type(self):
            return False

        return self.addr == other.addr or self.packet == other.packet

    def _copy(self, copy_func):
        # `copy_func` is deliberately unused: the properties of a
        # notification are frozen, so the user cannot modify them after
        # the copy and sharing references is enough.
        return PacketBeginningNotification(self.packet)
class PacketEndNotification(_CopyableNotification):
    """Notification built from a packet (native type
    ``NOTIFICATION_TYPE_PACKET_END``).
    """

    _TYPE = native_bt.NOTIFICATION_TYPE_PACKET_END

    def __init__(self, packet):
        """Create the native notification for ``packet``.

        ``packet`` must be a ``bt2.packet._Packet``.

        Raises ``bt2.CreationError`` if the native object cannot be
        created.
        """
        utils._check_type(packet, bt2.packet._Packet)
        ptr = native_bt.notification_packet_end_create(packet._ptr)

        if ptr is None:
            raise bt2.CreationError('cannot create packet end notification object')

        super().__init__(ptr)

    @property
    def packet(self):
        """Packet object of this notification."""
        packet_ptr = native_bt.notification_packet_end_get_packet(self._ptr)
        assert packet_ptr
        return bt2.packet._Packet._create_from_ptr(packet_ptr)

    def __eq__(self, other):
        """Compare by type, then by address, then by packet."""
        if type(other) is not type(self):
            return False

        return self.addr == other.addr or self.packet == other.packet

    def _copy(self, copy_func):
        # `copy_func` is deliberately unused: the properties of a
        # notification are frozen, so the user cannot modify them after
        # the copy and sharing references is enough.
        return PacketEndNotification(self.packet)
class StreamBeginningNotification(_CopyableNotification):
    """Notification built from a stream (native type
    ``NOTIFICATION_TYPE_STREAM_BEGIN``).
    """

    _TYPE = native_bt.NOTIFICATION_TYPE_STREAM_BEGIN

    def __init__(self, stream):
        """Create the native notification for ``stream``.

        ``stream`` must be a ``bt2.stream._Stream``.

        Raises ``bt2.CreationError`` if the native object cannot be
        created.
        """
        utils._check_type(stream, bt2.stream._Stream)
        ptr = native_bt.notification_stream_begin_create(stream._ptr)

        if ptr is None:
            raise bt2.CreationError('cannot create stream beginning notification object')

        super().__init__(ptr)

    @property
    def stream(self):
        """Stream object of this notification."""
        stream_ptr = native_bt.notification_stream_begin_get_stream(self._ptr)
        assert stream_ptr
        return bt2.stream._create_from_ptr(stream_ptr)

    def __eq__(self, other):
        """Compare by type, then by address, then by stream."""
        if type(other) is not type(self):
            return False

        return self.addr == other.addr or self.stream == other.stream

    def _copy(self, copy_func):
        # `copy_func` is deliberately unused: the properties of a
        # notification are frozen, so the user cannot modify them after
        # the copy and sharing references is enough.
        return StreamBeginningNotification(self.stream)
class StreamEndNotification(_CopyableNotification):
    """Notification built from a stream (native type
    ``NOTIFICATION_TYPE_STREAM_END``).
    """

    _TYPE = native_bt.NOTIFICATION_TYPE_STREAM_END

    def __init__(self, stream):
        """Create the native notification for ``stream``.

        ``stream`` must be a ``bt2.stream._Stream``.

        Raises ``bt2.CreationError`` if the native object cannot be
        created.
        """
        utils._check_type(stream, bt2.stream._Stream)
        ptr = native_bt.notification_stream_end_create(stream._ptr)

        if ptr is None:
            raise bt2.CreationError('cannot create stream end notification object')

        super().__init__(ptr)

    @property
    def stream(self):
        """Stream object of this notification."""
        stream_ptr = native_bt.notification_stream_end_get_stream(self._ptr)
        assert stream_ptr
        return bt2.stream._create_from_ptr(stream_ptr)

    def __eq__(self, other):
        """Compare by type, then by address, then by stream."""
        if type(other) is not type(self):
            return False

        return self.addr == other.addr or self.stream == other.stream

    def _copy(self, copy_func):
        # `copy_func` is deliberately unused: the properties of a
        # notification are frozen, so the user cannot modify them after
        # the copy and sharing references is enough.
        return StreamEndNotification(self.stream)
class InactivityNotification(_CopyableNotification):
    """Notification built from an optional clock class priority map, to
    which clock values can be added.
    """

    _TYPE = native_bt.NOTIFICATION_TYPE_INACTIVITY

    def __init__(self, cc_prio_map=None):
        """Create the native inactivity notification.

        ``cc_prio_map``, when given, must be a
        ``ClockClassPriorityMap``.

        Raises ``bt2.CreationError`` if the native object cannot be
        created.
        """
        if cc_prio_map is None:
            cc_prio_map_ptr = None
        else:
            utils._check_type(cc_prio_map, bt2.clock_class_priority_map.ClockClassPriorityMap)
            cc_prio_map_ptr = cc_prio_map._ptr

        ptr = native_bt.notification_inactivity_create(cc_prio_map_ptr)

        if ptr is None:
            raise bt2.CreationError('cannot create inactivity notification object')

        super().__init__(ptr)

    @property
    def clock_class_priority_map(self):
        """Clock class priority map object of this notification."""
        cc_prio_map_ptr = native_bt.notification_inactivity_get_clock_class_priority_map(self._ptr)
        assert cc_prio_map_ptr
        return bt2.clock_class_priority_map.ClockClassPriorityMap._create_from_ptr(cc_prio_map_ptr)

    def clock_value(self, clock_class):
        """Return the clock value for ``clock_class``, or ``None`` when
        the native getter returns no value.
        """
        utils._check_type(clock_class, bt2.ClockClass)
        clock_value_ptr = native_bt.notification_inactivity_get_clock_value(self._ptr,
                                                                            clock_class._ptr)

        if clock_value_ptr is not None:
            return bt2.clock_class._create_clock_value_from_ptr(clock_value_ptr)

    def add_clock_value(self, clock_value):
        """Set ``clock_value`` (a ``bt2.clock_class._ClockValue``) on
        this notification; a failing native status is handed to
        ``utils._handle_ret()``.
        """
        utils._check_type(clock_value, bt2.clock_class._ClockValue)
        status = native_bt.notification_inactivity_set_clock_value(self._ptr,
                                                                   clock_value._ptr)
        utils._handle_ret(status, "cannot set inactivity notification object's clock value")

    def _get_clock_values(self):
        # Map each clock class of the priority map to its clock value,
        # leaving out the clock classes which have none.
        values_by_class = {}

        for clock_class in self.clock_class_priority_map:
            value = self.clock_value(clock_class)

            if value is not None:
                values_by_class[clock_class] = value

        return values_by_class

    def __eq__(self, other):
        """Compare by type, then by address, then by properties."""
        if type(other) is not type(self):
            return False

        if self.addr == other.addr:
            return True

        def props(notif):
            return (
                notif.clock_class_priority_map,
                notif._get_clock_values(),
            )

        return props(self) == props(other)

    def __copy__(self):
        duplicate = InactivityNotification(self.clock_class_priority_map)

        for clock_class in self.clock_class_priority_map:
            value = self.clock_value(clock_class)

            if value is not None:
                duplicate.add_clock_value(value)

        return duplicate

    def __deepcopy__(self, memo):
        prio_map_copy = copy.deepcopy(self.clock_class_priority_map)
        duplicate = InactivityNotification(prio_map_copy)

        # copy clock values
        for source_cc in self.clock_class_priority_map:
            source_value = self.clock_value(source_cc)

            if source_value is None:
                continue

            # find equivalent, copied clock class in CC priority map copy
            for copied_cc in prio_map_copy:
                if copied_cc == source_cc:
                    break

            # create copy of clock value from copied clock class
            value_copy = copied_cc(source_value.cycles)

            # set copied clock value in notification copy
            duplicate.add_clock_value(value_copy)

        memo[id(self)] = duplicate
        return duplicate
class _DiscardedElementsNotification(_Notification):
    """Shared equality logic of the discarded packets/events
    notifications.

    Subclasses provide the compared properties.
    """

    def __eq__(self, other):
        """Compare by type, then by address, then by properties."""
        if type(other) is not type(self):
            return False

        if self.addr == other.addr:
            return True

        def props(notif):
            return (
                notif.count,
                notif.stream,
                notif.beginning_clock_value,
                notif.end_clock_value,
            )

        return props(self) == props(other)
class _DiscardedPacketsNotification(_DiscardedElementsNotification):
    """Read-only view of a native discarded packets notification."""

    _TYPE = native_bt.NOTIFICATION_TYPE_DISCARDED_PACKETS

    @property
    def count(self):
        """Count reported by the native notification."""
        discarded = native_bt.notification_discarded_packets_get_count(self._ptr)
        assert discarded >= 0
        return discarded

    @property
    def stream(self):
        """Stream object of this notification."""
        stream_ptr = native_bt.notification_discarded_packets_get_stream(self._ptr)
        assert stream_ptr
        return bt2.stream._create_from_ptr(stream_ptr)

    @property
    def beginning_clock_value(self):
        """Beginning clock value, or ``None`` when the native getter
        returns no value.
        """
        clock_value_ptr = native_bt.notification_discarded_packets_get_begin_clock_value(self._ptr)

        if clock_value_ptr is not None:
            return bt2.clock_class._create_clock_value_from_ptr(clock_value_ptr)

    @property
    def end_clock_value(self):
        """End clock value, or ``None`` when the native getter returns
        no value.
        """
        clock_value_ptr = native_bt.notification_discarded_packets_get_end_clock_value(self._ptr)

        if clock_value_ptr is not None:
            return bt2.clock_class._create_clock_value_from_ptr(clock_value_ptr)
class _DiscardedEventsNotification(_DiscardedElementsNotification):
    """Read-only view of a native discarded events notification."""

    _TYPE = native_bt.NOTIFICATION_TYPE_DISCARDED_EVENTS

    @property
    def count(self):
        """Count reported by the native notification."""
        discarded = native_bt.notification_discarded_events_get_count(self._ptr)
        assert discarded >= 0
        return discarded

    @property
    def stream(self):
        """Stream object of this notification."""
        stream_ptr = native_bt.notification_discarded_events_get_stream(self._ptr)
        assert stream_ptr
        return bt2.stream._create_from_ptr(stream_ptr)

    @property
    def beginning_clock_value(self):
        """Beginning clock value, or ``None`` when the native getter
        returns no value.
        """
        clock_value_ptr = native_bt.notification_discarded_events_get_begin_clock_value(self._ptr)

        if clock_value_ptr is not None:
            return bt2.clock_class._create_clock_value_from_ptr(clock_value_ptr)

    @property
    def end_clock_value(self):
        """End clock value, or ``None`` when the native getter returns
        no value.
        """
        clock_value_ptr = native_bt.notification_discarded_events_get_end_clock_value(self._ptr)

        if clock_value_ptr is not None:
            return bt2.clock_class._create_clock_value_from_ptr(clock_value_ptr)
469 _NOTIF_TYPE_TO_CLS
= {
470 native_bt
.NOTIFICATION_TYPE_EVENT
: EventNotification
,
471 native_bt
.NOTIFICATION_TYPE_PACKET_BEGIN
: PacketBeginningNotification
,
472 native_bt
.NOTIFICATION_TYPE_PACKET_END
: PacketEndNotification
,
473 native_bt
.NOTIFICATION_TYPE_STREAM_BEGIN
: StreamBeginningNotification
,
474 native_bt
.NOTIFICATION_TYPE_STREAM_END
: StreamEndNotification
,
475 native_bt
.NOTIFICATION_TYPE_INACTIVITY
: InactivityNotification
,
476 native_bt
.NOTIFICATION_TYPE_DISCARDED_PACKETS
: _DiscardedPacketsNotification
,
477 native_bt
.NOTIFICATION_TYPE_DISCARDED_EVENTS
: _DiscardedEventsNotification
,