2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
10 #include <urcu/rculfhash.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/notification/notification-internal.h>
23 #include <lttng/condition/condition-internal.h>
24 #include <lttng/condition/buffer-usage-internal.h>
25 #include <lttng/condition/session-consumed-size-internal.h>
26 #include <lttng/condition/session-rotation-internal.h>
27 #include <lttng/notification/channel-internal.h>
35 #include "notification-thread.h"
36 #include "notification-thread-events.h"
37 #include "notification-thread-commands.h"
38 #include "lttng-sessiond.h"
41 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
42 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
/*
 * Kinds of objects to which a condition can be bound; see
 * get_condition_binding_object().
 */
enum lttng_object_type {
	/* The condition's type is not recognized. */
	LTTNG_OBJECT_TYPE_UNKNOWN = 0,
	LTTNG_OBJECT_TYPE_NONE,
	LTTNG_OBJECT_TYPE_CHANNEL,
	LTTNG_OBJECT_TYPE_SESSION,
};
51 struct lttng_trigger_list_element
{
52 /* No ownership of the trigger object is assumed. */
53 const struct lttng_trigger
*trigger
;
54 struct cds_list_head node
;
57 struct lttng_channel_trigger_list
{
58 struct channel_key channel_key
;
59 /* List of struct lttng_trigger_list_element. */
60 struct cds_list_head list
;
61 /* Node in the channel_triggers_ht */
62 struct cds_lfht_node channel_triggers_ht_node
;
63 /* call_rcu delayed reclaim. */
64 struct rcu_head rcu_node
;
68 * List of triggers applying to a given session.
71 * - lttng_session_trigger_list_create()
72 * - lttng_session_trigger_list_build()
73 * - lttng_session_trigger_list_destroy()
74 * - lttng_session_trigger_list_add()
76 struct lttng_session_trigger_list
{
78 * Not owned by this; points to the session_info structure's
81 const char *session_name
;
82 /* List of struct lttng_trigger_list_element. */
83 struct cds_list_head list
;
84 /* Node in the session_triggers_ht */
85 struct cds_lfht_node session_triggers_ht_node
;
87 * Weak reference to the notification system's session triggers
90 * The session trigger list structure is owned by
91 * the session's session_info.
93 * The session_info is kept alive by the channel_infos holding a
94 * reference to it (reference counting). When those channels are
95 * destroyed (at runtime or on teardown), the reference they hold
96 * to the session_info are released. On destruction of session_info,
97 * session_info_destroy() will remove the list of triggers applying
98 * to this session from the notification system's state.
100 * This implies that the session_triggers_ht must be destroyed
101 * after the channels.
103 struct cds_lfht
*session_triggers_ht
;
104 /* Used for delayed RCU reclaim. */
105 struct rcu_head rcu_node
;
108 struct lttng_trigger_ht_element
{
109 struct lttng_trigger
*trigger
;
110 struct cds_lfht_node node
;
111 /* call_rcu delayed reclaim. */
112 struct rcu_head rcu_node
;
115 struct lttng_condition_list_element
{
116 struct lttng_condition
*condition
;
117 struct cds_list_head node
;
120 struct notification_client_list_element
{
121 struct notification_client
*client
;
122 struct cds_list_head node
;
126 * Thread safety of notification_client and notification_client_list.
128 * The notification thread (main thread) and the action executor
129 * interact through client lists. Hence, when the action executor
130 * thread looks-up the list of clients subscribed to a given
131 * condition, it will acquire a reference to the list and lock it
132 * while attempting to communicate with the various clients.
134 * It is not necessary to reference-count clients as they are guaranteed
135 * to be 'alive' if they are present in a list and that list is locked. Indeed,
136 * removing references to the client from those subscription lists is part of
137 * the work performed on destruction of a client.
139 * No provision for other access scenarios are taken into account;
140 * this is the bare minimum to make these accesses safe and the
141 * notification thread's state is _not_ "thread-safe" in any general
144 struct notification_client_list
{
145 pthread_mutex_t lock
;
147 const struct lttng_trigger
*trigger
;
148 struct cds_list_head list
;
149 /* Weak reference to container. */
150 struct cds_lfht
*notification_trigger_clients_ht
;
151 struct cds_lfht_node notification_trigger_clients_ht_node
;
152 /* call_rcu delayed reclaim. */
153 struct rcu_head rcu_node
;
156 struct notification_client
{
157 /* Nests within the notification_client_list lock. */
158 pthread_mutex_t lock
;
159 notification_client_id id
;
161 /* Client protocol version. */
162 uint8_t major
, minor
;
166 * Indicates if the credentials and versions of the client have been
171 * Conditions to which the client's notification channel is subscribed.
172 * List of struct lttng_condition_list_element. The condition member is
173 * owned by the client.
175 struct cds_list_head condition_list
;
176 struct cds_lfht_node client_socket_ht_node
;
177 struct cds_lfht_node client_id_ht_node
;
180 * If a client's communication is inactive, it means that a
181 * fatal error has occurred (could be either a protocol error or
182 * the socket API returned a fatal error). No further
183 * communication should be attempted; the client is queued for
189 * During the reception of a message, the reception
190 * buffers' "size" is set to contain the current
191 * message's complete payload.
193 struct lttng_dynamic_buffer buffer
;
194 /* Bytes left to receive for the current message. */
195 size_t bytes_to_receive
;
196 /* Type of the message being received. */
197 enum lttng_notification_channel_message_type msg_type
;
199 * Indicates whether or not credentials are expected
204 * Indicates whether or not credentials were received
208 /* Only used during credentials reception. */
209 lttng_sock_cred creds
;
213 * Indicates whether or not a notification addressed to
214 * this client was dropped because a command reply was
217 * A notification is dropped whenever the buffer is not
220 bool dropped_notification
;
222 * Indicates whether or not a command reply is already
223 * buffered. In this case, it means that the client is
224 * not consuming command replies before emitting a new
225 * one. This could be caused by a protocol error or a
226 * misbehaving/malicious client.
228 bool queued_command_reply
;
229 struct lttng_dynamic_buffer buffer
;
232 /* call_rcu delayed reclaim. */
233 struct rcu_head rcu_node
;
236 struct channel_state_sample
{
237 struct channel_key key
;
238 struct cds_lfht_node channel_state_ht_node
;
239 uint64_t highest_usage
;
240 uint64_t lowest_usage
;
241 uint64_t channel_total_consumed
;
242 /* call_rcu delayed reclaim. */
243 struct rcu_head rcu_node
;
246 static unsigned long hash_channel_key(struct channel_key
*key
);
247 static int evaluate_buffer_condition(const struct lttng_condition
*condition
,
248 struct lttng_evaluation
**evaluation
,
249 const struct notification_thread_state
*state
,
250 const struct channel_state_sample
*previous_sample
,
251 const struct channel_state_sample
*latest_sample
,
252 uint64_t previous_session_consumed_total
,
253 uint64_t latest_session_consumed_total
,
254 struct channel_info
*channel_info
);
256 int send_evaluation_to_clients(const struct lttng_trigger
*trigger
,
257 const struct lttng_evaluation
*evaluation
,
258 struct notification_client_list
*client_list
,
259 struct notification_thread_state
*state
,
260 uid_t channel_uid
, gid_t channel_gid
);
263 /* session_info API */
265 void session_info_destroy(void *_data
);
267 void session_info_get(struct session_info
*session_info
);
269 void session_info_put(struct session_info
*session_info
);
271 struct session_info
*session_info_create(const char *name
,
272 uid_t uid
, gid_t gid
,
273 struct lttng_session_trigger_list
*trigger_list
,
274 struct cds_lfht
*sessions_ht
);
276 void session_info_add_channel(struct session_info
*session_info
,
277 struct channel_info
*channel_info
);
279 void session_info_remove_channel(struct session_info
*session_info
,
280 struct channel_info
*channel_info
);
282 /* lttng_session_trigger_list API */
284 struct lttng_session_trigger_list
*lttng_session_trigger_list_create(
285 const char *session_name
,
286 struct cds_lfht
*session_triggers_ht
);
288 struct lttng_session_trigger_list
*lttng_session_trigger_list_build(
289 const struct notification_thread_state
*state
,
290 const char *session_name
);
292 void lttng_session_trigger_list_destroy(
293 struct lttng_session_trigger_list
*list
);
295 int lttng_session_trigger_list_add(struct lttng_session_trigger_list
*list
,
296 const struct lttng_trigger
*trigger
);
300 int match_client_socket(struct cds_lfht_node
*node
, const void *key
)
302 /* This double-cast is intended to supress pointer-to-cast warning. */
303 const int socket
= (int) (intptr_t) key
;
304 const struct notification_client
*client
= caa_container_of(node
,
305 struct notification_client
, client_socket_ht_node
);
307 return client
->socket
== socket
;
311 int match_client_id(struct cds_lfht_node
*node
, const void *key
)
313 /* This double-cast is intended to supress pointer-to-cast warning. */
314 const notification_client_id id
= *((notification_client_id
*) key
);
315 const struct notification_client
*client
= caa_container_of(
316 node
, struct notification_client
, client_id_ht_node
);
318 return client
->id
== id
;
322 int match_channel_trigger_list(struct cds_lfht_node
*node
, const void *key
)
324 struct channel_key
*channel_key
= (struct channel_key
*) key
;
325 struct lttng_channel_trigger_list
*trigger_list
;
327 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
328 channel_triggers_ht_node
);
330 return !!((channel_key
->key
== trigger_list
->channel_key
.key
) &&
331 (channel_key
->domain
== trigger_list
->channel_key
.domain
));
335 int match_session_trigger_list(struct cds_lfht_node
*node
, const void *key
)
337 const char *session_name
= (const char *) key
;
338 struct lttng_session_trigger_list
*trigger_list
;
340 trigger_list
= caa_container_of(node
, struct lttng_session_trigger_list
,
341 session_triggers_ht_node
);
343 return !!(strcmp(trigger_list
->session_name
, session_name
) == 0);
347 int match_channel_state_sample(struct cds_lfht_node
*node
, const void *key
)
349 struct channel_key
*channel_key
= (struct channel_key
*) key
;
350 struct channel_state_sample
*sample
;
352 sample
= caa_container_of(node
, struct channel_state_sample
,
353 channel_state_ht_node
);
355 return !!((channel_key
->key
== sample
->key
.key
) &&
356 (channel_key
->domain
== sample
->key
.domain
));
360 int match_channel_info(struct cds_lfht_node
*node
, const void *key
)
362 struct channel_key
*channel_key
= (struct channel_key
*) key
;
363 struct channel_info
*channel_info
;
365 channel_info
= caa_container_of(node
, struct channel_info
,
368 return !!((channel_key
->key
== channel_info
->key
.key
) &&
369 (channel_key
->domain
== channel_info
->key
.domain
));
373 int match_condition(struct cds_lfht_node
*node
, const void *key
)
375 struct lttng_condition
*condition_key
= (struct lttng_condition
*) key
;
376 struct lttng_trigger_ht_element
*trigger
;
377 struct lttng_condition
*condition
;
379 trigger
= caa_container_of(node
, struct lttng_trigger_ht_element
,
381 condition
= lttng_trigger_get_condition(trigger
->trigger
);
384 return !!lttng_condition_is_equal(condition_key
, condition
);
388 int match_client_list_condition(struct cds_lfht_node
*node
, const void *key
)
390 struct lttng_condition
*condition_key
= (struct lttng_condition
*) key
;
391 struct notification_client_list
*client_list
;
392 const struct lttng_condition
*condition
;
394 assert(condition_key
);
396 client_list
= caa_container_of(node
, struct notification_client_list
,
397 notification_trigger_clients_ht_node
);
398 condition
= lttng_trigger_get_const_condition(client_list
->trigger
);
400 return !!lttng_condition_is_equal(condition_key
, condition
);
404 int match_session(struct cds_lfht_node
*node
, const void *key
)
406 const char *name
= key
;
407 struct session_info
*session_info
= caa_container_of(
408 node
, struct session_info
, sessions_ht_node
);
410 return !strcmp(session_info
->name
, name
);
414 unsigned long lttng_condition_buffer_usage_hash(
415 const struct lttng_condition
*_condition
)
418 unsigned long condition_type
;
419 struct lttng_condition_buffer_usage
*condition
;
421 condition
= container_of(_condition
,
422 struct lttng_condition_buffer_usage
, parent
);
424 condition_type
= (unsigned long) condition
->parent
.type
;
425 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
426 if (condition
->session_name
) {
427 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
429 if (condition
->channel_name
) {
430 hash
^= hash_key_str(condition
->channel_name
, lttng_ht_seed
);
432 if (condition
->domain
.set
) {
433 hash
^= hash_key_ulong(
434 (void *) condition
->domain
.type
,
437 if (condition
->threshold_ratio
.set
) {
440 val
= condition
->threshold_ratio
.value
* (double) UINT32_MAX
;
441 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
442 } else if (condition
->threshold_bytes
.set
) {
445 val
= condition
->threshold_bytes
.value
;
446 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
452 unsigned long lttng_condition_session_consumed_size_hash(
453 const struct lttng_condition
*_condition
)
456 unsigned long condition_type
=
457 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
;
458 struct lttng_condition_session_consumed_size
*condition
;
461 condition
= container_of(_condition
,
462 struct lttng_condition_session_consumed_size
, parent
);
464 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
465 if (condition
->session_name
) {
466 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
468 val
= condition
->consumed_threshold_bytes
.value
;
469 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
474 unsigned long lttng_condition_session_rotation_hash(
475 const struct lttng_condition
*_condition
)
477 unsigned long hash
, condition_type
;
478 struct lttng_condition_session_rotation
*condition
;
480 condition
= container_of(_condition
,
481 struct lttng_condition_session_rotation
, parent
);
482 condition_type
= (unsigned long) condition
->parent
.type
;
483 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
484 assert(condition
->session_name
);
485 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
490 * The lttng_condition hashing code is kept in this file (rather than
491 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
492 * don't want to link in liblttng-ctl.
495 unsigned long lttng_condition_hash(const struct lttng_condition
*condition
)
497 switch (condition
->type
) {
498 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
499 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
500 return lttng_condition_buffer_usage_hash(condition
);
501 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
502 return lttng_condition_session_consumed_size_hash(condition
);
503 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
504 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
505 return lttng_condition_session_rotation_hash(condition
);
507 ERR("[notification-thread] Unexpected condition type caught");
513 unsigned long hash_channel_key(struct channel_key
*key
)
515 unsigned long key_hash
= hash_key_u64(&key
->key
, lttng_ht_seed
);
516 unsigned long domain_hash
= hash_key_ulong(
517 (void *) (unsigned long) key
->domain
, lttng_ht_seed
);
519 return key_hash
^ domain_hash
;
523 unsigned long hash_client_socket(int socket
)
525 return hash_key_ulong((void *) (unsigned long) socket
, lttng_ht_seed
);
529 unsigned long hash_client_id(notification_client_id id
)
531 return hash_key_u64(&id
, lttng_ht_seed
);
535 * Get the type of object to which a given condition applies. Bindings let
536 * the notification system evaluate a trigger's condition when a given
537 * object's state is updated.
539 * For instance, a condition bound to a channel will be evaluated every time
540 * the channel's state is changed by a channel monitoring sample.
543 enum lttng_object_type
get_condition_binding_object(
544 const struct lttng_condition
*condition
)
546 switch (lttng_condition_get_type(condition
)) {
547 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
548 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
549 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
550 return LTTNG_OBJECT_TYPE_CHANNEL
;
551 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
552 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
553 return LTTNG_OBJECT_TYPE_SESSION
;
555 return LTTNG_OBJECT_TYPE_UNKNOWN
;
560 void free_channel_info_rcu(struct rcu_head
*node
)
562 free(caa_container_of(node
, struct channel_info
, rcu_node
));
566 void channel_info_destroy(struct channel_info
*channel_info
)
572 if (channel_info
->session_info
) {
573 session_info_remove_channel(channel_info
->session_info
,
575 session_info_put(channel_info
->session_info
);
577 if (channel_info
->name
) {
578 free(channel_info
->name
);
580 call_rcu(&channel_info
->rcu_node
, free_channel_info_rcu
);
584 void free_session_info_rcu(struct rcu_head
*node
)
586 free(caa_container_of(node
, struct session_info
, rcu_node
));
589 /* Don't call directly, use the ref-counting mechanism. */
591 void session_info_destroy(void *_data
)
593 struct session_info
*session_info
= _data
;
596 assert(session_info
);
597 if (session_info
->channel_infos_ht
) {
598 ret
= cds_lfht_destroy(session_info
->channel_infos_ht
, NULL
);
600 ERR("[notification-thread] Failed to destroy channel information hash table");
603 lttng_session_trigger_list_destroy(session_info
->trigger_list
);
606 cds_lfht_del(session_info
->sessions_ht
,
607 &session_info
->sessions_ht_node
);
609 free(session_info
->name
);
610 call_rcu(&session_info
->rcu_node
, free_session_info_rcu
);
614 void session_info_get(struct session_info
*session_info
)
619 lttng_ref_get(&session_info
->ref
);
623 void session_info_put(struct session_info
*session_info
)
628 lttng_ref_put(&session_info
->ref
);
632 struct session_info
*session_info_create(const char *name
, uid_t uid
, gid_t gid
,
633 struct lttng_session_trigger_list
*trigger_list
,
634 struct cds_lfht
*sessions_ht
)
636 struct session_info
*session_info
;
640 session_info
= zmalloc(sizeof(*session_info
));
644 lttng_ref_init(&session_info
->ref
, session_info_destroy
);
646 session_info
->channel_infos_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
647 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
648 if (!session_info
->channel_infos_ht
) {
652 cds_lfht_node_init(&session_info
->sessions_ht_node
);
653 session_info
->name
= strdup(name
);
654 if (!session_info
->name
) {
657 session_info
->uid
= uid
;
658 session_info
->gid
= gid
;
659 session_info
->trigger_list
= trigger_list
;
660 session_info
->sessions_ht
= sessions_ht
;
664 session_info_put(session_info
);
669 void session_info_add_channel(struct session_info
*session_info
,
670 struct channel_info
*channel_info
)
673 cds_lfht_add(session_info
->channel_infos_ht
,
674 hash_channel_key(&channel_info
->key
),
675 &channel_info
->session_info_channels_ht_node
);
680 void session_info_remove_channel(struct session_info
*session_info
,
681 struct channel_info
*channel_info
)
684 cds_lfht_del(session_info
->channel_infos_ht
,
685 &channel_info
->session_info_channels_ht_node
);
690 struct channel_info
*channel_info_create(const char *channel_name
,
691 struct channel_key
*channel_key
, uint64_t channel_capacity
,
692 struct session_info
*session_info
)
694 struct channel_info
*channel_info
= zmalloc(sizeof(*channel_info
));
700 cds_lfht_node_init(&channel_info
->channels_ht_node
);
701 cds_lfht_node_init(&channel_info
->session_info_channels_ht_node
);
702 memcpy(&channel_info
->key
, channel_key
, sizeof(*channel_key
));
703 channel_info
->capacity
= channel_capacity
;
705 channel_info
->name
= strdup(channel_name
);
706 if (!channel_info
->name
) {
711 * Set the references between session and channel infos:
712 * - channel_info holds a strong reference to session_info
713 * - session_info holds a weak reference to channel_info
715 session_info_get(session_info
);
716 session_info_add_channel(session_info
, channel_info
);
717 channel_info
->session_info
= session_info
;
721 channel_info_destroy(channel_info
);
726 bool notification_client_list_get(struct notification_client_list
*list
)
728 return urcu_ref_get_unless_zero(&list
->ref
);
732 void free_notification_client_list_rcu(struct rcu_head
*node
)
734 free(caa_container_of(node
, struct notification_client_list
,
739 void notification_client_list_release(struct urcu_ref
*list_ref
)
741 struct notification_client_list
*list
=
742 container_of(list_ref
, typeof(*list
), ref
);
743 struct notification_client_list_element
*client_list_element
, *tmp
;
745 if (list
->notification_trigger_clients_ht
) {
747 cds_lfht_del(list
->notification_trigger_clients_ht
,
748 &list
->notification_trigger_clients_ht_node
);
750 list
->notification_trigger_clients_ht
= NULL
;
752 cds_list_for_each_entry_safe(client_list_element
, tmp
,
754 free(client_list_element
);
756 pthread_mutex_destroy(&list
->lock
);
757 call_rcu(&list
->rcu_node
, free_notification_client_list_rcu
);
761 struct notification_client_list
*notification_client_list_create(
762 const struct lttng_trigger
*trigger
)
764 struct notification_client_list
*client_list
=
765 zmalloc(sizeof(*client_list
));
770 pthread_mutex_init(&client_list
->lock
, NULL
);
771 urcu_ref_init(&client_list
->ref
);
772 cds_lfht_node_init(&client_list
->notification_trigger_clients_ht_node
);
773 CDS_INIT_LIST_HEAD(&client_list
->list
);
774 client_list
->trigger
= trigger
;
780 void publish_notification_client_list(
781 struct notification_thread_state
*state
,
782 struct notification_client_list
*list
)
784 const struct lttng_condition
*condition
=
785 lttng_trigger_get_const_condition(list
->trigger
);
787 assert(!list
->notification_trigger_clients_ht
);
789 list
->notification_trigger_clients_ht
=
790 state
->notification_trigger_clients_ht
;
793 cds_lfht_add(state
->notification_trigger_clients_ht
,
794 lttng_condition_hash(condition
),
795 &list
->notification_trigger_clients_ht_node
);
800 void notification_client_list_put(struct notification_client_list
*list
)
805 return urcu_ref_put(&list
->ref
, notification_client_list_release
);
808 /* Provides a reference to the returned list. */
810 struct notification_client_list
*get_client_list_from_condition(
811 struct notification_thread_state
*state
,
812 const struct lttng_condition
*condition
)
814 struct cds_lfht_node
*node
;
815 struct cds_lfht_iter iter
;
816 struct notification_client_list
*list
= NULL
;
819 cds_lfht_lookup(state
->notification_trigger_clients_ht
,
820 lttng_condition_hash(condition
),
821 match_client_list_condition
,
824 node
= cds_lfht_iter_get_node(&iter
);
826 list
= container_of(node
, struct notification_client_list
,
827 notification_trigger_clients_ht_node
);
828 list
= notification_client_list_get(list
) ? list
: NULL
;
836 int evaluate_channel_condition_for_client(
837 const struct lttng_condition
*condition
,
838 struct notification_thread_state
*state
,
839 struct lttng_evaluation
**evaluation
,
840 uid_t
*session_uid
, gid_t
*session_gid
)
843 struct cds_lfht_iter iter
;
844 struct cds_lfht_node
*node
;
845 struct channel_info
*channel_info
= NULL
;
846 struct channel_key
*channel_key
= NULL
;
847 struct channel_state_sample
*last_sample
= NULL
;
848 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
852 /* Find the channel associated with the condition. */
853 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
,
854 channel_trigger_list
, channel_triggers_ht_node
) {
855 struct lttng_trigger_list_element
*element
;
857 cds_list_for_each_entry(element
, &channel_trigger_list
->list
, node
) {
858 const struct lttng_condition
*current_condition
=
859 lttng_trigger_get_const_condition(
862 assert(current_condition
);
863 if (!lttng_condition_is_equal(condition
,
864 current_condition
)) {
868 /* Found the trigger, save the channel key. */
869 channel_key
= &channel_trigger_list
->channel_key
;
873 /* The channel key was found stop iteration. */
879 /* No channel found; normal exit. */
880 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
885 /* Fetch channel info for the matching channel. */
886 cds_lfht_lookup(state
->channels_ht
,
887 hash_channel_key(channel_key
),
891 node
= cds_lfht_iter_get_node(&iter
);
893 channel_info
= caa_container_of(node
, struct channel_info
,
896 /* Retrieve the channel's last sample, if it exists. */
897 cds_lfht_lookup(state
->channel_state_ht
,
898 hash_channel_key(channel_key
),
899 match_channel_state_sample
,
902 node
= cds_lfht_iter_get_node(&iter
);
904 last_sample
= caa_container_of(node
,
905 struct channel_state_sample
,
906 channel_state_ht_node
);
908 /* Nothing to evaluate, no sample was ever taken. Normal exit */
909 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
914 ret
= evaluate_buffer_condition(condition
, evaluation
, state
,
916 0, channel_info
->session_info
->consumed_data_size
,
919 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
923 *session_uid
= channel_info
->session_info
->uid
;
924 *session_gid
= channel_info
->session_info
->gid
;
931 const char *get_condition_session_name(const struct lttng_condition
*condition
)
933 const char *session_name
= NULL
;
934 enum lttng_condition_status status
;
936 switch (lttng_condition_get_type(condition
)) {
937 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
938 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
939 status
= lttng_condition_buffer_usage_get_session_name(
940 condition
, &session_name
);
942 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
943 status
= lttng_condition_session_consumed_size_get_session_name(
944 condition
, &session_name
);
946 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
947 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
948 status
= lttng_condition_session_rotation_get_session_name(
949 condition
, &session_name
);
954 if (status
!= LTTNG_CONDITION_STATUS_OK
) {
955 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
963 int evaluate_session_condition_for_client(
964 const struct lttng_condition
*condition
,
965 struct notification_thread_state
*state
,
966 struct lttng_evaluation
**evaluation
,
967 uid_t
*session_uid
, gid_t
*session_gid
)
970 struct cds_lfht_iter iter
;
971 struct cds_lfht_node
*node
;
972 const char *session_name
;
973 struct session_info
*session_info
= NULL
;
976 session_name
= get_condition_session_name(condition
);
978 /* Find the session associated with the trigger. */
979 cds_lfht_lookup(state
->sessions_ht
,
980 hash_key_str(session_name
, lttng_ht_seed
),
984 node
= cds_lfht_iter_get_node(&iter
);
986 DBG("[notification-thread] No known session matching name \"%s\"",
992 session_info
= caa_container_of(node
, struct session_info
,
994 session_info_get(session_info
);
997 * Evaluation is performed in-line here since only one type of
998 * session-bound condition is handled for the moment.
1000 switch (lttng_condition_get_type(condition
)) {
1001 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
1002 if (!session_info
->rotation
.ongoing
) {
1004 goto end_session_put
;
1007 *evaluation
= lttng_evaluation_session_rotation_ongoing_create(
1008 session_info
->rotation
.id
);
1011 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
1012 session_info
->name
);
1014 goto end_session_put
;
1020 goto end_session_put
;
1023 *session_uid
= session_info
->uid
;
1024 *session_gid
= session_info
->gid
;
1027 session_info_put(session_info
);
1034 int evaluate_condition_for_client(const struct lttng_trigger
*trigger
,
1035 const struct lttng_condition
*condition
,
1036 struct notification_client
*client
,
1037 struct notification_thread_state
*state
)
1040 struct lttng_evaluation
*evaluation
= NULL
;
1041 struct notification_client_list client_list
= {
1042 .lock
= PTHREAD_MUTEX_INITIALIZER
,
1044 struct notification_client_list_element client_list_element
= { 0 };
1045 uid_t object_uid
= 0;
1046 gid_t object_gid
= 0;
1053 switch (get_condition_binding_object(condition
)) {
1054 case LTTNG_OBJECT_TYPE_SESSION
:
1055 ret
= evaluate_session_condition_for_client(condition
, state
,
1056 &evaluation
, &object_uid
, &object_gid
);
1058 case LTTNG_OBJECT_TYPE_CHANNEL
:
1059 ret
= evaluate_channel_condition_for_client(condition
, state
,
1060 &evaluation
, &object_uid
, &object_gid
);
1062 case LTTNG_OBJECT_TYPE_NONE
:
1065 case LTTNG_OBJECT_TYPE_UNKNOWN
:
1075 /* Evaluation yielded nothing. Normal exit. */
1076 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
1082 * Create a temporary client list with the client currently
1085 cds_lfht_node_init(&client_list
.notification_trigger_clients_ht_node
);
1086 CDS_INIT_LIST_HEAD(&client_list
.list
);
1087 client_list
.trigger
= trigger
;
1089 CDS_INIT_LIST_HEAD(&client_list_element
.node
);
1090 client_list_element
.client
= client
;
1091 cds_list_add(&client_list_element
.node
, &client_list
.list
);
1093 /* Send evaluation result to the newly-subscribed client. */
1094 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
1095 ret
= send_evaluation_to_clients(trigger
, evaluation
, &client_list
,
1096 state
, object_uid
, object_gid
);
1103 int notification_thread_client_subscribe(struct notification_client
*client
,
1104 struct lttng_condition
*condition
,
1105 struct notification_thread_state
*state
,
1106 enum lttng_notification_channel_status
*_status
)
1109 struct notification_client_list
*client_list
= NULL
;
1110 struct lttng_condition_list_element
*condition_list_element
= NULL
;
1111 struct notification_client_list_element
*client_list_element
= NULL
;
1112 enum lttng_notification_channel_status status
=
1113 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
1116 * Ensure that the client has not already subscribed to this condition
1119 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
, node
) {
1120 if (lttng_condition_is_equal(condition_list_element
->condition
,
1122 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED
;
1127 condition_list_element
= zmalloc(sizeof(*condition_list_element
));
1128 if (!condition_list_element
) {
1132 client_list_element
= zmalloc(sizeof(*client_list_element
));
1133 if (!client_list_element
) {
1139 * Add the newly-subscribed condition to the client's subscription list.
1141 CDS_INIT_LIST_HEAD(&condition_list_element
->node
);
1142 condition_list_element
->condition
= condition
;
1143 cds_list_add(&condition_list_element
->node
, &client
->condition_list
);
1145 client_list
= get_client_list_from_condition(state
, condition
);
1148 * No notification-emitting trigger registered with this
1149 * condition. We don't evaluate the condition right away
1150 * since this trigger is not registered yet.
1152 free(client_list_element
);
1157 * The condition to which the client just subscribed is evaluated
1158 * at this point so that conditions that are already TRUE result
1159 * in a notification being sent out.
1161 * The client_list's trigger is used without locking the list itself.
1162 * This is correct since the list doesn't own the trigger and the
1163 * object is immutable.
1165 if (evaluate_condition_for_client(client_list
->trigger
, condition
,
1167 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1169 free(client_list_element
);
1174 * Add the client to the list of clients interested in a given trigger
1175 * if a "notification" trigger with a corresponding condition was
1178 client_list_element
->client
= client
;
1179 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
1181 pthread_mutex_lock(&client_list
->lock
);
1182 cds_list_add(&client_list_element
->node
, &client_list
->list
);
1183 pthread_mutex_unlock(&client_list
->lock
);
1189 notification_client_list_put(client_list
);
1193 free(condition_list_element
);
1194 free(client_list_element
);
1199 int notification_thread_client_unsubscribe(
1200 struct notification_client
*client
,
1201 struct lttng_condition
*condition
,
1202 struct notification_thread_state
*state
,
1203 enum lttng_notification_channel_status
*_status
)
1205 struct notification_client_list
*client_list
;
1206 struct lttng_condition_list_element
*condition_list_element
,
1208 struct notification_client_list_element
*client_list_element
,
1210 bool condition_found
= false;
1211 enum lttng_notification_channel_status status
=
1212 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
1214 /* Remove the condition from the client's condition list. */
1215 cds_list_for_each_entry_safe(condition_list_element
, condition_tmp
,
1216 &client
->condition_list
, node
) {
1217 if (!lttng_condition_is_equal(condition_list_element
->condition
,
1222 cds_list_del(&condition_list_element
->node
);
1224 * The caller may be iterating on the client's conditions to
1225 * tear down a client's connection. In this case, the condition
1226 * will be destroyed at the end.
1228 if (condition
!= condition_list_element
->condition
) {
1229 lttng_condition_destroy(
1230 condition_list_element
->condition
);
1232 free(condition_list_element
);
1233 condition_found
= true;
1237 if (!condition_found
) {
1238 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION
;
1243 * Remove the client from the list of clients interested in the trigger
1244 * matching the condition.
1246 client_list
= get_client_list_from_condition(state
, condition
);
1251 pthread_mutex_lock(&client_list
->lock
);
1252 cds_list_for_each_entry_safe(client_list_element
, client_tmp
,
1253 &client_list
->list
, node
) {
1254 if (client_list_element
->client
->id
!= client
->id
) {
1257 cds_list_del(&client_list_element
->node
);
1258 free(client_list_element
);
1261 pthread_mutex_unlock(&client_list
->lock
);
1262 notification_client_list_put(client_list
);
1265 lttng_condition_destroy(condition
);
1273 void free_notification_client_rcu(struct rcu_head
*node
)
1275 free(caa_container_of(node
, struct notification_client
, rcu_node
));
1279 void notification_client_destroy(struct notification_client
*client
,
1280 struct notification_thread_state
*state
)
1287 * The client object is not reachable by other threads, no need to lock
1290 if (client
->socket
>= 0) {
1291 (void) lttcomm_close_unix_sock(client
->socket
);
1292 client
->socket
= -1;
1294 client
->communication
.active
= false;
1295 lttng_dynamic_buffer_reset(&client
->communication
.inbound
.buffer
);
1296 lttng_dynamic_buffer_reset(&client
->communication
.outbound
.buffer
);
1297 pthread_mutex_destroy(&client
->lock
);
1298 call_rcu(&client
->rcu_node
, free_notification_client_rcu
);
1302 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1306 struct notification_client
*get_client_from_socket(int socket
,
1307 struct notification_thread_state
*state
)
1309 struct cds_lfht_iter iter
;
1310 struct cds_lfht_node
*node
;
1311 struct notification_client
*client
= NULL
;
1313 cds_lfht_lookup(state
->client_socket_ht
,
1314 hash_client_socket(socket
),
1315 match_client_socket
,
1316 (void *) (unsigned long) socket
,
1318 node
= cds_lfht_iter_get_node(&iter
);
1323 client
= caa_container_of(node
, struct notification_client
,
1324 client_socket_ht_node
);
1330 bool buffer_usage_condition_applies_to_channel(
1331 const struct lttng_condition
*condition
,
1332 const struct channel_info
*channel_info
)
1334 enum lttng_condition_status status
;
1335 enum lttng_domain_type condition_domain
;
1336 const char *condition_session_name
= NULL
;
1337 const char *condition_channel_name
= NULL
;
1339 status
= lttng_condition_buffer_usage_get_domain_type(condition
,
1341 assert(status
== LTTNG_CONDITION_STATUS_OK
);
1342 if (channel_info
->key
.domain
!= condition_domain
) {
1346 status
= lttng_condition_buffer_usage_get_session_name(
1347 condition
, &condition_session_name
);
1348 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1350 status
= lttng_condition_buffer_usage_get_channel_name(
1351 condition
, &condition_channel_name
);
1352 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_channel_name
);
1354 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
1357 if (strcmp(channel_info
->name
, condition_channel_name
)) {
1367 bool session_consumed_size_condition_applies_to_channel(
1368 const struct lttng_condition
*condition
,
1369 const struct channel_info
*channel_info
)
1371 enum lttng_condition_status status
;
1372 const char *condition_session_name
= NULL
;
1374 status
= lttng_condition_session_consumed_size_get_session_name(
1375 condition
, &condition_session_name
);
1376 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1378 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
1388 bool trigger_applies_to_channel(const struct lttng_trigger
*trigger
,
1389 const struct channel_info
*channel_info
)
1391 const struct lttng_condition
*condition
;
1392 bool trigger_applies
;
1394 condition
= lttng_trigger_get_const_condition(trigger
);
1399 switch (lttng_condition_get_type(condition
)) {
1400 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
1401 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
1402 trigger_applies
= buffer_usage_condition_applies_to_channel(
1403 condition
, channel_info
);
1405 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
1406 trigger_applies
= session_consumed_size_condition_applies_to_channel(
1407 condition
, channel_info
);
1413 return trigger_applies
;
1419 bool trigger_applies_to_client(struct lttng_trigger
*trigger
,
1420 struct notification_client
*client
)
1422 bool applies
= false;
1423 struct lttng_condition_list_element
*condition_list_element
;
1425 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
,
1427 applies
= lttng_condition_is_equal(
1428 condition_list_element
->condition
,
1429 lttng_trigger_get_condition(trigger
));
1437 /* Must be called with RCU read lock held. */
1439 struct lttng_session_trigger_list
*get_session_trigger_list(
1440 struct notification_thread_state
*state
,
1441 const char *session_name
)
1443 struct lttng_session_trigger_list
*list
= NULL
;
1444 struct cds_lfht_node
*node
;
1445 struct cds_lfht_iter iter
;
1447 cds_lfht_lookup(state
->session_triggers_ht
,
1448 hash_key_str(session_name
, lttng_ht_seed
),
1449 match_session_trigger_list
,
1452 node
= cds_lfht_iter_get_node(&iter
);
1455 * Not an error, the list of triggers applying to that session
1456 * will be initialized when the session is created.
1458 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1463 list
= caa_container_of(node
,
1464 struct lttng_session_trigger_list
,
1465 session_triggers_ht_node
);
1471 * Allocate an empty lttng_session_trigger_list for the session named
1474 * No ownership of 'session_name' is assumed by the session trigger list.
1475 * It is the caller's responsibility to ensure the session name is alive
1476 * for as long as this list is.
1479 struct lttng_session_trigger_list
*lttng_session_trigger_list_create(
1480 const char *session_name
,
1481 struct cds_lfht
*session_triggers_ht
)
1483 struct lttng_session_trigger_list
*list
;
1485 list
= zmalloc(sizeof(*list
));
1489 list
->session_name
= session_name
;
1490 CDS_INIT_LIST_HEAD(&list
->list
);
1491 cds_lfht_node_init(&list
->session_triggers_ht_node
);
1492 list
->session_triggers_ht
= session_triggers_ht
;
1495 /* Publish the list through the session_triggers_ht. */
1496 cds_lfht_add(session_triggers_ht
,
1497 hash_key_str(session_name
, lttng_ht_seed
),
1498 &list
->session_triggers_ht_node
);
1505 void free_session_trigger_list_rcu(struct rcu_head
*node
)
1507 free(caa_container_of(node
, struct lttng_session_trigger_list
,
1512 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list
*list
)
1514 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1516 /* Empty the list element by element, and then free the list itself. */
1517 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1518 &list
->list
, node
) {
1519 cds_list_del(&trigger_list_element
->node
);
1520 free(trigger_list_element
);
1523 /* Unpublish the list from the session_triggers_ht. */
1524 cds_lfht_del(list
->session_triggers_ht
,
1525 &list
->session_triggers_ht_node
);
1527 call_rcu(&list
->rcu_node
, free_session_trigger_list_rcu
);
1531 int lttng_session_trigger_list_add(struct lttng_session_trigger_list
*list
,
1532 const struct lttng_trigger
*trigger
)
1535 struct lttng_trigger_list_element
*new_element
=
1536 zmalloc(sizeof(*new_element
));
1542 CDS_INIT_LIST_HEAD(&new_element
->node
);
1543 new_element
->trigger
= trigger
;
1544 cds_list_add(&new_element
->node
, &list
->list
);
1550 bool trigger_applies_to_session(const struct lttng_trigger
*trigger
,
1551 const char *session_name
)
1553 bool applies
= false;
1554 const struct lttng_condition
*condition
;
1556 condition
= lttng_trigger_get_const_condition(trigger
);
1557 switch (lttng_condition_get_type(condition
)) {
1558 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
1559 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
1561 enum lttng_condition_status condition_status
;
1562 const char *condition_session_name
;
1564 condition_status
= lttng_condition_session_rotation_get_session_name(
1565 condition
, &condition_session_name
);
1566 if (condition_status
!= LTTNG_CONDITION_STATUS_OK
) {
1567 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1571 assert(condition_session_name
);
1572 applies
= !strcmp(condition_session_name
, session_name
);
1583 * Allocate and initialize an lttng_session_trigger_list which contains
1584 * all triggers that apply to the session named 'session_name'.
1586 * No ownership of 'session_name' is assumed by the session trigger list.
1587 * It is the caller's responsibility to ensure the session name is alive
1588 * for as long as this list is.
1591 struct lttng_session_trigger_list
*lttng_session_trigger_list_build(
1592 const struct notification_thread_state
*state
,
1593 const char *session_name
)
1595 int trigger_count
= 0;
1596 struct lttng_session_trigger_list
*session_trigger_list
= NULL
;
1597 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1598 struct cds_lfht_iter iter
;
1600 session_trigger_list
= lttng_session_trigger_list_create(session_name
,
1601 state
->session_triggers_ht
);
1603 /* Add all triggers applying to the session named 'session_name'. */
1604 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1608 if (!trigger_applies_to_session(trigger_ht_element
->trigger
,
1613 ret
= lttng_session_trigger_list_add(session_trigger_list
,
1614 trigger_ht_element
->trigger
);
1622 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1624 return session_trigger_list
;
1626 lttng_session_trigger_list_destroy(session_trigger_list
);
1631 struct session_info
*find_or_create_session_info(
1632 struct notification_thread_state
*state
,
1633 const char *name
, uid_t uid
, gid_t gid
)
1635 struct session_info
*session
= NULL
;
1636 struct cds_lfht_node
*node
;
1637 struct cds_lfht_iter iter
;
1638 struct lttng_session_trigger_list
*trigger_list
;
1641 cds_lfht_lookup(state
->sessions_ht
,
1642 hash_key_str(name
, lttng_ht_seed
),
1646 node
= cds_lfht_iter_get_node(&iter
);
1648 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1650 session
= caa_container_of(node
, struct session_info
,
1652 assert(session
->uid
== uid
);
1653 assert(session
->gid
== gid
);
1654 session_info_get(session
);
1658 trigger_list
= lttng_session_trigger_list_build(state
, name
);
1659 if (!trigger_list
) {
1663 session
= session_info_create(name
, uid
, gid
, trigger_list
,
1664 state
->sessions_ht
);
1666 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1668 lttng_session_trigger_list_destroy(trigger_list
);
1671 trigger_list
= NULL
;
1673 cds_lfht_add(state
->sessions_ht
, hash_key_str(name
, lttng_ht_seed
),
1674 &session
->sessions_ht_node
);
1680 session_info_put(session
);
1685 int handle_notification_thread_command_add_channel(
1686 struct notification_thread_state
*state
,
1687 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1688 const char *channel_name
, enum lttng_domain_type channel_domain
,
1689 uint64_t channel_key_int
, uint64_t channel_capacity
,
1690 enum lttng_error_code
*cmd_result
)
1692 struct cds_list_head trigger_list
;
1693 struct channel_info
*new_channel_info
= NULL
;
1694 struct channel_key channel_key
= {
1695 .key
= channel_key_int
,
1696 .domain
= channel_domain
,
1698 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
1699 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1700 int trigger_count
= 0;
1701 struct cds_lfht_iter iter
;
1702 struct session_info
*session_info
= NULL
;
1704 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64
" in %s domain",
1705 channel_name
, session_name
, channel_key_int
,
1706 channel_domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1708 CDS_INIT_LIST_HEAD(&trigger_list
);
1710 session_info
= find_or_create_session_info(state
, session_name
,
1711 session_uid
, session_gid
);
1712 if (!session_info
) {
1713 /* Allocation error or an internal error occurred. */
1717 new_channel_info
= channel_info_create(channel_name
, &channel_key
,
1718 channel_capacity
, session_info
);
1719 if (!new_channel_info
) {
1724 /* Build a list of all triggers applying to the new channel. */
1725 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1727 struct lttng_trigger_list_element
*new_element
;
1729 if (!trigger_applies_to_channel(trigger_ht_element
->trigger
,
1730 new_channel_info
)) {
1734 new_element
= zmalloc(sizeof(*new_element
));
1739 CDS_INIT_LIST_HEAD(&new_element
->node
);
1740 new_element
->trigger
= trigger_ht_element
->trigger
;
1741 cds_list_add(&new_element
->node
, &trigger_list
);
1746 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1748 channel_trigger_list
= zmalloc(sizeof(*channel_trigger_list
));
1749 if (!channel_trigger_list
) {
1752 channel_trigger_list
->channel_key
= new_channel_info
->key
;
1753 CDS_INIT_LIST_HEAD(&channel_trigger_list
->list
);
1754 cds_lfht_node_init(&channel_trigger_list
->channel_triggers_ht_node
);
1755 cds_list_splice(&trigger_list
, &channel_trigger_list
->list
);
1758 /* Add channel to the channel_ht which owns the channel_infos. */
1759 cds_lfht_add(state
->channels_ht
,
1760 hash_channel_key(&new_channel_info
->key
),
1761 &new_channel_info
->channels_ht_node
);
1763 * Add the list of triggers associated with this channel to the
1764 * channel_triggers_ht.
1766 cds_lfht_add(state
->channel_triggers_ht
,
1767 hash_channel_key(&new_channel_info
->key
),
1768 &channel_trigger_list
->channel_triggers_ht_node
);
1770 session_info_put(session_info
);
1771 *cmd_result
= LTTNG_OK
;
1774 channel_info_destroy(new_channel_info
);
1775 session_info_put(session_info
);
1780 void free_channel_trigger_list_rcu(struct rcu_head
*node
)
1782 free(caa_container_of(node
, struct lttng_channel_trigger_list
,
1787 void free_channel_state_sample_rcu(struct rcu_head
*node
)
1789 free(caa_container_of(node
, struct channel_state_sample
,
1794 int handle_notification_thread_command_remove_channel(
1795 struct notification_thread_state
*state
,
1796 uint64_t channel_key
, enum lttng_domain_type domain
,
1797 enum lttng_error_code
*cmd_result
)
1799 struct cds_lfht_node
*node
;
1800 struct cds_lfht_iter iter
;
1801 struct lttng_channel_trigger_list
*trigger_list
;
1802 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1803 struct channel_key key
= { .key
= channel_key
, .domain
= domain
};
1804 struct channel_info
*channel_info
;
1806 DBG("[notification-thread] Removing channel key = %" PRIu64
" in %s domain",
1807 channel_key
, domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1811 cds_lfht_lookup(state
->channel_triggers_ht
,
1812 hash_channel_key(&key
),
1813 match_channel_trigger_list
,
1816 node
= cds_lfht_iter_get_node(&iter
);
1818 * There is a severe internal error if we are being asked to remove a
1819 * channel that doesn't exist.
1822 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1826 /* Free the list of triggers associated with this channel. */
1827 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
1828 channel_triggers_ht_node
);
1829 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1830 &trigger_list
->list
, node
) {
1831 cds_list_del(&trigger_list_element
->node
);
1832 free(trigger_list_element
);
1834 cds_lfht_del(state
->channel_triggers_ht
, node
);
1835 call_rcu(&trigger_list
->rcu_node
, free_channel_trigger_list_rcu
);
1837 /* Free sampled channel state. */
1838 cds_lfht_lookup(state
->channel_state_ht
,
1839 hash_channel_key(&key
),
1840 match_channel_state_sample
,
1843 node
= cds_lfht_iter_get_node(&iter
);
1845 * This is expected to be NULL if the channel is destroyed before we
1846 * received a sample.
1849 struct channel_state_sample
*sample
= caa_container_of(node
,
1850 struct channel_state_sample
,
1851 channel_state_ht_node
);
1853 cds_lfht_del(state
->channel_state_ht
, node
);
1854 call_rcu(&sample
->rcu_node
, free_channel_state_sample_rcu
);
1857 /* Remove the channel from the channels_ht and free it. */
1858 cds_lfht_lookup(state
->channels_ht
,
1859 hash_channel_key(&key
),
1863 node
= cds_lfht_iter_get_node(&iter
);
1865 channel_info
= caa_container_of(node
, struct channel_info
,
1867 cds_lfht_del(state
->channels_ht
, node
);
1868 channel_info_destroy(channel_info
);
1871 *cmd_result
= LTTNG_OK
;
1876 int handle_notification_thread_command_session_rotation(
1877 struct notification_thread_state
*state
,
1878 enum notification_thread_command_type cmd_type
,
1879 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1880 uint64_t trace_archive_chunk_id
,
1881 struct lttng_trace_archive_location
*location
,
1882 enum lttng_error_code
*_cmd_result
)
1885 enum lttng_error_code cmd_result
= LTTNG_OK
;
1886 struct lttng_session_trigger_list
*trigger_list
;
1887 struct lttng_trigger_list_element
*trigger_list_element
;
1888 struct session_info
*session_info
;
1892 session_info
= find_or_create_session_info(state
, session_name
,
1893 session_uid
, session_gid
);
1894 if (!session_info
) {
1895 /* Allocation error or an internal error occurred. */
1897 cmd_result
= LTTNG_ERR_NOMEM
;
1901 session_info
->rotation
.ongoing
=
1902 cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
;
1903 session_info
->rotation
.id
= trace_archive_chunk_id
;
1904 trigger_list
= get_session_trigger_list(state
, session_name
);
1905 if (!trigger_list
) {
1906 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1911 cds_list_for_each_entry(trigger_list_element
, &trigger_list
->list
,
1913 const struct lttng_condition
*condition
;
1914 const struct lttng_action
*action
;
1915 const struct lttng_trigger
*trigger
;
1916 struct notification_client_list
*client_list
;
1917 struct lttng_evaluation
*evaluation
= NULL
;
1918 enum lttng_condition_type condition_type
;
1919 bool client_list_is_empty
;
1921 trigger
= trigger_list_element
->trigger
;
1922 condition
= lttng_trigger_get_const_condition(trigger
);
1924 condition_type
= lttng_condition_get_type(condition
);
1926 if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
&&
1927 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1929 } else if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
&&
1930 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
) {
1934 action
= lttng_trigger_get_const_action(trigger
);
1936 /* Notify actions are the only type currently supported. */
1937 assert(lttng_action_get_type_const(action
) ==
1938 LTTNG_ACTION_TYPE_NOTIFY
);
1940 client_list
= get_client_list_from_condition(state
, condition
);
1941 assert(client_list
);
1943 pthread_mutex_lock(&client_list
->lock
);
1944 client_list_is_empty
= cds_list_empty(&client_list
->list
);
1945 pthread_mutex_unlock(&client_list
->lock
);
1946 if (client_list_is_empty
) {
1948 * No clients interested in the evaluation's result,
1954 if (cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1955 evaluation
= lttng_evaluation_session_rotation_ongoing_create(
1956 trace_archive_chunk_id
);
1958 evaluation
= lttng_evaluation_session_rotation_completed_create(
1959 trace_archive_chunk_id
, location
);
1963 /* Internal error */
1965 cmd_result
= LTTNG_ERR_UNK
;
1969 /* Dispatch evaluation result to all clients. */
1970 ret
= send_evaluation_to_clients(trigger_list_element
->trigger
,
1971 evaluation
, client_list
, state
,
1974 lttng_evaluation_destroy(evaluation
);
1976 notification_client_list_put(client_list
);
1977 if (caa_unlikely(ret
)) {
1982 session_info_put(session_info
);
1983 *_cmd_result
= cmd_result
;
1989 int condition_is_supported(struct lttng_condition
*condition
)
1993 switch (lttng_condition_get_type(condition
)) {
1994 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
1995 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
1997 enum lttng_domain_type domain
;
1999 ret
= lttng_condition_buffer_usage_get_domain_type(condition
,
2006 if (domain
!= LTTNG_DOMAIN_KERNEL
) {
2012 * Older kernel tracers don't expose the API to monitor their
2013 * buffers. Therefore, we reject triggers that require that
2014 * mechanism to be available to be evaluated.
2016 ret
= kernel_supports_ring_buffer_snapshot_sample_positions();
2026 /* Must be called with RCU read lock held. */
2028 int bind_trigger_to_matching_session(const struct lttng_trigger
*trigger
,
2029 struct notification_thread_state
*state
)
2032 const struct lttng_condition
*condition
;
2033 const char *session_name
;
2034 struct lttng_session_trigger_list
*trigger_list
;
2036 condition
= lttng_trigger_get_const_condition(trigger
);
2037 switch (lttng_condition_get_type(condition
)) {
2038 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
2039 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
2041 enum lttng_condition_status status
;
2043 status
= lttng_condition_session_rotation_get_session_name(
2044 condition
, &session_name
);
2045 if (status
!= LTTNG_CONDITION_STATUS_OK
) {
2046 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2057 trigger_list
= get_session_trigger_list(state
, session_name
);
2058 if (!trigger_list
) {
2059 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2065 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2067 ret
= lttng_session_trigger_list_add(trigger_list
, trigger
);
2072 /* Must be called with RCU read lock held. */
2074 int bind_trigger_to_matching_channels(const struct lttng_trigger
*trigger
,
2075 struct notification_thread_state
*state
)
2078 struct cds_lfht_node
*node
;
2079 struct cds_lfht_iter iter
;
2080 struct channel_info
*channel
;
2082 cds_lfht_for_each_entry(state
->channels_ht
, &iter
, channel
,
2084 struct lttng_trigger_list_element
*trigger_list_element
;
2085 struct lttng_channel_trigger_list
*trigger_list
;
2086 struct cds_lfht_iter lookup_iter
;
2088 if (!trigger_applies_to_channel(trigger
, channel
)) {
2092 cds_lfht_lookup(state
->channel_triggers_ht
,
2093 hash_channel_key(&channel
->key
),
2094 match_channel_trigger_list
,
2097 node
= cds_lfht_iter_get_node(&lookup_iter
);
2099 trigger_list
= caa_container_of(node
,
2100 struct lttng_channel_trigger_list
,
2101 channel_triggers_ht_node
);
2103 trigger_list_element
= zmalloc(sizeof(*trigger_list_element
));
2104 if (!trigger_list_element
) {
2108 CDS_INIT_LIST_HEAD(&trigger_list_element
->node
);
2109 trigger_list_element
->trigger
= trigger
;
2110 cds_list_add(&trigger_list_element
->node
, &trigger_list
->list
);
2111 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2119 * FIXME A client's credentials are not checked when registering a trigger, nor
2120 * are they stored alongside the trigger.
2122 * The effects of this are benign since:
2123 * - The client will succeed in registering the trigger, as it is valid,
2124 * - The trigger will, internally, be bound to the channel/session,
2125 * - The notifications will not be sent since the client's credentials
2126 * are checked against the channel at that moment.
2128 * If this function returns a non-zero value, it means something is
2129 * fundamentally broken and the whole subsystem/thread will be torn down.
2131 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2135 int handle_notification_thread_command_register_trigger(
2136 struct notification_thread_state
*state
,
2137 struct lttng_trigger
*trigger
,
2138 enum lttng_error_code
*cmd_result
)
2141 struct lttng_condition
*condition
;
2142 struct notification_client
*client
;
2143 struct notification_client_list
*client_list
= NULL
;
2144 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
2145 struct notification_client_list_element
*client_list_element
, *tmp
;
2146 struct cds_lfht_node
*node
;
2147 struct cds_lfht_iter iter
;
2148 bool free_trigger
= true;
2152 condition
= lttng_trigger_get_condition(trigger
);
2155 ret
= condition_is_supported(condition
);
2158 } else if (ret
== 0) {
2159 *cmd_result
= LTTNG_ERR_NOT_SUPPORTED
;
2162 /* Feature is supported, continue. */
2166 trigger_ht_element
= zmalloc(sizeof(*trigger_ht_element
));
2167 if (!trigger_ht_element
) {
2172 /* Add trigger to the trigger_ht. */
2173 cds_lfht_node_init(&trigger_ht_element
->node
);
2174 trigger_ht_element
->trigger
= trigger
;
2176 node
= cds_lfht_add_unique(state
->triggers_ht
,
2177 lttng_condition_hash(condition
),
2180 &trigger_ht_element
->node
);
2181 if (node
!= &trigger_ht_element
->node
) {
2182 /* Not a fatal error, simply report it to the client. */
2183 *cmd_result
= LTTNG_ERR_TRIGGER_EXISTS
;
2184 goto error_free_ht_element
;
2188 * Ownership of the trigger and of its wrapper was transferred to
2191 trigger_ht_element
= NULL
;
2192 free_trigger
= false;
2195 * The rest only applies to triggers that have a "notify" action.
2196 * It is not skipped as this is the only action type currently
2199 client_list
= notification_client_list_create(trigger
);
2202 goto error_free_ht_element
;
2205 /* Build a list of clients to which this new trigger applies. */
2206 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
2207 client_socket_ht_node
) {
2208 if (!trigger_applies_to_client(trigger
, client
)) {
2212 client_list_element
= zmalloc(sizeof(*client_list_element
));
2213 if (!client_list_element
) {
2215 goto error_put_client_list
;
2217 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
2218 client_list_element
->client
= client
;
2219 cds_list_add(&client_list_element
->node
, &client_list
->list
);
2222 switch (get_condition_binding_object(condition
)) {
2223 case LTTNG_OBJECT_TYPE_SESSION
:
2224 /* Add the trigger to the list if it matches a known session. */
2225 ret
= bind_trigger_to_matching_session(trigger
, state
);
2227 goto error_put_client_list
;
2230 case LTTNG_OBJECT_TYPE_CHANNEL
:
2232 * Add the trigger to list of triggers bound to the channels
2235 ret
= bind_trigger_to_matching_channels(trigger
, state
);
2237 goto error_put_client_list
;
2240 case LTTNG_OBJECT_TYPE_NONE
:
2243 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2245 goto error_put_client_list
;
2249 * Since there is nothing preventing clients from subscribing to a
2250 * condition before the corresponding trigger is registered, we have
2251 * to evaluate this new condition right away.
2253 * At some point, we were waiting for the next "evaluation" (e.g. on
2254 * reception of a channel sample) to evaluate this new condition, but
2257 * The reason it was broken is that waiting for the next sample
2258 * does not allow us to properly handle transitions for edge-triggered
2261 * Consider this example: when we handle a new channel sample, we
2262 * evaluate each condition twice: once with the previous state, and
2263 * again with the newest state. We then use those two results to
2264 * determine whether a state change happened: a condition was false and
2265 * became true. If a state change happened, we have to notify clients.
2267 * Now, if a client subscribes to a given notification and registers
2268 * a trigger *after* that subscription, we have to make sure the
2269 * condition is evaluated at this point while considering only the
2270 * current state. Otherwise, the next evaluation cycle may only see
2271 * that the evaluations remain the same (true for samples n-1 and n) and
2272 * the client will never know that the condition has been met.
2274 * No need to lock the list here as it has not been published yet.
2276 cds_list_for_each_entry_safe(client_list_element
, tmp
,
2277 &client_list
->list
, node
) {
2278 ret
= evaluate_condition_for_client(trigger
, condition
,
2279 client_list_element
->client
, state
);
2281 goto error_put_client_list
;
2286 * Client list ownership transferred to the
2287 * notification_trigger_clients_ht.
2289 publish_notification_client_list(state
, client_list
);
2292 *cmd_result
= LTTNG_OK
;
2294 error_put_client_list
:
2295 notification_client_list_put(client_list
);
2297 error_free_ht_element
:
2298 free(trigger_ht_element
);
2301 lttng_trigger_destroy(trigger
);
2308 void free_lttng_trigger_ht_element_rcu(struct rcu_head
*node
)
2310 free(caa_container_of(node
, struct lttng_trigger_ht_element
,
2315 int handle_notification_thread_command_unregister_trigger(
2316 struct notification_thread_state
*state
,
2317 struct lttng_trigger
*trigger
,
2318 enum lttng_error_code
*_cmd_reply
)
2320 struct cds_lfht_iter iter
;
2321 struct cds_lfht_node
*triggers_ht_node
;
2322 struct lttng_channel_trigger_list
*trigger_list
;
2323 struct notification_client_list
*client_list
;
2324 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
2325 struct lttng_condition
*condition
= lttng_trigger_get_condition(
2327 enum lttng_error_code cmd_reply
;
2331 cds_lfht_lookup(state
->triggers_ht
,
2332 lttng_condition_hash(condition
),
2336 triggers_ht_node
= cds_lfht_iter_get_node(&iter
);
2337 if (!triggers_ht_node
) {
2338 cmd_reply
= LTTNG_ERR_TRIGGER_NOT_FOUND
;
2341 cmd_reply
= LTTNG_OK
;
2344 /* Remove trigger from channel_triggers_ht. */
2345 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
, trigger_list
,
2346 channel_triggers_ht_node
) {
2347 struct lttng_trigger_list_element
*trigger_element
, *tmp
;
2349 cds_list_for_each_entry_safe(trigger_element
, tmp
,
2350 &trigger_list
->list
, node
) {
2351 const struct lttng_condition
*current_condition
=
2352 lttng_trigger_get_const_condition(
2353 trigger_element
->trigger
);
2355 assert(current_condition
);
2356 if (!lttng_condition_is_equal(condition
,
2357 current_condition
)) {
2361 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2362 cds_list_del(&trigger_element
->node
);
2363 /* A trigger can only appear once per channel */
2369 * Remove and release the client list from
2370 * notification_trigger_clients_ht.
2372 client_list
= get_client_list_from_condition(state
, condition
);
2373 assert(client_list
);
2375 /* Put new reference and the hashtable's reference. */
2376 notification_client_list_put(client_list
);
2377 notification_client_list_put(client_list
);
2380 /* Remove trigger from triggers_ht. */
2381 trigger_ht_element
= caa_container_of(triggers_ht_node
,
2382 struct lttng_trigger_ht_element
, node
);
2383 cds_lfht_del(state
->triggers_ht
, triggers_ht_node
);
2385 /* Release the ownership of the trigger. */
2386 lttng_trigger_destroy(trigger_ht_element
->trigger
);
2387 call_rcu(&trigger_ht_element
->rcu_node
, free_lttng_trigger_ht_element_rcu
);
2391 *_cmd_reply
= cmd_reply
;
2396 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2397 int handle_notification_thread_command(
2398 struct notification_thread_handle
*handle
,
2399 struct notification_thread_state
*state
)
2403 struct notification_thread_command
*cmd
;
2405 /* Read the event pipe to put it back into a quiescent state. */
2406 ret
= lttng_read(lttng_pipe_get_readfd(handle
->cmd_queue
.event_pipe
), &counter
,
2408 if (ret
!= sizeof(counter
)) {
2412 pthread_mutex_lock(&handle
->cmd_queue
.lock
);
2413 cmd
= cds_list_first_entry(&handle
->cmd_queue
.list
,
2414 struct notification_thread_command
, cmd_list_node
);
2415 cds_list_del(&cmd
->cmd_list_node
);
2416 pthread_mutex_unlock(&handle
->cmd_queue
.lock
);
2417 switch (cmd
->type
) {
2418 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER
:
2419 DBG("[notification-thread] Received register trigger command");
2420 ret
= handle_notification_thread_command_register_trigger(
2421 state
, cmd
->parameters
.trigger
,
2424 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER
:
2425 DBG("[notification-thread] Received unregister trigger command");
2426 ret
= handle_notification_thread_command_unregister_trigger(
2427 state
, cmd
->parameters
.trigger
,
2430 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL
:
2431 DBG("[notification-thread] Received add channel command");
2432 ret
= handle_notification_thread_command_add_channel(
2434 cmd
->parameters
.add_channel
.session
.name
,
2435 cmd
->parameters
.add_channel
.session
.uid
,
2436 cmd
->parameters
.add_channel
.session
.gid
,
2437 cmd
->parameters
.add_channel
.channel
.name
,
2438 cmd
->parameters
.add_channel
.channel
.domain
,
2439 cmd
->parameters
.add_channel
.channel
.key
,
2440 cmd
->parameters
.add_channel
.channel
.capacity
,
2443 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL
:
2444 DBG("[notification-thread] Received remove channel command");
2445 ret
= handle_notification_thread_command_remove_channel(
2446 state
, cmd
->parameters
.remove_channel
.key
,
2447 cmd
->parameters
.remove_channel
.domain
,
2450 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
:
2451 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
:
2452 DBG("[notification-thread] Received session rotation %s command",
2453 cmd
->type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
?
2454 "ongoing" : "completed");
2455 ret
= handle_notification_thread_command_session_rotation(
2458 cmd
->parameters
.session_rotation
.session_name
,
2459 cmd
->parameters
.session_rotation
.uid
,
2460 cmd
->parameters
.session_rotation
.gid
,
2461 cmd
->parameters
.session_rotation
.trace_archive_chunk_id
,
2462 cmd
->parameters
.session_rotation
.location
,
2465 case NOTIFICATION_COMMAND_TYPE_QUIT
:
2466 DBG("[notification-thread] Received quit command");
2467 cmd
->reply_code
= LTTNG_OK
;
2471 ERR("[notification-thread] Unknown internal command received");
2479 if (cmd
->is_async
) {
2483 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2487 /* Wake-up and return a fatal error to the calling thread. */
2488 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2489 cmd
->reply_code
= LTTNG_ERR_FATAL
;
2491 /* Indicate a fatal error to the caller. */
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns the result of the failing fcntl() call (-1) on error, or the
 * result of the F_SETFL call on success.
 */
int socket_set_non_blocking(int socket)
{
	int current_flags;
	int status;

	/* Fetch the current flags so that only O_NONBLOCK is added. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return current_flags;
	}

	status = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (status == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return status;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return status;
}
2518 /* Client lock must be acquired by caller. */
2520 int client_reset_inbound_state(struct notification_client
*client
)
2524 ASSERT_LOCKED(client
->lock
);
2526 ret
= lttng_dynamic_buffer_set_size(
2527 &client
->communication
.inbound
.buffer
, 0);
2530 client
->communication
.inbound
.bytes_to_receive
=
2531 sizeof(struct lttng_notification_channel_message
);
2532 client
->communication
.inbound
.msg_type
=
2533 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
;
2534 LTTNG_SOCK_SET_UID_CRED(&client
->communication
.inbound
.creds
, -1);
2535 LTTNG_SOCK_SET_GID_CRED(&client
->communication
.inbound
.creds
, -1);
2536 ret
= lttng_dynamic_buffer_set_size(
2537 &client
->communication
.inbound
.buffer
,
2538 client
->communication
.inbound
.bytes_to_receive
);
2542 int handle_notification_thread_client_connect(
2543 struct notification_thread_state
*state
)
2546 struct notification_client
*client
;
2548 DBG("[notification-thread] Handling new notification channel client connection");
2550 client
= zmalloc(sizeof(*client
));
2556 pthread_mutex_init(&client
->lock
, NULL
);
2557 client
->id
= state
->next_notification_client_id
++;
2558 CDS_INIT_LIST_HEAD(&client
->condition_list
);
2559 lttng_dynamic_buffer_init(&client
->communication
.inbound
.buffer
);
2560 lttng_dynamic_buffer_init(&client
->communication
.outbound
.buffer
);
2561 client
->communication
.inbound
.expect_creds
= true;
2563 pthread_mutex_lock(&client
->lock
);
2564 ret
= client_reset_inbound_state(client
);
2565 pthread_mutex_unlock(&client
->lock
);
2567 ERR("[notification-thread] Failed to reset client communication's inbound state");
2572 ret
= lttcomm_accept_unix_sock(state
->notification_channel_socket
);
2574 ERR("[notification-thread] Failed to accept new notification channel client connection");
2579 client
->socket
= ret
;
2581 ret
= socket_set_non_blocking(client
->socket
);
2583 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2587 ret
= lttcomm_setsockopt_creds_unix_sock(client
->socket
);
2589 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2594 ret
= lttng_poll_add(&state
->events
, client
->socket
,
2595 LPOLLIN
| LPOLLERR
|
2596 LPOLLHUP
| LPOLLRDHUP
);
2598 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2602 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2606 cds_lfht_add(state
->client_socket_ht
,
2607 hash_client_socket(client
->socket
),
2608 &client
->client_socket_ht_node
);
2609 cds_lfht_add(state
->client_id_ht
,
2610 hash_client_id(client
->id
),
2611 &client
->client_id_ht_node
);
2616 notification_client_destroy(client
, state
);
2620 /* RCU read-lock must be held by the caller. */
2621 /* Client lock must be held by the caller */
2623 int notification_thread_client_disconnect(
2624 struct notification_client
*client
,
2625 struct notification_thread_state
*state
)
2628 struct lttng_condition_list_element
*condition_list_element
, *tmp
;
2630 /* Acquire the client lock to disable its communication atomically. */
2631 client
->communication
.active
= false;
2632 ret
= lttng_poll_del(&state
->events
, client
->socket
);
2634 ERR("[notification-thread] Failed to remove client socket %d from poll set",
2638 cds_lfht_del(state
->client_socket_ht
, &client
->client_socket_ht_node
);
2639 cds_lfht_del(state
->client_id_ht
, &client
->client_id_ht_node
);
2641 /* Release all conditions to which the client was subscribed. */
2642 cds_list_for_each_entry_safe(condition_list_element
, tmp
,
2643 &client
->condition_list
, node
) {
2644 (void) notification_thread_client_unsubscribe(client
,
2645 condition_list_element
->condition
, state
, NULL
);
2649 * Client no longer accessible to other threads (through the
2652 notification_client_destroy(client
, state
);
2656 int handle_notification_thread_client_disconnect(
2657 int client_socket
, struct notification_thread_state
*state
)
2660 struct notification_client
*client
;
2663 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2665 client
= get_client_from_socket(client_socket
, state
);
2667 /* Internal state corruption, fatal error. */
2668 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2674 pthread_mutex_lock(&client
->lock
);
2675 ret
= notification_thread_client_disconnect(client
, state
);
2676 pthread_mutex_unlock(&client
->lock
);
2682 int handle_notification_thread_client_disconnect_all(
2683 struct notification_thread_state
*state
)
2685 struct cds_lfht_iter iter
;
2686 struct notification_client
*client
;
2687 bool error_encoutered
= false;
2690 DBG("[notification-thread] Closing all client connections");
2691 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
2692 client_socket_ht_node
) {
2695 pthread_mutex_lock(&client
->lock
);
2696 ret
= notification_thread_client_disconnect(
2698 pthread_mutex_unlock(&client
->lock
);
2700 error_encoutered
= true;
2704 return error_encoutered
? 1 : 0;
2707 int handle_notification_thread_trigger_unregister_all(
2708 struct notification_thread_state
*state
)
2710 bool error_occurred
= false;
2711 struct cds_lfht_iter iter
;
2712 struct lttng_trigger_ht_element
*trigger_ht_element
;
2715 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
2717 int ret
= handle_notification_thread_command_unregister_trigger(
2718 state
, trigger_ht_element
->trigger
, NULL
);
2720 error_occurred
= true;
2724 return error_occurred
? -1 : 0;
2728 int client_handle_transmission_status(
2729 struct notification_client
*client
,
2730 enum client_transmission_status transmission_status
,
2731 struct notification_thread_state
*state
)
2735 switch (transmission_status
) {
2736 case CLIENT_TRANSMISSION_STATUS_COMPLETE
:
2737 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
2738 CLIENT_POLL_MASK_IN
);
2743 client
->communication
.outbound
.queued_command_reply
= false;
2744 client
->communication
.outbound
.dropped_notification
= false;
2746 case CLIENT_TRANSMISSION_STATUS_QUEUED
:
2748 * We want to be notified whenever there is buffer space
2749 * available to send the rest of the payload.
2751 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
2752 CLIENT_POLL_MASK_IN_OUT
);
2757 case CLIENT_TRANSMISSION_STATUS_FAIL
:
2758 ret
= notification_thread_client_disconnect(client
, state
);
2763 case CLIENT_TRANSMISSION_STATUS_ERROR
:
2773 /* Client lock must be acquired by caller. */
2775 enum client_transmission_status
client_flush_outgoing_queue(
2776 struct notification_client
*client
,
2777 struct notification_thread_state
*state
)
2780 size_t to_send_count
;
2781 enum client_transmission_status status
;
2783 ASSERT_LOCKED(client
->lock
);
2785 assert(client
->communication
.outbound
.buffer
.size
!= 0);
2786 to_send_count
= client
->communication
.outbound
.buffer
.size
;
2787 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2790 ret
= lttcomm_send_unix_sock_non_block(client
->socket
,
2791 client
->communication
.outbound
.buffer
.data
,
2793 if ((ret
>= 0 && ret
< to_send_count
)) {
2794 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2796 to_send_count
-= max(ret
, 0);
2798 memcpy(client
->communication
.outbound
.buffer
.data
,
2799 client
->communication
.outbound
.buffer
.data
+
2800 client
->communication
.outbound
.buffer
.size
- to_send_count
,
2802 ret
= lttng_dynamic_buffer_set_size(
2803 &client
->communication
.outbound
.buffer
,
2806 status
= CLIENT_TRANSMISSION_STATUS_ERROR
;
2809 status
= CLIENT_TRANSMISSION_STATUS_QUEUED
;
2810 } else if (ret
< 0) {
2811 /* Generic error, disconnect the client. */
2812 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
2814 status
= CLIENT_TRANSMISSION_STATUS_FAIL
;
2816 /* No error and flushed the queue completely. */
2817 ret
= lttng_dynamic_buffer_set_size(
2818 &client
->communication
.outbound
.buffer
, 0);
2820 status
= CLIENT_TRANSMISSION_STATUS_ERROR
;
2823 status
= CLIENT_TRANSMISSION_STATUS_COMPLETE
;
2826 ret
= client_handle_transmission_status(client
, status
, state
);
2836 /* Client lock must be acquired by caller. */
2838 int client_send_command_reply(struct notification_client
*client
,
2839 struct notification_thread_state
*state
,
2840 enum lttng_notification_channel_status status
)
2843 struct lttng_notification_channel_command_reply reply
= {
2844 .status
= (int8_t) status
,
2846 struct lttng_notification_channel_message msg
= {
2847 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
,
2848 .size
= sizeof(reply
),
2850 char buffer
[sizeof(msg
) + sizeof(reply
)];
2852 ASSERT_LOCKED(client
->lock
);
2854 if (client
->communication
.outbound
.queued_command_reply
) {
2855 /* Protocol error. */
2859 memcpy(buffer
, &msg
, sizeof(msg
));
2860 memcpy(buffer
+ sizeof(msg
), &reply
, sizeof(reply
));
2861 DBG("[notification-thread] Send command reply (%i)", (int) status
);
2863 /* Enqueue buffer to outgoing queue and flush it. */
2864 ret
= lttng_dynamic_buffer_append(
2865 &client
->communication
.outbound
.buffer
,
2866 buffer
, sizeof(buffer
));
2871 ret
= client_flush_outgoing_queue(client
, state
);
2876 if (client
->communication
.outbound
.buffer
.size
!= 0) {
2877 /* Queue could not be emptied. */
2878 client
->communication
.outbound
.queued_command_reply
= true;
2887 int client_handle_message_unknown(struct notification_client
*client
,
2888 struct notification_thread_state
*state
)
2892 pthread_mutex_lock(&client
->lock
);
2895 * Receiving message header. The function will be called again
2896 * once the rest of the message as been received and can be
2899 const struct lttng_notification_channel_message
*msg
;
2901 assert(sizeof(*msg
) == client
->communication
.inbound
.buffer
.size
);
2902 msg
= (const struct lttng_notification_channel_message
*)
2903 client
->communication
.inbound
.buffer
.data
;
2905 if (msg
->size
== 0 ||
2906 msg
->size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
2907 ERR("[notification-thread] Invalid notification channel message: length = %u",
2913 switch (msg
->type
) {
2914 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
2915 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
2916 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
2920 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2924 client
->communication
.inbound
.bytes_to_receive
= msg
->size
;
2925 client
->communication
.inbound
.msg_type
=
2926 (enum lttng_notification_channel_message_type
) msg
->type
;
2927 ret
= lttng_dynamic_buffer_set_size(
2928 &client
->communication
.inbound
.buffer
, msg
->size
);
2930 pthread_mutex_unlock(&client
->lock
);
2935 int client_handle_message_handshake(struct notification_client
*client
,
2936 struct notification_thread_state
*state
)
2939 struct lttng_notification_channel_command_handshake
*handshake_client
;
2940 const struct lttng_notification_channel_command_handshake handshake_reply
= {
2941 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
2942 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
2944 const struct lttng_notification_channel_message msg_header
= {
2945 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
2946 .size
= sizeof(handshake_reply
),
2948 enum lttng_notification_channel_status status
=
2949 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
2950 char send_buffer
[sizeof(msg_header
) + sizeof(handshake_reply
)];
2952 pthread_mutex_lock(&client
->lock
);
2954 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
2955 memcpy(send_buffer
+ sizeof(msg_header
), &handshake_reply
,
2956 sizeof(handshake_reply
));
2959 (struct lttng_notification_channel_command_handshake
*)
2960 client
->communication
.inbound
.buffer
2962 client
->major
= handshake_client
->major
;
2963 client
->minor
= handshake_client
->minor
;
2964 if (!client
->communication
.inbound
.creds_received
) {
2965 ERR("[notification-thread] No credentials received from client");
2970 client
->uid
= LTTNG_SOCK_GET_UID_CRED(
2971 &client
->communication
.inbound
.creds
);
2972 client
->gid
= LTTNG_SOCK_GET_GID_CRED(
2973 &client
->communication
.inbound
.creds
);
2974 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2975 client
->uid
, client
->gid
, (int) client
->major
,
2976 (int) client
->minor
);
2978 if (handshake_client
->major
!=
2979 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
2980 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION
;
2983 ret
= lttng_dynamic_buffer_append(
2984 &client
->communication
.outbound
.buffer
, send_buffer
,
2985 sizeof(send_buffer
));
2987 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2991 client
->validated
= true;
2992 client
->communication
.active
= true;
2994 ret
= client_flush_outgoing_queue(client
, state
);
2999 ret
= client_send_command_reply(client
, state
, status
);
3001 ERR("[notification-thread] Failed to send reply to notification channel client");
3005 /* Set reception state to receive the next message header. */
3006 ret
= client_reset_inbound_state(client
);
3008 ERR("[notification-thread] Failed to reset client communication's inbound state");
3013 pthread_mutex_unlock(&client
->lock
);
3018 int client_handle_message_subscription(
3019 struct notification_client
*client
,
3020 enum lttng_notification_channel_message_type msg_type
,
3021 struct notification_thread_state
*state
)
3024 struct lttng_condition
*condition
;
3025 enum lttng_notification_channel_status status
=
3026 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
3027 struct lttng_payload_view condition_view
=
3028 lttng_payload_view_from_dynamic_buffer(
3029 &client
->communication
.inbound
.buffer
,
3031 size_t expected_condition_size
;
3033 pthread_mutex_lock(&client
->lock
);
3034 expected_condition_size
= client
->communication
.inbound
.buffer
.size
;
3035 pthread_mutex_unlock(&client
->lock
);
3037 ret
= lttng_condition_create_from_payload(&condition_view
, &condition
);
3038 if (ret
!= expected_condition_size
) {
3039 ERR("[notification-thread] Malformed condition received from client");
3043 if (msg_type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
) {
3044 ret
= notification_thread_client_subscribe(
3045 client
, condition
, state
, &status
);
3047 ret
= notification_thread_client_unsubscribe(
3048 client
, condition
, state
, &status
);
3054 pthread_mutex_lock(&client
->lock
);
3055 ret
= client_send_command_reply(client
, state
, status
);
3057 ERR("[notification-thread] Failed to send reply to notification channel client");
3061 /* Set reception state to receive the next message header. */
3062 ret
= client_reset_inbound_state(client
);
3064 ERR("[notification-thread] Failed to reset client communication's inbound state");
3069 pthread_mutex_unlock(&client
->lock
);
3075 int client_dispatch_message(struct notification_client
*client
,
3076 struct notification_thread_state
*state
)
3080 if (client
->communication
.inbound
.msg_type
!=
3081 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
&&
3082 client
->communication
.inbound
.msg_type
!=
3083 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
&&
3084 !client
->validated
) {
3085 WARN("[notification-thread] client attempted a command before handshake");
3090 switch (client
->communication
.inbound
.msg_type
) {
3091 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
:
3093 ret
= client_handle_message_unknown(client
, state
);
3096 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
3098 ret
= client_handle_message_handshake(client
, state
);
3101 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
3102 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
3104 ret
= client_handle_message_subscription(client
,
3105 client
->communication
.inbound
.msg_type
, state
);
3115 /* Incoming data from client. */
3116 int handle_notification_thread_client_in(
3117 struct notification_thread_state
*state
, int socket
)
3120 struct notification_client
*client
;
3123 bool message_is_complete
= false;
3125 client
= get_client_from_socket(socket
, state
);
3127 /* Internal error, abort. */
3132 pthread_mutex_lock(&client
->lock
);
3133 offset
= client
->communication
.inbound
.buffer
.size
-
3134 client
->communication
.inbound
.bytes_to_receive
;
3135 if (client
->communication
.inbound
.expect_creds
) {
3136 recv_ret
= lttcomm_recv_creds_unix_sock(socket
,
3137 client
->communication
.inbound
.buffer
.data
+ offset
,
3138 client
->communication
.inbound
.bytes_to_receive
,
3139 &client
->communication
.inbound
.creds
);
3141 client
->communication
.inbound
.expect_creds
= false;
3142 client
->communication
.inbound
.creds_received
= true;
3145 recv_ret
= lttcomm_recv_unix_sock_non_block(socket
,
3146 client
->communication
.inbound
.buffer
.data
+ offset
,
3147 client
->communication
.inbound
.bytes_to_receive
);
3149 if (recv_ret
>= 0) {
3150 client
->communication
.inbound
.bytes_to_receive
-= recv_ret
;
3151 message_is_complete
= client
->communication
.inbound
3152 .bytes_to_receive
== 0;
3154 pthread_mutex_unlock(&client
->lock
);
3156 goto error_disconnect_client
;
3159 if (message_is_complete
) {
3160 ret
= client_dispatch_message(client
, state
);
3163 * Only returns an error if this client must be
3166 goto error_disconnect_client
;
3171 error_disconnect_client
:
3172 pthread_mutex_lock(&client
->lock
);
3173 ret
= notification_thread_client_disconnect(client
, state
);
3174 pthread_mutex_unlock(&client
->lock
);
3178 /* Client ready to receive outgoing data. */
3179 int handle_notification_thread_client_out(
3180 struct notification_thread_state
*state
, int socket
)
3183 struct notification_client
*client
;
3185 client
= get_client_from_socket(socket
, state
);
3187 /* Internal error, abort. */
3192 pthread_mutex_lock(&client
->lock
);
3193 ret
= client_flush_outgoing_queue(client
, state
);
3194 pthread_mutex_unlock(&client
->lock
);
3203 bool evaluate_buffer_usage_condition(const struct lttng_condition
*condition
,
3204 const struct channel_state_sample
*sample
,
3205 uint64_t buffer_capacity
)
3207 bool result
= false;
3209 enum lttng_condition_type condition_type
;
3210 const struct lttng_condition_buffer_usage
*use_condition
= container_of(
3211 condition
, struct lttng_condition_buffer_usage
,
3214 if (use_condition
->threshold_bytes
.set
) {
3215 threshold
= use_condition
->threshold_bytes
.value
;
3218 * Threshold was expressed as a ratio.
3220 * TODO the threshold (in bytes) of conditions expressed
3221 * as a ratio of total buffer size could be cached to
3222 * forego this double-multiplication or it could be performed
3223 * as fixed-point math.
3225 * Note that caching should accommodates the case where the
3226 * condition applies to multiple channels (i.e. don't assume
3227 * that all channels matching my_chann* have the same size...)
3229 threshold
= (uint64_t) (use_condition
->threshold_ratio
.value
*
3230 (double) buffer_capacity
);
3233 condition_type
= lttng_condition_get_type(condition
);
3234 if (condition_type
== LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
) {
3235 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,
3236 threshold
, sample
->highest_usage
);
3239 * The low condition should only be triggered once _all_ of the
3240 * streams in a channel have gone below the "low" threshold.
3242 if (sample
->highest_usage
<= threshold
) {
3246 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,
3247 threshold
, sample
->highest_usage
);
3250 * For high buffer usage scenarios, we want to trigger whenever
3251 * _any_ of the streams has reached the "high" threshold.
3253 if (sample
->highest_usage
>= threshold
) {
3262 bool evaluate_session_consumed_size_condition(
3263 const struct lttng_condition
*condition
,
3264 uint64_t session_consumed_size
)
3267 const struct lttng_condition_session_consumed_size
*size_condition
=
3268 container_of(condition
,
3269 struct lttng_condition_session_consumed_size
,
3272 threshold
= size_condition
->consumed_threshold_bytes
.value
;
3273 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64
", current size = %" PRIu64
,
3274 threshold
, session_consumed_size
);
3275 return session_consumed_size
>= threshold
;
3279 int evaluate_buffer_condition(const struct lttng_condition
*condition
,
3280 struct lttng_evaluation
**evaluation
,
3281 const struct notification_thread_state
*state
,
3282 const struct channel_state_sample
*previous_sample
,
3283 const struct channel_state_sample
*latest_sample
,
3284 uint64_t previous_session_consumed_total
,
3285 uint64_t latest_session_consumed_total
,
3286 struct channel_info
*channel_info
)
3289 enum lttng_condition_type condition_type
;
3290 const bool previous_sample_available
= !!previous_sample
;
3291 bool previous_sample_result
= false;
3292 bool latest_sample_result
;
3294 condition_type
= lttng_condition_get_type(condition
);
3296 switch (condition_type
) {
3297 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
3298 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
3299 if (caa_likely(previous_sample_available
)) {
3300 previous_sample_result
=
3301 evaluate_buffer_usage_condition(condition
,
3302 previous_sample
, channel_info
->capacity
);
3304 latest_sample_result
= evaluate_buffer_usage_condition(
3305 condition
, latest_sample
,
3306 channel_info
->capacity
);
3308 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
3309 if (caa_likely(previous_sample_available
)) {
3310 previous_sample_result
=
3311 evaluate_session_consumed_size_condition(
3313 previous_session_consumed_total
);
3315 latest_sample_result
=
3316 evaluate_session_consumed_size_condition(
3318 latest_session_consumed_total
);
3321 /* Unknown condition type; internal error. */
3325 if (!latest_sample_result
||
3326 (previous_sample_result
== latest_sample_result
)) {
3328 * Only trigger on a condition evaluation transition.
3330 * NOTE: This edge-triggered logic may not be appropriate for
3331 * future condition types.
3336 if (!evaluation
|| !latest_sample_result
) {
3340 switch (condition_type
) {
3341 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
3342 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
3343 *evaluation
= lttng_evaluation_buffer_usage_create(
3345 latest_sample
->highest_usage
,
3346 channel_info
->capacity
);
3348 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
3349 *evaluation
= lttng_evaluation_session_consumed_size_create(
3350 latest_session_consumed_total
);
3365 int client_enqueue_dropped_notification(struct notification_client
*client
)
3368 struct lttng_notification_channel_message msg
= {
3369 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
,
3373 ASSERT_LOCKED(client
->lock
);
3375 ret
= lttng_dynamic_buffer_append(
3376 &client
->communication
.outbound
.buffer
, &msg
,
3382 * Permission checks relative to notification channel clients are performed
3383 * here. Notice how object, client, and trigger credentials are involved in
3386 * The `object` credentials are the credentials associated with the "subject"
3387 * of a condition. For instance, a `rotation completed` condition applies
3388 * to a session. When that condition is met, it will produce an evaluation
3389 * against a session. Hence, in this case, the `object` credentials are the
3390 * credentials of the "subject" session.
3392 * The `trigger` credentials are the credentials of the user that registered the
3395 * The `client` credentials are the credentials of the user that created a given
3396 * notification channel.
3398 * In terms of visibility, it is expected that non-privilieged users can only
3399 * register triggers against "their" objects (their own sessions and
3400 * applications they are allowed to interact with). They can then open a
3401 * notification channel and subscribe to notifications associated with those
3404 * As for privilieged users, they can register triggers against the objects of
3405 * other users. They can then subscribe to the notifications associated to their
3406 * triggers. Privilieged users _can't_ subscribe to the notifications of
3407 * triggers owned by other users; they must create their own triggers.
3409 * This is more a concern of usability than security. It would be difficult for
3410 * a root user reliably subscribe to a specific set of conditions without
3411 * interference from external users (those could, for instance, unregister
3415 int send_evaluation_to_clients(const struct lttng_trigger
*trigger
,
3416 const struct lttng_evaluation
*evaluation
,
3417 struct notification_client_list
* client_list
,
3418 struct notification_thread_state
*state
,
3419 uid_t object_uid
, gid_t object_gid
)
3422 struct lttng_payload msg_payload
;
3423 struct notification_client_list_element
*client_list_element
, *tmp
;
3424 const struct lttng_notification notification
= {
3425 .condition
= (struct lttng_condition
*) lttng_trigger_get_const_condition(trigger
),
3426 .evaluation
= (struct lttng_evaluation
*) evaluation
,
3428 struct lttng_notification_channel_message msg_header
= {
3429 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
,
3431 const struct lttng_credentials
*trigger_creds
= lttng_trigger_get_credentials(trigger
);
3433 lttng_payload_init(&msg_payload
);
3435 ret
= lttng_dynamic_buffer_append(&msg_payload
.buffer
, &msg_header
,
3436 sizeof(msg_header
));
3441 ret
= lttng_notification_serialize(¬ification
, &msg_payload
);
3443 ERR("[notification-thread] Failed to serialize notification");
3448 /* Update payload size. */
3449 ((struct lttng_notification_channel_message
*) msg_payload
.buffer
.data
)
3450 ->size
= (uint32_t)(
3451 msg_payload
.buffer
.size
- sizeof(msg_header
));
3453 pthread_mutex_lock(&client_list
->lock
);
3454 cds_list_for_each_entry_safe(client_list_element
, tmp
,
3455 &client_list
->list
, node
) {
3456 struct notification_client
*client
=
3457 client_list_element
->client
;
3460 pthread_mutex_lock(&client
->lock
);
3461 if (client
->uid
!= object_uid
&& client
->gid
!= object_gid
&&
3463 /* Client is not allowed to monitor this channel. */
3464 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3468 if (client
->uid
!= trigger_creds
->uid
&& client
->gid
!= trigger_creds
->gid
) {
3469 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
3473 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3474 client
->socket
, msg_payload
.buffer
.size
);
3475 if (client
->communication
.outbound
.buffer
.size
) {
3477 * Outgoing data is already buffered for this client;
3478 * drop the notification and enqueue a "dropped
3479 * notification" message if this is the first dropped
3480 * notification since the socket spilled-over to the
3483 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
3485 if (!client
->communication
.outbound
.dropped_notification
) {
3486 client
->communication
.outbound
.dropped_notification
= true;
3487 ret
= client_enqueue_dropped_notification(
3496 ret
= lttng_dynamic_buffer_append_buffer(
3497 &client
->communication
.outbound
.buffer
,
3498 &msg_payload
.buffer
);
3503 ret
= client_flush_outgoing_queue(client
, state
);
3508 pthread_mutex_unlock(&client
->lock
);
3510 goto end_unlock_list
;
3516 pthread_mutex_unlock(&client_list
->lock
);
3518 lttng_payload_reset(&msg_payload
);
3522 int handle_notification_thread_channel_sample(
3523 struct notification_thread_state
*state
, int pipe
,
3524 enum lttng_domain_type domain
)
3527 struct lttcomm_consumer_channel_monitor_msg sample_msg
;
3528 struct channel_info
*channel_info
;
3529 struct cds_lfht_node
*node
;
3530 struct cds_lfht_iter iter
;
3531 struct lttng_channel_trigger_list
*trigger_list
;
3532 struct lttng_trigger_list_element
*trigger_list_element
;
3533 bool previous_sample_available
= false;
3534 struct channel_state_sample previous_sample
, latest_sample
;
3535 uint64_t previous_session_consumed_total
, latest_session_consumed_total
;
3538 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3539 * ensuring that read/write of sampling messages are atomic.
3541 ret
= lttng_read(pipe
, &sample_msg
, sizeof(sample_msg
));
3542 if (ret
!= sizeof(sample_msg
)) {
3543 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3550 latest_sample
.key
.key
= sample_msg
.key
;
3551 latest_sample
.key
.domain
= domain
;
3552 latest_sample
.highest_usage
= sample_msg
.highest
;
3553 latest_sample
.lowest_usage
= sample_msg
.lowest
;
3554 latest_sample
.channel_total_consumed
= sample_msg
.total_consumed
;
3558 /* Retrieve the channel's information. */
3559 cds_lfht_lookup(state
->channels_ht
,
3560 hash_channel_key(&latest_sample
.key
),
3564 node
= cds_lfht_iter_get_node(&iter
);
3565 if (caa_unlikely(!node
)) {
3567 * Not an error since the consumer can push a sample to the pipe
3568 * and the rest of the session daemon could notify us of the
3569 * channel's destruction before we get a chance to process that
3572 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64
" in %s domain",
3573 latest_sample
.key
.key
,
3574 domain
== LTTNG_DOMAIN_KERNEL
? "kernel" :
3578 channel_info
= caa_container_of(node
, struct channel_info
,
3580 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64
") in session %s (highest usage = %" PRIu64
", lowest usage = %" PRIu64
", total consumed = %" PRIu64
")",
3582 latest_sample
.key
.key
,
3583 channel_info
->session_info
->name
,
3584 latest_sample
.highest_usage
,
3585 latest_sample
.lowest_usage
,
3586 latest_sample
.channel_total_consumed
);
3588 previous_session_consumed_total
=
3589 channel_info
->session_info
->consumed_data_size
;
3591 /* Retrieve the channel's last sample, if it exists, and update it. */
3592 cds_lfht_lookup(state
->channel_state_ht
,
3593 hash_channel_key(&latest_sample
.key
),
3594 match_channel_state_sample
,
3597 node
= cds_lfht_iter_get_node(&iter
);
3598 if (caa_likely(node
)) {
3599 struct channel_state_sample
*stored_sample
;
3601 /* Update the sample stored. */
3602 stored_sample
= caa_container_of(node
,
3603 struct channel_state_sample
,
3604 channel_state_ht_node
);
3606 memcpy(&previous_sample
, stored_sample
,
3607 sizeof(previous_sample
));
3608 stored_sample
->highest_usage
= latest_sample
.highest_usage
;
3609 stored_sample
->lowest_usage
= latest_sample
.lowest_usage
;
3610 stored_sample
->channel_total_consumed
= latest_sample
.channel_total_consumed
;
3611 previous_sample_available
= true;
3613 latest_session_consumed_total
=
3614 previous_session_consumed_total
+
3615 (latest_sample
.channel_total_consumed
- previous_sample
.channel_total_consumed
);
3618 * This is the channel's first sample, allocate space for and
3619 * store the new sample.
3621 struct channel_state_sample
*stored_sample
;
3623 stored_sample
= zmalloc(sizeof(*stored_sample
));
3624 if (!stored_sample
) {
3629 memcpy(stored_sample
, &latest_sample
, sizeof(*stored_sample
));
3630 cds_lfht_node_init(&stored_sample
->channel_state_ht_node
);
3631 cds_lfht_add(state
->channel_state_ht
,
3632 hash_channel_key(&stored_sample
->key
),
3633 &stored_sample
->channel_state_ht_node
);
3635 latest_session_consumed_total
=
3636 previous_session_consumed_total
+
3637 latest_sample
.channel_total_consumed
;
3640 channel_info
->session_info
->consumed_data_size
=
3641 latest_session_consumed_total
;
3643 /* Find triggers associated with this channel. */
3644 cds_lfht_lookup(state
->channel_triggers_ht
,
3645 hash_channel_key(&latest_sample
.key
),
3646 match_channel_trigger_list
,
3649 node
= cds_lfht_iter_get_node(&iter
);
3650 if (caa_likely(!node
)) {
3654 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
3655 channel_triggers_ht_node
);
3656 cds_list_for_each_entry(trigger_list_element
, &trigger_list
->list
,
3658 const struct lttng_condition
*condition
;
3659 const struct lttng_action
*action
;
3660 const struct lttng_trigger
*trigger
;
3661 struct notification_client_list
*client_list
= NULL
;
3662 struct lttng_evaluation
*evaluation
= NULL
;
3663 bool client_list_is_empty
;
3666 trigger
= trigger_list_element
->trigger
;
3667 condition
= lttng_trigger_get_const_condition(trigger
);
3669 action
= lttng_trigger_get_const_action(trigger
);
3671 /* Notify actions are the only type currently supported. */
3672 assert(lttng_action_get_type_const(action
) ==
3673 LTTNG_ACTION_TYPE_NOTIFY
);
3676 * Check if any client is subscribed to the result of this
3679 client_list
= get_client_list_from_condition(state
, condition
);
3680 assert(client_list
);
3681 client_list_is_empty
= cds_list_empty(&client_list
->list
);
3682 if (client_list_is_empty
) {
3684 * No clients interested in the evaluation's result,
3690 ret
= evaluate_buffer_condition(condition
, &evaluation
, state
,
3691 previous_sample_available
? &previous_sample
: NULL
,
3693 previous_session_consumed_total
,
3694 latest_session_consumed_total
,
3696 if (caa_unlikely(ret
)) {
3700 if (caa_likely(!evaluation
)) {
3704 /* Dispatch evaluation result to all clients. */
3705 ret
= send_evaluation_to_clients(trigger_list_element
->trigger
,
3706 evaluation
, client_list
, state
,
3707 channel_info
->session_info
->uid
,
3708 channel_info
->session_info
->gid
);
3709 lttng_evaluation_destroy(evaluation
);
3711 notification_client_list_put(client_list
);
3712 if (caa_unlikely(ret
)) {