2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: LGPL-2.1-only
8 #include <lttng/notification/notification-internal.hpp>
9 #include <lttng/notification/channel-internal.hpp>
10 #include <lttng/condition/condition-internal.hpp>
11 #include <lttng/endpoint.h>
12 #include <common/defaults.hpp>
13 #include <common/error.hpp>
14 #include <common/dynamic-buffer.hpp>
15 #include <common/utils.hpp>
16 #include <common/defaults.hpp>
17 #include <common/payload.hpp>
18 #include <common/payload-view.hpp>
19 #include <common/unix.hpp>
20 #include "lttng-ctl-helper.hpp"
21 #include <common/compat/poll.hpp>
24 int handshake(struct lttng_notification_channel
*channel
);
27 * Populates the reception buffer with the next complete message.
28 * The caller must acquire the channel's lock.
31 int receive_message(struct lttng_notification_channel
*channel
)
34 struct lttng_notification_channel_message msg
;
36 lttng_payload_clear(&channel
->reception_payload
);
38 ret
= lttcomm_recv_unix_sock(channel
->socket
, &msg
, sizeof(msg
));
44 if (msg
.size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
49 /* Add message header at buffer's start. */
50 ret
= lttng_dynamic_buffer_append(&channel
->reception_payload
.buffer
, &msg
,
60 /* Reserve space for the payload. */
61 ret
= lttng_dynamic_buffer_set_size(&channel
->reception_payload
.buffer
,
62 channel
->reception_payload
.buffer
.size
+ msg
.size
);
67 /* Receive message payload. */
68 ret
= lttcomm_recv_unix_sock(channel
->socket
,
69 channel
->reception_payload
.buffer
.data
+ sizeof(msg
), msg
.size
);
70 if (ret
< (ssize_t
) msg
.size
) {
76 /* Receive message fds. */
78 ret
= lttcomm_recv_payload_fds_unix_sock(channel
->socket
,
79 msg
.fds
, &channel
->reception_payload
);
80 if (ret
< sizeof(int) * msg
.fds
) {
89 lttng_payload_clear(&channel
->reception_payload
);
94 enum lttng_notification_channel_message_type
get_current_message_type(
95 struct lttng_notification_channel
*channel
)
97 struct lttng_notification_channel_message
*msg
;
99 LTTNG_ASSERT(channel
->reception_payload
.buffer
.size
>= sizeof(*msg
));
101 msg
= (struct lttng_notification_channel_message
*)
102 channel
->reception_payload
.buffer
.data
;
103 return (enum lttng_notification_channel_message_type
) msg
->type
;
107 struct lttng_notification
*create_notification_from_current_message(
108 struct lttng_notification_channel
*channel
)
111 struct lttng_notification
*notification
= NULL
;
113 if (channel
->reception_payload
.buffer
.size
<=
114 sizeof(struct lttng_notification_channel_message
)) {
119 struct lttng_payload_view view
= lttng_payload_view_from_payload(
120 &channel
->reception_payload
,
121 sizeof(struct lttng_notification_channel_message
),
124 ret
= lttng_notification_create_from_payload(
125 &view
, ¬ification
);
128 if (ret
!= channel
->reception_payload
.buffer
.size
-
129 sizeof(struct lttng_notification_channel_message
)) {
130 lttng_notification_destroy(notification
);
138 struct lttng_notification_channel
*lttng_notification_channel_create(
139 struct lttng_endpoint
*endpoint
)
142 bool is_in_tracing_group
= false, is_root
= false;
143 char *sock_path
= NULL
;
144 struct lttng_notification_channel
*channel
= NULL
;
147 endpoint
!= lttng_session_daemon_notification_endpoint
) {
151 sock_path
= calloc
<char>(LTTNG_PATH_MAX
);
156 channel
= zmalloc
<lttng_notification_channel
>();
160 channel
->socket
= -1;
161 pthread_mutex_init(&channel
->lock
, NULL
);
162 lttng_payload_init(&channel
->reception_payload
);
163 CDS_INIT_LIST_HEAD(&channel
->pending_notifications
.list
);
165 is_root
= (getuid() == 0);
167 is_in_tracing_group
= lttng_check_tracing_group();
170 if (is_root
|| is_in_tracing_group
) {
171 ret
= lttng_strncpy(sock_path
,
172 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK
,
175 ret
= -LTTNG_ERR_INVALID
;
179 ret
= lttcomm_connect_unix_sock(sock_path
);
186 /* Fallback to local session daemon. */
187 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
188 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK
,
189 utils_get_home_dir());
190 if (ret
< 0 || ret
>= LTTNG_PATH_MAX
) {
194 ret
= lttcomm_connect_unix_sock(sock_path
);
201 channel
->socket
= fd
;
203 ret
= handshake(channel
);
211 lttng_notification_channel_destroy(channel
);
216 enum lttng_notification_channel_status
217 lttng_notification_channel_get_next_notification(
218 struct lttng_notification_channel
*channel
,
219 struct lttng_notification
**_notification
)
222 struct lttng_notification
*notification
= NULL
;
223 enum lttng_notification_channel_status status
=
224 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
225 struct lttng_poll_event events
;
227 if (!channel
|| !_notification
) {
228 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
232 pthread_mutex_lock(&channel
->lock
);
234 if (channel
->pending_notifications
.count
) {
235 struct pending_notification
*pending_notification
;
237 LTTNG_ASSERT(!cds_list_empty(&channel
->pending_notifications
.list
));
239 /* Deliver one of the pending notifications. */
240 pending_notification
= cds_list_first_entry(
241 &channel
->pending_notifications
.list
,
242 struct pending_notification
,
244 notification
= pending_notification
->notification
;
246 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
248 cds_list_del(&pending_notification
->node
);
249 channel
->pending_notifications
.count
--;
250 free(pending_notification
);
255 * Block on interruptible epoll/poll() instead of the message reception
256 * itself as the recvmsg() wrappers always restart on EINTR. We choose
257 * to wait using interruptible epoll/poll() in order to:
258 * 1) Return if a signal occurs,
259 * 2) Not deal with partially received messages.
261 * The drawback to this approach is that we assume that messages
262 * are complete/well formed. If a message is shorter than its
263 * announced length, receive_message() will block on recvmsg()
264 * and never return (even if a signal is received).
266 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
268 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
271 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
273 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
276 ret
= lttng_poll_wait_interruptible(&events
, -1);
278 status
= (ret
== -1 && errno
== EINTR
) ?
279 LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED
:
280 LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
284 ret
= receive_message(channel
);
286 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
290 switch (get_current_message_type(channel
)) {
291 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
292 notification
= create_notification_from_current_message(
295 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
299 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
300 /* No payload to consume. */
301 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
;
304 /* Protocol error. */
305 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
310 lttng_poll_clean(&events
);
312 pthread_mutex_unlock(&channel
->lock
);
313 *_notification
= notification
;
319 int enqueue_dropped_notification(
320 struct lttng_notification_channel
*channel
)
323 struct pending_notification
*pending_notification
;
324 struct cds_list_head
*last_element
=
325 channel
->pending_notifications
.list
.prev
;
327 pending_notification
= caa_container_of(last_element
,
328 struct pending_notification
, node
);
329 if (!pending_notification
->notification
) {
331 * The last enqueued notification indicates dropped
332 * notifications; there is nothing to do as we group
333 * dropped notifications together.
338 if (channel
->pending_notifications
.count
>=
339 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
&&
340 pending_notification
->notification
) {
342 * Discard the last enqueued notification to indicate
343 * that notifications were dropped at this point.
345 lttng_notification_destroy(
346 pending_notification
->notification
);
347 pending_notification
->notification
= NULL
;
351 pending_notification
= zmalloc
<struct pending_notification
>();
352 if (!pending_notification
) {
356 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
357 cds_list_add(&pending_notification
->node
,
358 &channel
->pending_notifications
.list
);
359 channel
->pending_notifications
.count
++;
365 int enqueue_notification_from_current_message(
366 struct lttng_notification_channel
*channel
)
369 struct lttng_notification
*notification
;
370 struct pending_notification
*pending_notification
;
372 if (channel
->pending_notifications
.count
>=
373 DEFAULT_CLIENT_MAX_QUEUED_NOTIFICATIONS_COUNT
) {
374 /* Drop the notification. */
375 ret
= enqueue_dropped_notification(channel
);
379 pending_notification
= zmalloc
<struct pending_notification
>();
380 if (!pending_notification
) {
384 CDS_INIT_LIST_HEAD(&pending_notification
->node
);
386 notification
= create_notification_from_current_message(channel
);
392 pending_notification
->notification
= notification
;
393 cds_list_add(&pending_notification
->node
,
394 &channel
->pending_notifications
.list
);
395 channel
->pending_notifications
.count
++;
399 free(pending_notification
);
403 enum lttng_notification_channel_status
404 lttng_notification_channel_has_pending_notification(
405 struct lttng_notification_channel
*channel
,
406 bool *_notification_pending
)
409 enum lttng_notification_channel_status status
=
410 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
411 struct lttng_poll_event events
;
413 if (!channel
|| !_notification_pending
) {
414 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
418 pthread_mutex_lock(&channel
->lock
);
420 if (channel
->pending_notifications
.count
) {
421 *_notification_pending
= true;
425 if (channel
->socket
< 0) {
426 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED
;
431 * Check, without blocking, if data is available on the channel's
432 * socket. If there is data available, it is safe to read (blocking)
433 * on the socket for a message from the session daemon.
435 * Since all commands wait for the session daemon's reply before
436 * releasing the channel's lock, the protocol only allows for
437 * notifications and "notification dropped" messages to come
438 * through. If we receive a different message type, it is
439 * considered a protocol error.
441 * Note that this function is not guaranteed not to block. This
442 * will block until our peer (the session daemon) has sent a complete
443 * message if we see data available on the socket. If the peer does
444 * not respect the protocol, this may block indefinitely.
446 ret
= lttng_poll_create(&events
, 1, LTTNG_CLOEXEC
);
448 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
451 ret
= lttng_poll_add(&events
, channel
->socket
, LPOLLIN
| LPOLLERR
);
453 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
456 /* timeout = 0: return immediately. */
457 ret
= lttng_poll_wait_interruptible(&events
, 0);
459 /* No data available. */
460 *_notification_pending
= false;
462 } else if (ret
< 0) {
463 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
467 /* Data available on socket. */
468 ret
= receive_message(channel
);
470 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
474 switch (get_current_message_type(channel
)) {
475 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
476 ret
= enqueue_notification_from_current_message(channel
);
480 *_notification_pending
= true;
482 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
483 ret
= enqueue_dropped_notification(channel
);
487 *_notification_pending
= true;
490 /* Protocol error. */
491 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
496 lttng_poll_clean(&events
);
498 pthread_mutex_unlock(&channel
->lock
);
504 int receive_command_reply(struct lttng_notification_channel
*channel
,
505 enum lttng_notification_channel_status
*status
)
508 struct lttng_notification_channel_command_reply
*reply
;
511 enum lttng_notification_channel_message_type msg_type
;
513 ret
= receive_message(channel
);
518 msg_type
= get_current_message_type(channel
);
520 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
:
522 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
:
523 ret
= enqueue_notification_from_current_message(
529 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
:
530 ret
= enqueue_dropped_notification(channel
);
535 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
537 struct lttng_notification_channel_command_handshake
*handshake
;
539 handshake
= (struct lttng_notification_channel_command_handshake
*)
540 (channel
->reception_payload
.buffer
.data
+
541 sizeof(struct lttng_notification_channel_message
));
542 channel
->version
.major
= handshake
->major
;
543 channel
->version
.minor
= handshake
->minor
;
544 channel
->version
.set
= true;
554 if (channel
->reception_payload
.buffer
.size
<
555 (sizeof(struct lttng_notification_channel_message
) +
557 /* Invalid message received. */
562 reply
= (struct lttng_notification_channel_command_reply
*)
563 (channel
->reception_payload
.buffer
.data
+
564 sizeof(struct lttng_notification_channel_message
));
565 *status
= (enum lttng_notification_channel_status
) reply
->status
;
571 int handshake(struct lttng_notification_channel
*channel
)
574 enum lttng_notification_channel_status status
=
575 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
576 struct lttng_notification_channel_command_handshake handshake
= {
577 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
578 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
580 struct lttng_notification_channel_message msg_header
= {
581 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
582 .size
= sizeof(handshake
),
585 char send_buffer
[sizeof(msg_header
) + sizeof(handshake
)];
587 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
588 memcpy(send_buffer
+ sizeof(msg_header
), &handshake
, sizeof(handshake
));
590 pthread_mutex_lock(&channel
->lock
);
592 ret
= lttcomm_send_creds_unix_sock(channel
->socket
, send_buffer
,
593 sizeof(send_buffer
));
598 /* Receive handshake info from the sessiond. */
599 ret
= receive_command_reply(channel
, &status
);
604 if (!channel
->version
.set
) {
609 if (channel
->version
.major
!= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
615 pthread_mutex_unlock(&channel
->lock
);
620 enum lttng_notification_channel_status
send_condition_command(
621 struct lttng_notification_channel
*channel
,
622 enum lttng_notification_channel_message_type type
,
623 const struct lttng_condition
*condition
)
627 enum lttng_notification_channel_status status
=
628 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
629 struct lttng_payload payload
;
630 struct lttng_notification_channel_message cmd_header
= {
631 .type
= (int8_t) type
,
636 lttng_payload_init(&payload
);
639 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
643 LTTNG_ASSERT(type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
||
644 type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
);
646 pthread_mutex_lock(&channel
->lock
);
647 socket
= channel
->socket
;
649 if (!lttng_condition_validate(condition
)) {
650 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
654 ret
= lttng_dynamic_buffer_append(&payload
.buffer
, &cmd_header
,
657 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
661 ret
= lttng_condition_serialize(condition
, &payload
);
663 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID
;
667 /* Update payload length. */
668 ((struct lttng_notification_channel_message
*) payload
.buffer
.data
)->size
=
669 (uint32_t) (payload
.buffer
.size
- sizeof(cmd_header
));
672 struct lttng_payload_view pv
=
673 lttng_payload_view_from_payload(
676 lttng_payload_view_get_fd_handle_count(&pv
);
678 /* Update fd count. */
679 ((struct lttng_notification_channel_message
*) payload
.buffer
.data
)->fds
=
682 ret
= lttcomm_send_unix_sock(
683 socket
, pv
.buffer
.data
, pv
.buffer
.size
);
685 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
689 /* Pass fds if present. */
691 ret
= lttcomm_send_payload_view_fds_unix_sock(socket
,
694 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
700 ret
= receive_command_reply(channel
, &status
);
702 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
706 pthread_mutex_unlock(&channel
->lock
);
708 lttng_payload_reset(&payload
);
712 enum lttng_notification_channel_status
lttng_notification_channel_subscribe(
713 struct lttng_notification_channel
*channel
,
714 const struct lttng_condition
*condition
)
716 return send_condition_command(channel
,
717 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
,
721 enum lttng_notification_channel_status
lttng_notification_channel_unsubscribe(
722 struct lttng_notification_channel
*channel
,
723 const struct lttng_condition
*condition
)
725 return send_condition_command(channel
,
726 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
,
730 void lttng_notification_channel_destroy(
731 struct lttng_notification_channel
*channel
)
737 if (channel
->socket
>= 0) {
738 (void) lttcomm_close_unix_sock(channel
->socket
);
740 pthread_mutex_destroy(&channel
->lock
);
741 lttng_payload_reset(&channel
->reception_payload
);