Fix: evaluate trigger condition on registration
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
31#include <lttng/action/action.h>
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
35#include <lttng/notification/channel-internal.h>
1da26331 36
ab0ee2ca
JG
37#include <time.h>
38#include <unistd.h>
39#include <assert.h>
40#include <inttypes.h>
d53ea4e4 41#include <fcntl.h>
ab0ee2ca 42
1da26331
JG
43#include "notification-thread.h"
44#include "notification-thread-events.h"
45#include "notification-thread-commands.h"
46#include "lttng-sessiond.h"
47#include "kernel.h"
48
ab0ee2ca
JG
49#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
50#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
51
52struct lttng_trigger_list_element {
53 struct lttng_trigger *trigger;
54 struct cds_list_head node;
55};
56
57struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 struct cds_list_head list;
60 struct cds_lfht_node channel_triggers_ht_node;
61};
62
63struct lttng_trigger_ht_element {
64 struct lttng_trigger *trigger;
65 struct cds_lfht_node node;
66};
67
68struct lttng_condition_list_element {
69 struct lttng_condition *condition;
70 struct cds_list_head node;
71};
72
73struct notification_client_list_element {
74 struct notification_client *client;
75 struct cds_list_head node;
76};
77
78struct notification_client_list {
79 struct lttng_trigger *trigger;
80 struct cds_list_head list;
81 struct cds_lfht_node notification_trigger_ht_node;
82};
83
84struct notification_client {
85 int socket;
86 /* Client protocol version. */
87 uint8_t major, minor;
88 uid_t uid;
89 gid_t gid;
90 /*
5332364f 91 * Indicates if the credentials and versions of the client have been
ab0ee2ca
JG
92 * checked.
93 */
94 bool validated;
95 /*
96 * Conditions to which the client's notification channel is subscribed.
97 * List of struct lttng_condition_list_node. The condition member is
98 * owned by the client.
99 */
100 struct cds_list_head condition_list;
101 struct cds_lfht_node client_socket_ht_node;
102 struct {
103 struct {
14fa22f8
JG
104 /*
105 * During the reception of a message, the reception
106 * buffers' "size" is set to contain the current
107 * message's complete payload.
108 */
ab0ee2ca
JG
109 struct lttng_dynamic_buffer buffer;
110 /* Bytes left to receive for the current message. */
111 size_t bytes_to_receive;
112 /* Type of the message being received. */
113 enum lttng_notification_channel_message_type msg_type;
114 /*
115 * Indicates whether or not credentials are expected
116 * from the client.
117 */
01ea340e 118 bool expect_creds;
ab0ee2ca
JG
119 /*
120 * Indicates whether or not credentials were received
121 * from the client.
122 */
123 bool creds_received;
14fa22f8 124 /* Only used during credentials reception. */
ab0ee2ca
JG
125 lttng_sock_cred creds;
126 } inbound;
127 struct {
128 /*
129 * Indicates whether or not a notification addressed to
130 * this client was dropped because a command reply was
131 * already buffered.
132 *
133 * A notification is dropped whenever the buffer is not
134 * empty.
135 */
136 bool dropped_notification;
137 /*
138 * Indicates whether or not a command reply is already
139 * buffered. In this case, it means that the client is
140 * not consuming command replies before emitting a new
141 * one. This could be caused by a protocol error or a
142 * misbehaving/malicious client.
143 */
144 bool queued_command_reply;
145 struct lttng_dynamic_buffer buffer;
146 } outbound;
147 } communication;
148};
149
150struct channel_state_sample {
151 struct channel_key key;
152 struct cds_lfht_node channel_state_ht_node;
153 uint64_t highest_usage;
154 uint64_t lowest_usage;
155};
156
e4db5ace
JR
157static unsigned long hash_channel_key(struct channel_key *key);
158static int evaluate_condition(struct lttng_condition *condition,
159 struct lttng_evaluation **evaluation,
160 struct notification_thread_state *state,
161 struct channel_state_sample *previous_sample,
162 struct channel_state_sample *latest_sample,
163 uint64_t buffer_capacity);
164static
165int send_evaluation_to_clients(struct lttng_trigger *trigger,
166 struct lttng_evaluation *evaluation,
167 struct notification_client_list *client_list,
168 struct notification_thread_state *state,
169 uid_t channel_uid, gid_t channel_gid);
170
ab0ee2ca
JG
171static
172int match_client(struct cds_lfht_node *node, const void *key)
173{
174 /* This double-cast is intended to supress pointer-to-cast warning. */
175 int socket = (int) (intptr_t) key;
176 struct notification_client *client;
177
178 client = caa_container_of(node, struct notification_client,
179 client_socket_ht_node);
180
181 return !!(client->socket == socket);
182}
183
184static
185int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
186{
187 struct channel_key *channel_key = (struct channel_key *) key;
188 struct lttng_channel_trigger_list *trigger_list;
189
190 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
191 channel_triggers_ht_node);
192
193 return !!((channel_key->key == trigger_list->channel_key.key) &&
194 (channel_key->domain == trigger_list->channel_key.domain));
195}
196
197static
198int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
199{
200 struct channel_key *channel_key = (struct channel_key *) key;
201 struct channel_state_sample *sample;
202
203 sample = caa_container_of(node, struct channel_state_sample,
204 channel_state_ht_node);
205
206 return !!((channel_key->key == sample->key.key) &&
207 (channel_key->domain == sample->key.domain));
208}
209
210static
211int match_channel_info(struct cds_lfht_node *node, const void *key)
212{
213 struct channel_key *channel_key = (struct channel_key *) key;
214 struct channel_info *channel_info;
215
216 channel_info = caa_container_of(node, struct channel_info,
217 channels_ht_node);
218
219 return !!((channel_key->key == channel_info->key.key) &&
220 (channel_key->domain == channel_info->key.domain));
221}
222
223static
224int match_condition(struct cds_lfht_node *node, const void *key)
225{
226 struct lttng_condition *condition_key = (struct lttng_condition *) key;
227 struct lttng_trigger_ht_element *trigger;
228 struct lttng_condition *condition;
229
230 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
231 node);
232 condition = lttng_trigger_get_condition(trigger->trigger);
233 assert(condition);
234
235 return !!lttng_condition_is_equal(condition_key, condition);
236}
237
238static
239int match_client_list(struct cds_lfht_node *node, const void *key)
240{
241 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
242 struct notification_client_list *client_list;
243 struct lttng_condition *condition;
244 struct lttng_condition *condition_key = lttng_trigger_get_condition(
245 trigger_key);
246
247 assert(condition_key);
248
249 client_list = caa_container_of(node, struct notification_client_list,
250 notification_trigger_ht_node);
251 condition = lttng_trigger_get_condition(client_list->trigger);
252
253 return !!lttng_condition_is_equal(condition_key, condition);
254}
255
256static
257int match_client_list_condition(struct cds_lfht_node *node, const void *key)
258{
259 struct lttng_condition *condition_key = (struct lttng_condition *) key;
260 struct notification_client_list *client_list;
261 struct lttng_condition *condition;
262
263 assert(condition_key);
264
265 client_list = caa_container_of(node, struct notification_client_list,
266 notification_trigger_ht_node);
267 condition = lttng_trigger_get_condition(client_list->trigger);
268
269 return !!lttng_condition_is_equal(condition_key, condition);
270}
271
272static
273unsigned long lttng_condition_buffer_usage_hash(
274 struct lttng_condition *_condition)
275{
276 unsigned long hash = 0;
277 struct lttng_condition_buffer_usage *condition;
278
279 condition = container_of(_condition,
280 struct lttng_condition_buffer_usage, parent);
281
282 if (condition->session_name) {
283 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
284 }
285 if (condition->channel_name) {
8f56701f 286 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
287 }
288 if (condition->domain.set) {
289 hash ^= hash_key_ulong(
290 (void *) condition->domain.type,
291 lttng_ht_seed);
292 }
293 if (condition->threshold_ratio.set) {
294 uint64_t val;
295
296 val = condition->threshold_ratio.value * (double) UINT32_MAX;
297 hash ^= hash_key_u64(&val, lttng_ht_seed);
298 } else if (condition->threshold_ratio.set) {
299 uint64_t val;
300
301 val = condition->threshold_bytes.value;
302 hash ^= hash_key_u64(&val, lttng_ht_seed);
303 }
304 return hash;
305}
306
307/*
308 * The lttng_condition hashing code is kept in this file (rather than
309 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
310 * don't want to link in liblttng-ctl.
311 */
312static
313unsigned long lttng_condition_hash(struct lttng_condition *condition)
314{
315 switch (condition->type) {
316 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
317 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
318 return lttng_condition_buffer_usage_hash(condition);
319 default:
320 ERR("[notification-thread] Unexpected condition type caught");
321 abort();
322 }
323}
324
e4db5ace
JR
325static
326unsigned long hash_channel_key(struct channel_key *key)
327{
328 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
329 unsigned long domain_hash = hash_key_ulong(
330 (void *) (unsigned long) key->domain, lttng_ht_seed);
331
332 return key_hash ^ domain_hash;
333}
334
ab0ee2ca
JG
335static
336void channel_info_destroy(struct channel_info *channel_info)
337{
338 if (!channel_info) {
339 return;
340 }
341
342 if (channel_info->session_name) {
343 free(channel_info->session_name);
344 }
345 if (channel_info->channel_name) {
346 free(channel_info->channel_name);
347 }
348 free(channel_info);
349}
350
351static
352struct channel_info *channel_info_copy(struct channel_info *channel_info)
353{
354 struct channel_info *copy = zmalloc(sizeof(*channel_info));
355
356 assert(channel_info);
357 assert(channel_info->session_name);
358 assert(channel_info->channel_name);
359
360 if (!copy) {
361 goto end;
362 }
363
364 memcpy(copy, channel_info, sizeof(*channel_info));
365 copy->session_name = NULL;
366 copy->channel_name = NULL;
367
368 copy->session_name = strdup(channel_info->session_name);
369 if (!copy->session_name) {
370 goto error;
371 }
372 copy->channel_name = strdup(channel_info->channel_name);
373 if (!copy->channel_name) {
374 goto error;
375 }
376 cds_lfht_node_init(&channel_info->channels_ht_node);
377end:
378 return copy;
379error:
380 channel_info_destroy(copy);
381 return NULL;
382}
383
e4db5ace
JR
384/* This function must be called with the RCU read lock held. */
385static
386int evaluate_condition_for_client(struct lttng_trigger *trigger,
387 struct lttng_condition *condition,
388 struct notification_client *client,
389 struct notification_thread_state *state)
390{
391 int ret;
392 struct cds_lfht_iter iter;
393 struct cds_lfht_node *node;
394 struct channel_info *channel_info = NULL;
395 struct channel_key *channel_key = NULL;
396 struct channel_state_sample *last_sample = NULL;
397 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
398 struct lttng_evaluation *evaluation = NULL;
399 struct notification_client_list client_list = { 0 };
400 struct notification_client_list_element client_list_element = { 0 };
401
402 assert(trigger);
403 assert(condition);
404 assert(client);
405 assert(state);
406
407 /* Find the channel associated with the trigger. */
408 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
409 channel_trigger_list , channel_triggers_ht_node) {
410 struct lttng_trigger_list_element *element;
411
412 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
413 struct lttng_condition *current_condition =
414 lttng_trigger_get_condition(
415 element->trigger);
416
417 assert(current_condition);
418 if (!lttng_condition_is_equal(condition,
2ae99f0b 419 current_condition)) {
e4db5ace
JR
420 continue;
421 }
422
423 /* Found the trigger, save the channel key. */
424 channel_key = &channel_trigger_list->channel_key;
425 break;
426 }
427 if (channel_key) {
428 /* The channel key was found stop iteration. */
429 break;
430 }
431 }
432
433 if (!channel_key){
434 /* No channel found; normal exit. */
435 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
436 ret = 0;
437 goto end;
438 }
439
440 /* Fetch channel info for the matching channel. */
441 cds_lfht_lookup(state->channels_ht,
442 hash_channel_key(channel_key),
443 match_channel_info,
444 channel_key,
445 &iter);
446 node = cds_lfht_iter_get_node(&iter);
447 assert(node);
448 channel_info = caa_container_of(node, struct channel_info,
449 channels_ht_node);
450
451 /* Retrieve the channel's last sample, if it exists. */
452 cds_lfht_lookup(state->channel_state_ht,
453 hash_channel_key(channel_key),
454 match_channel_state_sample,
455 channel_key,
456 &iter);
457 node = cds_lfht_iter_get_node(&iter);
458 if (node) {
459 last_sample = caa_container_of(node,
460 struct channel_state_sample,
461 channel_state_ht_node);
462 } else {
463 /* Nothing to evaluate, no sample was ever taken. Normal exit */
464 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
465 ret = 0;
466 goto end;
467 }
468
469 ret = evaluate_condition(condition, &evaluation, state, NULL,
470 last_sample, channel_info->capacity);
471 if (ret) {
066a3c82 472 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
473 goto end;
474 }
475
476 if (!evaluation) {
477 /* Evaluation yielded nothing. Normal exit. */
478 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
479 ret = 0;
480 goto end;
481 }
482
483 /*
484 * Create a temporary client list with the client currently
485 * subscribing.
486 */
487 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
488 CDS_INIT_LIST_HEAD(&client_list.list);
489 client_list.trigger = trigger;
490
491 CDS_INIT_LIST_HEAD(&client_list_element.node);
492 client_list_element.client = client;
493 cds_list_add(&client_list_element.node, &client_list.list);
494
495 /* Send evaluation result to the newly-subscribed client. */
496 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
497 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
498 state, channel_info->uid, channel_info->gid);
499
500end:
501 return ret;
502}
503
ab0ee2ca
JG
504static
505int notification_thread_client_subscribe(struct notification_client *client,
506 struct lttng_condition *condition,
507 struct notification_thread_state *state,
508 enum lttng_notification_channel_status *_status)
509{
510 int ret = 0;
511 struct cds_lfht_iter iter;
512 struct cds_lfht_node *node;
513 struct notification_client_list *client_list;
514 struct lttng_condition_list_element *condition_list_element = NULL;
515 struct notification_client_list_element *client_list_element = NULL;
516 enum lttng_notification_channel_status status =
517 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
518
519 /*
520 * Ensure that the client has not already subscribed to this condition
521 * before.
522 */
523 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
524 if (lttng_condition_is_equal(condition_list_element->condition,
525 condition)) {
526 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
527 goto end;
528 }
529 }
530
531 condition_list_element = zmalloc(sizeof(*condition_list_element));
532 if (!condition_list_element) {
533 ret = -1;
534 goto error;
535 }
536 client_list_element = zmalloc(sizeof(*client_list_element));
537 if (!client_list_element) {
538 ret = -1;
539 goto error;
540 }
541
542 rcu_read_lock();
543
544 /*
545 * Add the newly-subscribed condition to the client's subscription list.
546 */
547 CDS_INIT_LIST_HEAD(&condition_list_element->node);
548 condition_list_element->condition = condition;
549 cds_list_add(&condition_list_element->node, &client->condition_list);
550
ab0ee2ca
JG
551 cds_lfht_lookup(state->notification_trigger_clients_ht,
552 lttng_condition_hash(condition),
553 match_client_list_condition,
554 condition,
555 &iter);
556 node = cds_lfht_iter_get_node(&iter);
557 if (!node) {
2ae99f0b
JG
558 /*
559 * No notification-emiting trigger registered with this
560 * condition. We don't evaluate the condition right away
561 * since this trigger is not registered yet.
562 */
4fb43b68 563 free(client_list_element);
ab0ee2ca
JG
564 goto end_unlock;
565 }
566
567 client_list = caa_container_of(node, struct notification_client_list,
568 notification_trigger_ht_node);
2ae99f0b
JG
569 /*
570 * The condition to which the client just subscribed is evaluated
571 * at this point so that conditions that are already TRUE result
572 * in a notification being sent out.
573 */
e4db5ace
JR
574 if (evaluate_condition_for_client(client_list->trigger, condition,
575 client, state)) {
576 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
577 ret = -1;
578 goto end_unlock;
579 }
580
581 /*
582 * Add the client to the list of clients interested in a given trigger
583 * if a "notification" trigger with a corresponding condition was
584 * added prior.
585 */
ab0ee2ca
JG
586 client_list_element->client = client;
587 CDS_INIT_LIST_HEAD(&client_list_element->node);
588 cds_list_add(&client_list_element->node, &client_list->list);
589end_unlock:
590 rcu_read_unlock();
591end:
592 if (_status) {
593 *_status = status;
594 }
595 return ret;
596error:
597 free(condition_list_element);
598 free(client_list_element);
599 return ret;
600}
601
602static
603int notification_thread_client_unsubscribe(
604 struct notification_client *client,
605 struct lttng_condition *condition,
606 struct notification_thread_state *state,
607 enum lttng_notification_channel_status *_status)
608{
609 struct cds_lfht_iter iter;
610 struct cds_lfht_node *node;
611 struct notification_client_list *client_list;
612 struct lttng_condition_list_element *condition_list_element,
613 *condition_tmp;
614 struct notification_client_list_element *client_list_element,
615 *client_tmp;
616 bool condition_found = false;
617 enum lttng_notification_channel_status status =
618 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
619
620 /* Remove the condition from the client's condition list. */
621 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
622 &client->condition_list, node) {
623 if (!lttng_condition_is_equal(condition_list_element->condition,
624 condition)) {
625 continue;
626 }
627
628 cds_list_del(&condition_list_element->node);
629 /*
630 * The caller may be iterating on the client's conditions to
631 * tear down a client's connection. In this case, the condition
632 * will be destroyed at the end.
633 */
634 if (condition != condition_list_element->condition) {
635 lttng_condition_destroy(
636 condition_list_element->condition);
637 }
638 free(condition_list_element);
639 condition_found = true;
640 break;
641 }
642
643 if (!condition_found) {
644 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
645 goto end;
646 }
647
648 /*
649 * Remove the client from the list of clients interested the trigger
650 * matching the condition.
651 */
652 rcu_read_lock();
653 cds_lfht_lookup(state->notification_trigger_clients_ht,
654 lttng_condition_hash(condition),
655 match_client_list_condition,
656 condition,
657 &iter);
658 node = cds_lfht_iter_get_node(&iter);
659 if (!node) {
660 goto end_unlock;
661 }
662
663 client_list = caa_container_of(node, struct notification_client_list,
664 notification_trigger_ht_node);
665 cds_list_for_each_entry_safe(client_list_element, client_tmp,
666 &client_list->list, node) {
667 if (client_list_element->client->socket != client->socket) {
668 continue;
669 }
670 cds_list_del(&client_list_element->node);
671 free(client_list_element);
672 break;
673 }
674end_unlock:
675 rcu_read_unlock();
676end:
677 lttng_condition_destroy(condition);
678 if (_status) {
679 *_status = status;
680 }
681 return 0;
682}
683
684static
685void notification_client_destroy(struct notification_client *client,
686 struct notification_thread_state *state)
687{
688 struct lttng_condition_list_element *condition_list_element, *tmp;
689
690 if (!client) {
691 return;
692 }
693
694 /* Release all conditions to which the client was subscribed. */
695 cds_list_for_each_entry_safe(condition_list_element, tmp,
696 &client->condition_list, node) {
697 (void) notification_thread_client_unsubscribe(client,
698 condition_list_element->condition, state, NULL);
699 }
700
701 if (client->socket >= 0) {
702 (void) lttcomm_close_unix_sock(client->socket);
703 }
704 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
705 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
706 free(client);
707}
708
709/*
710 * Call with rcu_read_lock held (and hold for the lifetime of the returned
711 * client pointer).
712 */
713static
714struct notification_client *get_client_from_socket(int socket,
715 struct notification_thread_state *state)
716{
717 struct cds_lfht_iter iter;
718 struct cds_lfht_node *node;
719 struct notification_client *client = NULL;
720
721 cds_lfht_lookup(state->client_socket_ht,
722 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
723 match_client,
724 (void *) (unsigned long) socket,
725 &iter);
726 node = cds_lfht_iter_get_node(&iter);
727 if (!node) {
728 goto end;
729 }
730
731 client = caa_container_of(node, struct notification_client,
732 client_socket_ht_node);
733end:
734 return client;
735}
736
737static
738bool trigger_applies_to_channel(struct lttng_trigger *trigger,
739 struct channel_info *info)
740{
741 enum lttng_condition_status status;
742 struct lttng_condition *condition;
743 const char *trigger_session_name = NULL;
744 const char *trigger_channel_name = NULL;
745 enum lttng_domain_type trigger_domain;
746
747 condition = lttng_trigger_get_condition(trigger);
748 if (!condition) {
749 goto fail;
750 }
751
752 switch (lttng_condition_get_type(condition)) {
753 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
754 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
755 break;
756 default:
757 goto fail;
758 }
759
760 status = lttng_condition_buffer_usage_get_domain_type(condition,
761 &trigger_domain);
762 assert(status == LTTNG_CONDITION_STATUS_OK);
763 if (info->key.domain != trigger_domain) {
764 goto fail;
765 }
766
767 status = lttng_condition_buffer_usage_get_session_name(
768 condition, &trigger_session_name);
769 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
770
771 status = lttng_condition_buffer_usage_get_channel_name(
772 condition, &trigger_channel_name);
773 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
774
775 if (strcmp(info->session_name, trigger_session_name)) {
776 goto fail;
777 }
778 if (strcmp(info->channel_name, trigger_channel_name)) {
779 goto fail;
780 }
781
782 return true;
783fail:
784 return false;
785}
786
787static
788bool trigger_applies_to_client(struct lttng_trigger *trigger,
789 struct notification_client *client)
790{
791 bool applies = false;
792 struct lttng_condition_list_element *condition_list_element;
793
794 cds_list_for_each_entry(condition_list_element, &client->condition_list,
795 node) {
796 applies = lttng_condition_is_equal(
797 condition_list_element->condition,
798 lttng_trigger_get_condition(trigger));
799 if (applies) {
800 break;
801 }
802 }
803 return applies;
804}
805
ab0ee2ca
JG
806static
807int handle_notification_thread_command_add_channel(
808 struct notification_thread_state *state,
809 struct channel_info *channel_info,
810 enum lttng_error_code *cmd_result)
811{
812 struct cds_list_head trigger_list;
813 struct channel_info *new_channel_info;
814 struct channel_key *channel_key;
815 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
816 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
817 int trigger_count = 0;
818 struct cds_lfht_iter iter;
819
820 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
821 channel_info->channel_name, channel_info->session_name,
822 channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
823
824 CDS_INIT_LIST_HEAD(&trigger_list);
825
826 new_channel_info = channel_info_copy(channel_info);
827 if (!new_channel_info) {
828 goto error;
829 }
830
831 channel_key = &new_channel_info->key;
832
833 /* Build a list of all triggers applying to the new channel. */
834 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
835 node) {
836 struct lttng_trigger_list_element *new_element;
837
838 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
839 channel_info)) {
840 continue;
841 }
842
843 new_element = zmalloc(sizeof(*new_element));
844 if (!new_element) {
845 goto error;
846 }
847 CDS_INIT_LIST_HEAD(&new_element->node);
848 new_element->trigger = trigger_ht_element->trigger;
849 cds_list_add(&new_element->node, &trigger_list);
850 trigger_count++;
851 }
852
853 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
854 trigger_count);
855 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
856 if (!channel_trigger_list) {
857 goto error;
858 }
859 channel_trigger_list->channel_key = *channel_key;
860 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
861 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
862 cds_list_splice(&trigger_list, &channel_trigger_list->list);
863
864 rcu_read_lock();
865 /* Add channel to the channel_ht which owns the channel_infos. */
866 cds_lfht_add(state->channels_ht,
867 hash_channel_key(channel_key),
868 &new_channel_info->channels_ht_node);
869 /*
870 * Add the list of triggers associated with this channel to the
871 * channel_triggers_ht.
872 */
873 cds_lfht_add(state->channel_triggers_ht,
874 hash_channel_key(channel_key),
875 &channel_trigger_list->channel_triggers_ht_node);
876 rcu_read_unlock();
877 *cmd_result = LTTNG_OK;
878 return 0;
879error:
880 /* Empty trigger list */
881 channel_info_destroy(new_channel_info);
882 return 1;
883}
884
885static
886int handle_notification_thread_command_remove_channel(
887 struct notification_thread_state *state,
888 uint64_t channel_key, enum lttng_domain_type domain,
889 enum lttng_error_code *cmd_result)
890{
891 struct cds_lfht_node *node;
892 struct cds_lfht_iter iter;
893 struct lttng_channel_trigger_list *trigger_list;
894 struct lttng_trigger_list_element *trigger_list_element, *tmp;
895 struct channel_key key = { .key = channel_key, .domain = domain };
896 struct channel_info *channel_info;
897
898 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
899 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
900
901 rcu_read_lock();
902
903 cds_lfht_lookup(state->channel_triggers_ht,
904 hash_channel_key(&key),
905 match_channel_trigger_list,
906 &key,
907 &iter);
908 node = cds_lfht_iter_get_node(&iter);
909 /*
910 * There is a severe internal error if we are being asked to remove a
911 * channel that doesn't exist.
912 */
913 if (!node) {
914 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
915 goto end;
916 }
917
918 /* Free the list of triggers associated with this channel. */
919 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
920 channel_triggers_ht_node);
921 cds_list_for_each_entry_safe(trigger_list_element, tmp,
922 &trigger_list->list, node) {
923 cds_list_del(&trigger_list_element->node);
924 free(trigger_list_element);
925 }
926 cds_lfht_del(state->channel_triggers_ht, node);
927 free(trigger_list);
928
929 /* Free sampled channel state. */
930 cds_lfht_lookup(state->channel_state_ht,
931 hash_channel_key(&key),
932 match_channel_state_sample,
933 &key,
934 &iter);
935 node = cds_lfht_iter_get_node(&iter);
936 /*
937 * This is expected to be NULL if the channel is destroyed before we
938 * received a sample.
939 */
940 if (node) {
941 struct channel_state_sample *sample = caa_container_of(node,
942 struct channel_state_sample,
943 channel_state_ht_node);
944
945 cds_lfht_del(state->channel_state_ht, node);
946 free(sample);
947 }
948
949 /* Remove the channel from the channels_ht and free it. */
950 cds_lfht_lookup(state->channels_ht,
951 hash_channel_key(&key),
952 match_channel_info,
953 &key,
954 &iter);
955 node = cds_lfht_iter_get_node(&iter);
956 assert(node);
957 channel_info = caa_container_of(node, struct channel_info,
958 channels_ht_node);
959 cds_lfht_del(state->channels_ht, node);
960 channel_info_destroy(channel_info);
961end:
962 rcu_read_unlock();
963 *cmd_result = LTTNG_OK;
964 return 0;
965}
966
1da26331
JG
967static
968int condition_is_supported(struct lttng_condition *condition)
969{
970 int ret;
971
972 switch (lttng_condition_get_type(condition)) {
973 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
974 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
975 {
976 enum lttng_domain_type domain;
977
978 ret = lttng_condition_buffer_usage_get_domain_type(condition,
979 &domain);
980 if (ret) {
981 ret = -1;
982 goto end;
983 }
984
985 if (domain != LTTNG_DOMAIN_KERNEL) {
986 ret = 1;
987 goto end;
988 }
989
990 /*
991 * Older kernel tracers don't expose the API to monitor their
992 * buffers. Therefore, we reject triggers that require that
993 * mechanism to be available to be evaluated.
994 */
995 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
996 kernel_tracer_fd);
997 break;
998 }
999 default:
1000 ret = 1;
1001 }
1002end:
1003 return ret;
1004}
1005
ab0ee2ca
JG
1006/*
1007 * FIXME A client's credentials are not checked when registering a trigger, nor
1008 * are they stored alongside with the trigger.
1009 *
1da26331 1010 * The effects of this are benign since:
ab0ee2ca
JG
1011 * - The client will succeed in registering the trigger, as it is valid,
1012 * - The trigger will, internally, be bound to the channel,
1013 * - The notifications will not be sent since the client's credentials
1014 * are checked against the channel at that moment.
1da26331
JG
1015 *
1016 * If this function returns a non-zero value, it means something is
1017 * fundamentally and the whole subsystem/thread will be torn down.
1018 *
1019 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1020 * error code.
ab0ee2ca
JG
1021 */
1022static
1023int handle_notification_thread_command_register_trigger(
1024 struct notification_thread_state *state,
1025 struct lttng_trigger *trigger,
1026 enum lttng_error_code *cmd_result)
1027{
1028 int ret = 0;
1029 struct lttng_condition *condition;
1030 struct notification_client *client;
1031 struct notification_client_list *client_list = NULL;
1032 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1033 struct notification_client_list_element *client_list_element, *tmp;
1034 struct cds_lfht_node *node;
1035 struct cds_lfht_iter iter;
1036 struct channel_info *channel;
1037 bool free_trigger = true;
1038
1039 rcu_read_lock();
1040
1041 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
1042 assert(condition);
1043
1044 ret = condition_is_supported(condition);
1045 if (ret < 0) {
1046 goto error;
1047 } else if (ret == 0) {
1048 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1049 goto error;
1050 } else {
1051 /* Feature is supported, continue. */
1052 ret = 0;
1053 }
1054
ab0ee2ca
JG
1055 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1056 if (!trigger_ht_element) {
1057 ret = -1;
1058 goto error;
1059 }
1060
1061 /* Add trigger to the trigger_ht. */
1062 cds_lfht_node_init(&trigger_ht_element->node);
1063 trigger_ht_element->trigger = trigger;
1064
1065 node = cds_lfht_add_unique(state->triggers_ht,
1066 lttng_condition_hash(condition),
1067 match_condition,
1068 condition,
1069 &trigger_ht_element->node);
1070 if (node != &trigger_ht_element->node) {
1071 /* Not a fatal error, simply report it to the client. */
1072 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1073 goto error_free_ht_element;
1074 }
1075
1076 /*
1077 * Ownership of the trigger and of its wrapper was transfered to
1078 * the triggers_ht.
1079 */
1080 trigger_ht_element = NULL;
1081 free_trigger = false;
1082
1083 /*
1084 * The rest only applies to triggers that have a "notify" action.
1085 * It is not skipped as this is the only action type currently
1086 * supported.
1087 */
1088 client_list = zmalloc(sizeof(*client_list));
1089 if (!client_list) {
1090 ret = -1;
1091 goto error_free_ht_element;
1092 }
1093 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1094 CDS_INIT_LIST_HEAD(&client_list->list);
1095 client_list->trigger = trigger;
1096
1097 /* Build a list of clients to which this new trigger applies. */
1098 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1099 client_socket_ht_node) {
1100 if (!trigger_applies_to_client(trigger, client)) {
1101 continue;
1102 }
1103
1104 client_list_element = zmalloc(sizeof(*client_list_element));
1105 if (!client_list_element) {
1106 ret = -1;
1107 goto error_free_client_list;
1108 }
1109 CDS_INIT_LIST_HEAD(&client_list_element->node);
1110 client_list_element->client = client;
1111 cds_list_add(&client_list_element->node, &client_list->list);
1112 }
1113
1114 cds_lfht_add(state->notification_trigger_clients_ht,
1115 lttng_condition_hash(condition),
1116 &client_list->notification_trigger_ht_node);
ab0ee2ca
JG
1117
1118 /*
1119 * Add the trigger to list of triggers bound to the channels currently
1120 * known.
1121 */
1122 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1123 channels_ht_node) {
1124 struct lttng_trigger_list_element *trigger_list_element;
1125 struct lttng_channel_trigger_list *trigger_list;
1126
1127 if (!trigger_applies_to_channel(trigger, channel)) {
1128 continue;
1129 }
1130
1131 cds_lfht_lookup(state->channel_triggers_ht,
1132 hash_channel_key(&channel->key),
1133 match_channel_trigger_list,
1134 &channel->key,
1135 &iter);
1136 node = cds_lfht_iter_get_node(&iter);
1137 assert(node);
ab0ee2ca
JG
1138 trigger_list = caa_container_of(node,
1139 struct lttng_channel_trigger_list,
1140 channel_triggers_ht_node);
1141
1142 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1143 if (!trigger_list_element) {
1144 ret = -1;
1145 goto error_free_client_list;
1146 }
1147 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1148 trigger_list_element->trigger = trigger;
1149 cds_list_add(&trigger_list_element->node, &trigger_list->list);
e4db5ace 1150
ab0ee2ca
JG
1151 /* A trigger can only apply to one channel. */
1152 break;
1153 }
1154
2ae99f0b
JG
1155 /*
1156 * Since there is nothing preventing clients from subscribing to a
1157 * condition before the corresponding trigger is registered, we have
1158 * to evaluate this new condition right away.
1159 *
1160 * At some point, we were waiting for the next "evaluation" (e.g. on
1161 * reception of a channel sample) to evaluate this new condition, but
1162 * that was broken.
1163 *
1164 * The reason it was broken is that waiting for the next sample
1165 * does not allow us to properly handle transitions for edge-triggered
1166 * conditions.
1167 *
1168 * Consider this example: when we handle a new channel sample, we
1169 * evaluate each conditions twice: once with the previous state, and
1170 * again with the newest state. We then use those two results to
1171 * determine whether a state change happened: a condition was false and
1172 * became true. If a state change happened, we have to notify clients.
1173 *
1174 * Now, if a client subscribes to a given notification and registers
1175 * a trigger *after* that subscription, we have to make sure the
1176 * condition is evaluated at this point while considering only the
1177 * current state. Otherwise, the next evaluation cycle may only see
1178 * that the evaluations remain the same (true for samples n-1 and n) and
1179 * the client will never know that the condition has been met.
1180 */
1181 cds_list_for_each_entry_safe(client_list_element, tmp,
1182 &client_list->list, node) {
1183 ret = evaluate_condition_for_client(trigger, condition,
1184 client_list_element->client, state);
1185 if (ret) {
1186 goto error_free_client_list;
1187 }
1188 }
1189
1190 /*
1191 * Client list ownership transferred to the
1192 * notification_trigger_clients_ht.
1193 */
1194 client_list = NULL;
1195
ab0ee2ca
JG
1196 *cmd_result = LTTNG_OK;
1197error_free_client_list:
1198 if (client_list) {
1199 cds_list_for_each_entry_safe(client_list_element, tmp,
1200 &client_list->list, node) {
1201 free(client_list_element);
1202 }
1203 free(client_list);
1204 }
1205error_free_ht_element:
1206 free(trigger_ht_element);
1207error:
1208 if (free_trigger) {
1209 struct lttng_action *action = lttng_trigger_get_action(trigger);
1210
1211 lttng_condition_destroy(condition);
1212 lttng_action_destroy(action);
1213 lttng_trigger_destroy(trigger);
1214 }
1215 rcu_read_unlock();
1216 return ret;
1217}
1218
cc2295b5 1219static
ab0ee2ca
JG
1220int handle_notification_thread_command_unregister_trigger(
1221 struct notification_thread_state *state,
1222 struct lttng_trigger *trigger,
1223 enum lttng_error_code *_cmd_reply)
1224{
1225 struct cds_lfht_iter iter;
1226 struct cds_lfht_node *node, *triggers_ht_node;
1227 struct lttng_channel_trigger_list *trigger_list;
1228 struct notification_client_list *client_list;
1229 struct notification_client_list_element *client_list_element, *tmp;
1230 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1231 struct lttng_condition *condition = lttng_trigger_get_condition(
1232 trigger);
1233 struct lttng_action *action;
1234 enum lttng_error_code cmd_reply;
1235
1236 rcu_read_lock();
1237
1238 cds_lfht_lookup(state->triggers_ht,
1239 lttng_condition_hash(condition),
1240 match_condition,
1241 condition,
1242 &iter);
1243 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1244 if (!triggers_ht_node) {
1245 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1246 goto end;
1247 } else {
1248 cmd_reply = LTTNG_OK;
1249 }
1250
1251 /* Remove trigger from channel_triggers_ht. */
1252 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1253 channel_triggers_ht_node) {
1254 struct lttng_trigger_list_element *trigger_element, *tmp;
1255
1256 cds_list_for_each_entry_safe(trigger_element, tmp,
1257 &trigger_list->list, node) {
1258 struct lttng_condition *current_condition =
1259 lttng_trigger_get_condition(
1260 trigger_element->trigger);
1261
1262 assert(current_condition);
1263 if (!lttng_condition_is_equal(condition,
1264 current_condition)) {
1265 continue;
1266 }
1267
1268 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1269 cds_list_del(&trigger_element->node);
e4db5ace
JR
1270 /* A trigger can only appear once per channel */
1271 break;
ab0ee2ca
JG
1272 }
1273 }
1274
1275 /*
1276 * Remove and release the client list from
1277 * notification_trigger_clients_ht.
1278 */
1279 cds_lfht_lookup(state->notification_trigger_clients_ht,
1280 lttng_condition_hash(condition),
1281 match_client_list,
1282 trigger,
1283 &iter);
1284 node = cds_lfht_iter_get_node(&iter);
1285 assert(node);
1286 client_list = caa_container_of(node, struct notification_client_list,
1287 notification_trigger_ht_node);
1288 cds_list_for_each_entry_safe(client_list_element, tmp,
1289 &client_list->list, node) {
1290 free(client_list_element);
1291 }
1292 cds_lfht_del(state->notification_trigger_clients_ht, node);
1293 free(client_list);
1294
1295 /* Remove trigger from triggers_ht. */
1296 trigger_ht_element = caa_container_of(triggers_ht_node,
1297 struct lttng_trigger_ht_element, node);
1298 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1299
1300 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1301 lttng_condition_destroy(condition);
1302 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1303 lttng_action_destroy(action);
1304 lttng_trigger_destroy(trigger_ht_element->trigger);
1305 free(trigger_ht_element);
1306end:
1307 rcu_read_unlock();
1308 if (_cmd_reply) {
1309 *_cmd_reply = cmd_reply;
1310 }
1311 return 0;
1312}
1313
1314/* Returns 0 on success, 1 on exit requested, negative value on error. */
1315int handle_notification_thread_command(
1316 struct notification_thread_handle *handle,
1317 struct notification_thread_state *state)
1318{
1319 int ret;
1320 uint64_t counter;
1321 struct notification_thread_command *cmd;
1322
1323 /* Read event_fd to put it back into a quiescent state. */
814b4934 1324 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter, sizeof(counter));
ab0ee2ca
JG
1325 if (ret == -1) {
1326 goto error;
1327 }
1328
1329 pthread_mutex_lock(&handle->cmd_queue.lock);
1330 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1331 struct notification_thread_command, cmd_list_node);
1332 switch (cmd->type) {
1333 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1334 DBG("[notification-thread] Received register trigger command");
1335 ret = handle_notification_thread_command_register_trigger(
1336 state, cmd->parameters.trigger,
1337 &cmd->reply_code);
1338 break;
1339 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1340 DBG("[notification-thread] Received unregister trigger command");
1341 ret = handle_notification_thread_command_unregister_trigger(
1342 state, cmd->parameters.trigger,
1343 &cmd->reply_code);
1344 break;
1345 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1346 DBG("[notification-thread] Received add channel command");
1347 ret = handle_notification_thread_command_add_channel(
1348 state, &cmd->parameters.add_channel,
1349 &cmd->reply_code);
1350 break;
1351 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1352 DBG("[notification-thread] Received remove channel command");
1353 ret = handle_notification_thread_command_remove_channel(
1354 state, cmd->parameters.remove_channel.key,
1355 cmd->parameters.remove_channel.domain,
1356 &cmd->reply_code);
1357 break;
1358 case NOTIFICATION_COMMAND_TYPE_QUIT:
1359 DBG("[notification-thread] Received quit command");
1360 cmd->reply_code = LTTNG_OK;
1361 ret = 1;
1362 goto end;
1363 default:
1364 ERR("[notification-thread] Unknown internal command received");
1365 goto error_unlock;
1366 }
1367
1368 if (ret) {
1369 goto error_unlock;
1370 }
1371end:
1372 cds_list_del(&cmd->cmd_list_node);
8ada111f 1373 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1374 pthread_mutex_unlock(&handle->cmd_queue.lock);
1375 return ret;
1376error_unlock:
1377 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 1378 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1379 pthread_mutex_unlock(&handle->cmd_queue.lock);
1380 cmd->reply_code = LTTNG_ERR_FATAL;
1381error:
1382 /* Indicate a fatal error to the caller. */
1383 return -1;
1384}
1385
1386static
1387unsigned long hash_client_socket(int socket)
1388{
1389 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1390}
1391
1392static
1393int socket_set_non_blocking(int socket)
1394{
1395 int ret, flags;
1396
1397 /* Set the pipe as non-blocking. */
1398 ret = fcntl(socket, F_GETFL, 0);
1399 if (ret == -1) {
1400 PERROR("fcntl get socket flags");
1401 goto end;
1402 }
1403 flags = ret;
1404
1405 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
1406 if (ret == -1) {
1407 PERROR("fcntl set O_NONBLOCK socket flag");
1408 goto end;
1409 }
1410 DBG("Client socket (fd = %i) set as non-blocking", socket);
1411end:
1412 return ret;
1413}
1414
1415static
14fa22f8 1416int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
1417{
1418 int ret;
1419
1420 ret = lttng_dynamic_buffer_set_size(
1421 &client->communication.inbound.buffer, 0);
1422 assert(!ret);
1423
1424 client->communication.inbound.bytes_to_receive =
1425 sizeof(struct lttng_notification_channel_message);
1426 client->communication.inbound.msg_type =
1427 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
1428 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1429 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
1430 ret = lttng_dynamic_buffer_set_size(
1431 &client->communication.inbound.buffer,
1432 client->communication.inbound.bytes_to_receive);
1433 return ret;
ab0ee2ca
JG
1434}
1435
1436int handle_notification_thread_client_connect(
1437 struct notification_thread_state *state)
1438{
1439 int ret;
1440 struct notification_client *client;
1441
1442 DBG("[notification-thread] Handling new notification channel client connection");
1443
1444 client = zmalloc(sizeof(*client));
1445 if (!client) {
1446 /* Fatal error. */
1447 ret = -1;
1448 goto error;
1449 }
1450 CDS_INIT_LIST_HEAD(&client->condition_list);
1451 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1452 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 1453 client->communication.inbound.expect_creds = true;
14fa22f8
JG
1454 ret = client_reset_inbound_state(client);
1455 if (ret) {
1456 ERR("[notification-thread] Failed to reset client communication's inbound state");
1457 ret = 0;
1458 goto error;
1459 }
ab0ee2ca
JG
1460
1461 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1462 if (ret < 0) {
1463 ERR("[notification-thread] Failed to accept new notification channel client connection");
1464 ret = 0;
1465 goto error;
1466 }
1467
1468 client->socket = ret;
1469
1470 ret = socket_set_non_blocking(client->socket);
1471 if (ret) {
1472 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1473 goto error;
1474 }
1475
1476 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1477 if (ret < 0) {
1478 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1479 ret = 0;
1480 goto error;
1481 }
1482
1483 ret = lttng_poll_add(&state->events, client->socket,
1484 LPOLLIN | LPOLLERR |
1485 LPOLLHUP | LPOLLRDHUP);
1486 if (ret < 0) {
1487 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1488 ret = 0;
1489 goto error;
1490 }
1491 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1492 client->socket);
1493
ab0ee2ca
JG
1494 rcu_read_lock();
1495 cds_lfht_add(state->client_socket_ht,
1496 hash_client_socket(client->socket),
1497 &client->client_socket_ht_node);
1498 rcu_read_unlock();
1499
1500 return ret;
1501error:
1502 notification_client_destroy(client, state);
1503 return ret;
1504}
1505
1506int handle_notification_thread_client_disconnect(
1507 int client_socket,
1508 struct notification_thread_state *state)
1509{
1510 int ret = 0;
1511 struct notification_client *client;
1512
1513 rcu_read_lock();
1514 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1515 client_socket);
1516 client = get_client_from_socket(client_socket, state);
1517 if (!client) {
1518 /* Internal state corruption, fatal error. */
1519 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1520 client_socket);
1521 ret = -1;
1522 goto end;
1523 }
1524
1525 ret = lttng_poll_del(&state->events, client_socket);
1526 if (ret) {
1527 ERR("[notification-thread] Failed to remove client socket from poll set");
1528 }
1529 cds_lfht_del(state->client_socket_ht,
1530 &client->client_socket_ht_node);
1531 notification_client_destroy(client, state);
1532end:
1533 rcu_read_unlock();
1534 return ret;
1535}
1536
1537int handle_notification_thread_client_disconnect_all(
1538 struct notification_thread_state *state)
1539{
1540 struct cds_lfht_iter iter;
1541 struct notification_client *client;
1542 bool error_encoutered = false;
1543
1544 rcu_read_lock();
1545 DBG("[notification-thread] Closing all client connections");
1546 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1547 client_socket_ht_node) {
1548 int ret;
1549
1550 ret = handle_notification_thread_client_disconnect(
1551 client->socket, state);
1552 if (ret) {
1553 error_encoutered = true;
1554 }
1555 }
1556 rcu_read_unlock();
1557 return error_encoutered ? 1 : 0;
1558}
1559
1560int handle_notification_thread_trigger_unregister_all(
1561 struct notification_thread_state *state)
1562{
4149ace8 1563 bool error_occurred = false;
ab0ee2ca
JG
1564 struct cds_lfht_iter iter;
1565 struct lttng_trigger_ht_element *trigger_ht_element;
1566
1567 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1568 node) {
1569 int ret = handle_notification_thread_command_unregister_trigger(
1570 state, trigger_ht_element->trigger, NULL);
1571 if (ret) {
4149ace8 1572 error_occurred = true;
ab0ee2ca
JG
1573 }
1574 }
4149ace8 1575 return error_occurred ? -1 : 0;
ab0ee2ca
JG
1576}
1577
1578static
1579int client_flush_outgoing_queue(struct notification_client *client,
1580 struct notification_thread_state *state)
1581{
1582 ssize_t ret;
1583 size_t to_send_count;
1584
1585 assert(client->communication.outbound.buffer.size != 0);
1586 to_send_count = client->communication.outbound.buffer.size;
1587 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1588 client->socket);
1589
1590 ret = lttcomm_send_unix_sock_non_block(client->socket,
1591 client->communication.outbound.buffer.data,
1592 to_send_count);
1593 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1594 (ret > 0 && ret < to_send_count)) {
1595 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1596 client->socket);
1597 to_send_count -= max(ret, 0);
1598
1599 memcpy(client->communication.outbound.buffer.data,
1600 client->communication.outbound.buffer.data +
1601 client->communication.outbound.buffer.size - to_send_count,
1602 to_send_count);
1603 ret = lttng_dynamic_buffer_set_size(
1604 &client->communication.outbound.buffer,
1605 to_send_count);
1606 if (ret) {
1607 goto error;
1608 }
1609
1610 /*
1611 * We want to be notified whenever there is buffer space
1612 * available to send the rest of the payload.
1613 */
1614 ret = lttng_poll_mod(&state->events, client->socket,
1615 CLIENT_POLL_MASK_IN_OUT);
1616 if (ret) {
1617 goto error;
1618 }
1619 } else if (ret < 0) {
1620 /* Generic error, disconnect the client. */
1621 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1622 client->socket);
1623 ret = handle_notification_thread_client_disconnect(
1624 client->socket, state);
1625 if (ret) {
1626 goto error;
1627 }
1628 } else {
1629 /* No error and flushed the queue completely. */
1630 ret = lttng_dynamic_buffer_set_size(
1631 &client->communication.outbound.buffer, 0);
1632 if (ret) {
1633 goto error;
1634 }
1635 ret = lttng_poll_mod(&state->events, client->socket,
1636 CLIENT_POLL_MASK_IN);
1637 if (ret) {
1638 goto error;
1639 }
1640
1641 client->communication.outbound.queued_command_reply = false;
1642 client->communication.outbound.dropped_notification = false;
1643 }
1644
1645 return 0;
1646error:
1647 return -1;
1648}
1649
1650static
1651int client_send_command_reply(struct notification_client *client,
1652 struct notification_thread_state *state,
1653 enum lttng_notification_channel_status status)
1654{
1655 int ret;
1656 struct lttng_notification_channel_command_reply reply = {
1657 .status = (int8_t) status,
1658 };
1659 struct lttng_notification_channel_message msg = {
1660 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1661 .size = sizeof(reply),
1662 };
1663 char buffer[sizeof(msg) + sizeof(reply)];
1664
1665 if (client->communication.outbound.queued_command_reply) {
1666 /* Protocol error. */
1667 goto error;
1668 }
1669
1670 memcpy(buffer, &msg, sizeof(msg));
1671 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1672 DBG("[notification-thread] Send command reply (%i)", (int) status);
1673
1674 /* Enqueue buffer to outgoing queue and flush it. */
1675 ret = lttng_dynamic_buffer_append(
1676 &client->communication.outbound.buffer,
1677 buffer, sizeof(buffer));
1678 if (ret) {
1679 goto error;
1680 }
1681
1682 ret = client_flush_outgoing_queue(client, state);
1683 if (ret) {
1684 goto error;
1685 }
1686
1687 if (client->communication.outbound.buffer.size != 0) {
1688 /* Queue could not be emptied. */
1689 client->communication.outbound.queued_command_reply = true;
1690 }
1691
1692 return 0;
1693error:
1694 return -1;
1695}
1696
1697static
1698int client_dispatch_message(struct notification_client *client,
1699 struct notification_thread_state *state)
1700{
1701 int ret = 0;
1702
1703 if (client->communication.inbound.msg_type !=
1704 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1705 client->communication.inbound.msg_type !=
1706 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1707 !client->validated) {
1708 WARN("[notification-thread] client attempted a command before handshake");
1709 ret = -1;
1710 goto end;
1711 }
1712
1713 switch (client->communication.inbound.msg_type) {
1714 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1715 {
1716 /*
1717 * Receiving message header. The function will be called again
1718 * once the rest of the message as been received and can be
1719 * interpreted.
1720 */
1721 const struct lttng_notification_channel_message *msg;
1722
1723 assert(sizeof(*msg) ==
1724 client->communication.inbound.buffer.size);
1725 msg = (const struct lttng_notification_channel_message *)
1726 client->communication.inbound.buffer.data;
1727
1728 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1729 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1730 ret = -1;
1731 goto end;
1732 }
1733
1734 switch (msg->type) {
1735 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1736 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1737 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1738 break;
1739 default:
1740 ret = -1;
1741 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1742 goto end;
1743 }
1744
1745 client->communication.inbound.bytes_to_receive = msg->size;
1746 client->communication.inbound.msg_type =
1747 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 1748 ret = lttng_dynamic_buffer_set_size(
14fa22f8 1749 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
1750 if (ret) {
1751 goto end;
1752 }
1753 break;
1754 }
1755 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1756 {
1757 struct lttng_notification_channel_command_handshake *handshake_client;
1758 struct lttng_notification_channel_command_handshake handshake_reply = {
1759 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
1760 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
1761 };
1762 struct lttng_notification_channel_message msg_header = {
1763 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
1764 .size = sizeof(handshake_reply),
1765 };
1766 enum lttng_notification_channel_status status =
1767 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1768 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
1769
1770 memcpy(send_buffer, &msg_header, sizeof(msg_header));
1771 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
1772 sizeof(handshake_reply));
1773
1774 handshake_client =
1775 (struct lttng_notification_channel_command_handshake *)
1776 client->communication.inbound.buffer.data;
47a32869 1777 client->major = handshake_client->major;
ab0ee2ca
JG
1778 client->minor = handshake_client->minor;
1779 if (!client->communication.inbound.creds_received) {
1780 ERR("[notification-thread] No credentials received from client");
1781 ret = -1;
1782 goto end;
1783 }
1784
1785 client->uid = LTTNG_SOCK_GET_UID_CRED(
1786 &client->communication.inbound.creds);
1787 client->gid = LTTNG_SOCK_GET_GID_CRED(
1788 &client->communication.inbound.creds);
1789 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
1790 client->uid, client->gid, (int) client->major,
1791 (int) client->minor);
1792
1793 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
1794 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
1795 }
1796
1797 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
1798 send_buffer, sizeof(send_buffer));
1799 if (ret) {
1800 ERR("[notification-thread] Failed to send protocol version to notification channel client");
1801 goto end;
1802 }
1803
1804 ret = client_flush_outgoing_queue(client, state);
1805 if (ret) {
1806 goto end;
1807 }
1808
1809 ret = client_send_command_reply(client, state, status);
1810 if (ret) {
1811 ERR("[notification-thread] Failed to send reply to notification channel client");
1812 goto end;
1813 }
1814
1815 /* Set reception state to receive the next message header. */
14fa22f8
JG
1816 ret = client_reset_inbound_state(client);
1817 if (ret) {
1818 ERR("[notification-thread] Failed to reset client communication's inbound state");
1819 goto end;
1820 }
ab0ee2ca
JG
1821 client->validated = true;
1822 break;
1823 }
1824 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1825 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1826 {
1827 struct lttng_condition *condition;
1828 enum lttng_notification_channel_status status =
1829 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1830 const struct lttng_buffer_view condition_view =
1831 lttng_buffer_view_from_dynamic_buffer(
1832 &client->communication.inbound.buffer,
1833 0, -1);
1834 size_t expected_condition_size =
1835 client->communication.inbound.buffer.size;
1836
1837 ret = lttng_condition_create_from_buffer(&condition_view,
1838 &condition);
1839 if (ret != expected_condition_size) {
1840 ERR("[notification-thread] Malformed condition received from client");
1841 goto end;
1842 }
1843
1844 if (client->communication.inbound.msg_type ==
1845 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
1846 /*
1847 * FIXME The current state should be evaluated on
1848 * subscription.
1849 */
1850 ret = notification_thread_client_subscribe(client,
1851 condition, state, &status);
1852 } else {
1853 ret = notification_thread_client_unsubscribe(client,
1854 condition, state, &status);
1855 }
1856 if (ret) {
1857 goto end;
1858 }
1859
1860 ret = client_send_command_reply(client, state, status);
1861 if (ret) {
1862 ERR("[notification-thread] Failed to send reply to notification channel client");
1863 goto end;
1864 }
1865
1866 /* Set reception state to receive the next message header. */
14fa22f8
JG
1867 ret = client_reset_inbound_state(client);
1868 if (ret) {
1869 ERR("[notification-thread] Failed to reset client communication's inbound state");
1870 goto end;
1871 }
ab0ee2ca
JG
1872 break;
1873 }
1874 default:
1875 abort();
1876 }
1877end:
1878 return ret;
1879}
1880
1881/* Incoming data from client. */
1882int handle_notification_thread_client_in(
1883 struct notification_thread_state *state, int socket)
1884{
14fa22f8 1885 int ret = 0;
ab0ee2ca
JG
1886 struct notification_client *client;
1887 ssize_t recv_ret;
1888 size_t offset;
1889
1890 client = get_client_from_socket(socket, state);
1891 if (!client) {
1892 /* Internal error, abort. */
1893 ret = -1;
1894 goto end;
1895 }
1896
14fa22f8
JG
1897 offset = client->communication.inbound.buffer.size -
1898 client->communication.inbound.bytes_to_receive;
01ea340e 1899 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
1900 recv_ret = lttcomm_recv_creds_unix_sock(socket,
1901 client->communication.inbound.buffer.data + offset,
1902 client->communication.inbound.bytes_to_receive,
1903 &client->communication.inbound.creds);
1904 if (recv_ret > 0) {
01ea340e 1905 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
1906 client->communication.inbound.creds_received = true;
1907 }
1908 } else {
1909 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
1910 client->communication.inbound.buffer.data + offset,
1911 client->communication.inbound.bytes_to_receive);
1912 }
1913 if (recv_ret < 0) {
1914 goto error_disconnect_client;
1915 }
1916
1917 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
1918 if (client->communication.inbound.bytes_to_receive == 0) {
1919 ret = client_dispatch_message(client, state);
1920 if (ret) {
1921 /*
1922 * Only returns an error if this client must be
1923 * disconnected.
1924 */
1925 goto error_disconnect_client;
1926 }
1927 } else {
1928 goto end;
1929 }
1930end:
1931 return ret;
1932error_disconnect_client:
1933 ret = handle_notification_thread_client_disconnect(socket, state);
1934 return ret;
1935}
1936
1937/* Client ready to receive outgoing data. */
1938int handle_notification_thread_client_out(
1939 struct notification_thread_state *state, int socket)
1940{
1941 int ret;
1942 struct notification_client *client;
1943
1944 client = get_client_from_socket(socket, state);
1945 if (!client) {
1946 /* Internal error, abort. */
1947 ret = -1;
1948 goto end;
1949 }
1950
1951 ret = client_flush_outgoing_queue(client, state);
1952 if (ret) {
1953 goto end;
1954 }
1955end:
1956 return ret;
1957}
1958
1959static
1960bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
1961 struct channel_state_sample *sample, uint64_t buffer_capacity)
1962{
1963 bool result = false;
1964 uint64_t threshold;
1965 enum lttng_condition_type condition_type;
1966 struct lttng_condition_buffer_usage *use_condition = container_of(
1967 condition, struct lttng_condition_buffer_usage,
1968 parent);
1969
1970 if (!sample) {
1971 goto end;
1972 }
1973
1974 if (use_condition->threshold_bytes.set) {
1975 threshold = use_condition->threshold_bytes.value;
1976 } else {
1977 /*
1978 * Threshold was expressed as a ratio.
1979 *
1980 * TODO the threshold (in bytes) of conditions expressed
1981 * as a ratio of total buffer size could be cached to
1982 * forego this double-multiplication or it could be performed
1983 * as fixed-point math.
1984 *
1985 * Note that caching should accomodate the case where the
1986 * condition applies to multiple channels (i.e. don't assume
1987 * that all channels matching my_chann* have the same size...)
1988 */
1989 threshold = (uint64_t) (use_condition->threshold_ratio.value *
1990 (double) buffer_capacity);
1991 }
1992
1993 condition_type = lttng_condition_get_type(condition);
1994 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
1995 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1996 threshold, sample->highest_usage);
1997
1998 /*
1999 * The low condition should only be triggered once _all_ of the
2000 * streams in a channel have gone below the "low" threshold.
2001 */
2002 if (sample->highest_usage <= threshold) {
2003 result = true;
2004 }
2005 } else {
2006 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2007 threshold, sample->highest_usage);
2008
2009 /*
2010 * For high buffer usage scenarios, we want to trigger whenever
2011 * _any_ of the streams has reached the "high" threshold.
2012 */
2013 if (sample->highest_usage >= threshold) {
2014 result = true;
2015 }
2016 }
2017end:
2018 return result;
2019}
2020
2021static
2022int evaluate_condition(struct lttng_condition *condition,
2023 struct lttng_evaluation **evaluation,
2024 struct notification_thread_state *state,
2025 struct channel_state_sample *previous_sample,
2026 struct channel_state_sample *latest_sample,
2027 uint64_t buffer_capacity)
2028{
2029 int ret = 0;
2030 enum lttng_condition_type condition_type;
2031 bool previous_sample_result;
2032 bool latest_sample_result;
2033
2034 condition_type = lttng_condition_get_type(condition);
2035 /* No other condition type supported for the moment. */
2036 assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
2037 condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
2038
2039 previous_sample_result = evaluate_buffer_usage_condition(condition,
2040 previous_sample, buffer_capacity);
2041 latest_sample_result = evaluate_buffer_usage_condition(condition,
2042 latest_sample, buffer_capacity);
2043
2044 if (!latest_sample_result ||
2045 (previous_sample_result == latest_sample_result)) {
2046 /*
2047 * Only trigger on a condition evaluation transition.
2048 *
2049 * NOTE: This edge-triggered logic may not be appropriate for
2050 * future condition types.
2051 */
2052 goto end;
2053 }
2054
2055 if (evaluation && latest_sample_result) {
2056 *evaluation = lttng_evaluation_buffer_usage_create(
2057 condition_type,
2058 latest_sample->highest_usage,
2059 buffer_capacity);
2060 if (!*evaluation) {
2061 ret = -1;
2062 goto end;
2063 }
2064 }
2065end:
2066 return ret;
2067}
2068
2069static
2070int client_enqueue_dropped_notification(struct notification_client *client,
2071 struct notification_thread_state *state)
2072{
2073 int ret;
2074 struct lttng_notification_channel_message msg = {
2075 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2076 .size = 0,
2077 };
2078
2079 ret = lttng_dynamic_buffer_append(
2080 &client->communication.outbound.buffer, &msg,
2081 sizeof(msg));
2082 return ret;
2083}
2084
2085static
2086int send_evaluation_to_clients(struct lttng_trigger *trigger,
2087 struct lttng_evaluation *evaluation,
2088 struct notification_client_list* client_list,
2089 struct notification_thread_state *state,
2090 uid_t channel_uid, gid_t channel_gid)
2091{
2092 int ret = 0;
2093 struct lttng_dynamic_buffer msg_buffer;
2094 struct notification_client_list_element *client_list_element, *tmp;
2095 struct lttng_notification *notification;
2096 struct lttng_condition *condition;
2097 ssize_t expected_notification_size, notification_size;
2098 struct lttng_notification_channel_message msg;
2099
2100 lttng_dynamic_buffer_init(&msg_buffer);
2101
2102 condition = lttng_trigger_get_condition(trigger);
2103 assert(condition);
2104
2105 notification = lttng_notification_create(condition, evaluation);
2106 if (!notification) {
2107 ret = -1;
2108 goto end;
2109 }
2110
2111 expected_notification_size = lttng_notification_serialize(notification,
2112 NULL);
2113 if (expected_notification_size < 0) {
2114 ERR("[notification-thread] Failed to get size of serialized notification");
2115 ret = -1;
2116 goto end;
2117 }
2118
2119 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
2120 msg.size = (uint32_t) expected_notification_size;
2121 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
2122 if (ret) {
2123 goto end;
2124 }
2125
2126 ret = lttng_dynamic_buffer_set_size(&msg_buffer,
2127 msg_buffer.size + expected_notification_size);
2128 if (ret) {
2129 goto end;
2130 }
2131
2132 notification_size = lttng_notification_serialize(notification,
2133 msg_buffer.data + sizeof(msg));
2134 if (notification_size != expected_notification_size) {
2135 ERR("[notification-thread] Failed to serialize notification");
2136 ret = -1;
2137 goto end;
2138 }
2139
2140 cds_list_for_each_entry_safe(client_list_element, tmp,
2141 &client_list->list, node) {
2142 struct notification_client *client =
2143 client_list_element->client;
2144
2145 if (client->uid != channel_uid && client->gid != channel_gid &&
2146 client->uid != 0) {
2147 /* Client is not allowed to monitor this channel. */
2148 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2149 continue;
2150 }
2151
2152 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2153 client->socket, msg_buffer.size);
2154 if (client->communication.outbound.buffer.size) {
2155 /*
2156 * Outgoing data is already buffered for this client;
2157 * drop the notification and enqueue a "dropped
2158 * notification" message if this is the first dropped
2159 * notification since the socket spilled-over to the
2160 * queue.
2161 */
2162 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2163 client->socket);
2164 if (!client->communication.outbound.dropped_notification) {
2165 client->communication.outbound.dropped_notification = true;
2166 ret = client_enqueue_dropped_notification(
2167 client, state);
2168 if (ret) {
2169 goto end;
2170 }
2171 }
2172 continue;
2173 }
2174
2175 ret = lttng_dynamic_buffer_append_buffer(
2176 &client->communication.outbound.buffer,
2177 &msg_buffer);
2178 if (ret) {
2179 goto end;
2180 }
2181
2182 ret = client_flush_outgoing_queue(client, state);
2183 if (ret) {
2184 goto end;
2185 }
2186 }
2187 ret = 0;
2188end:
2189 lttng_notification_destroy(notification);
2190 lttng_dynamic_buffer_reset(&msg_buffer);
2191 return ret;
2192}
2193
2194int handle_notification_thread_channel_sample(
2195 struct notification_thread_state *state, int pipe,
2196 enum lttng_domain_type domain)
2197{
2198 int ret = 0;
2199 struct lttcomm_consumer_channel_monitor_msg sample_msg;
2200 struct channel_state_sample previous_sample, latest_sample;
2201 struct channel_info *channel_info;
2202 struct cds_lfht_node *node;
2203 struct cds_lfht_iter iter;
2204 struct lttng_channel_trigger_list *trigger_list;
2205 struct lttng_trigger_list_element *trigger_list_element;
2206 bool previous_sample_available = false;
2207
2208 /*
2209 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2210 * ensuring that read/write of sampling messages are atomic.
2211 */
7c2551ef 2212 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
2213 if (ret != sizeof(sample_msg)) {
2214 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2215 pipe);
2216 ret = -1;
2217 goto end;
2218 }
2219
2220 ret = 0;
2221 latest_sample.key.key = sample_msg.key;
2222 latest_sample.key.domain = domain;
2223 latest_sample.highest_usage = sample_msg.highest;
2224 latest_sample.lowest_usage = sample_msg.lowest;
2225
2226 rcu_read_lock();
2227
2228 /* Retrieve the channel's informations */
2229 cds_lfht_lookup(state->channels_ht,
2230 hash_channel_key(&latest_sample.key),
2231 match_channel_info,
2232 &latest_sample.key,
2233 &iter);
2234 node = cds_lfht_iter_get_node(&iter);
2235 if (!node) {
2236 /*
2237 * Not an error since the consumer can push a sample to the pipe
2238 * and the rest of the session daemon could notify us of the
2239 * channel's destruction before we get a chance to process that
2240 * sample.
2241 */
2242 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2243 latest_sample.key.key,
2244 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2245 "user space");
2246 goto end_unlock;
2247 }
2248 channel_info = caa_container_of(node, struct channel_info,
2249 channels_ht_node);
2250 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
2251 channel_info->channel_name,
2252 latest_sample.key.key,
2253 channel_info->session_name,
2254 latest_sample.highest_usage,
2255 latest_sample.lowest_usage);
2256
2257 /* Retrieve the channel's last sample, if it exists, and update it. */
2258 cds_lfht_lookup(state->channel_state_ht,
2259 hash_channel_key(&latest_sample.key),
2260 match_channel_state_sample,
2261 &latest_sample.key,
2262 &iter);
2263 node = cds_lfht_iter_get_node(&iter);
2264 if (node) {
2265 struct channel_state_sample *stored_sample;
2266
2267 /* Update the sample stored. */
2268 stored_sample = caa_container_of(node,
2269 struct channel_state_sample,
2270 channel_state_ht_node);
2271 memcpy(&previous_sample, stored_sample,
2272 sizeof(previous_sample));
2273 stored_sample->highest_usage = latest_sample.highest_usage;
2274 stored_sample->lowest_usage = latest_sample.lowest_usage;
2275 previous_sample_available = true;
2276 } else {
2277 /*
2278 * This is the channel's first sample, allocate space for and
2279 * store the new sample.
2280 */
2281 struct channel_state_sample *stored_sample;
2282
2283 stored_sample = zmalloc(sizeof(*stored_sample));
2284 if (!stored_sample) {
2285 ret = -1;
2286 goto end_unlock;
2287 }
2288
2289 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2290 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2291 cds_lfht_add(state->channel_state_ht,
2292 hash_channel_key(&stored_sample->key),
2293 &stored_sample->channel_state_ht_node);
2294 }
2295
2296 /* Find triggers associated with this channel. */
2297 cds_lfht_lookup(state->channel_triggers_ht,
2298 hash_channel_key(&latest_sample.key),
2299 match_channel_trigger_list,
2300 &latest_sample.key,
2301 &iter);
2302 node = cds_lfht_iter_get_node(&iter);
2303 if (!node) {
2304 goto end_unlock;
2305 }
2306
2307 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2308 channel_triggers_ht_node);
2309 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2310 node) {
2311 struct lttng_condition *condition;
2312 struct lttng_action *action;
2313 struct lttng_trigger *trigger;
2314 struct notification_client_list *client_list;
2315 struct lttng_evaluation *evaluation = NULL;
2316
2317 trigger = trigger_list_element->trigger;
2318 condition = lttng_trigger_get_condition(trigger);
2319 assert(condition);
2320 action = lttng_trigger_get_action(trigger);
2321
2322 /* Notify actions are the only type currently supported. */
2323 assert(lttng_action_get_type(action) ==
2324 LTTNG_ACTION_TYPE_NOTIFY);
2325
2326 /*
2327 * Check if any client is subscribed to the result of this
2328 * evaluation.
2329 */
2330 cds_lfht_lookup(state->notification_trigger_clients_ht,
2331 lttng_condition_hash(condition),
2332 match_client_list,
2333 trigger,
2334 &iter);
2335 node = cds_lfht_iter_get_node(&iter);
2336 assert(node);
2337
2338 client_list = caa_container_of(node,
2339 struct notification_client_list,
2340 notification_trigger_ht_node);
2341 if (cds_list_empty(&client_list->list)) {
2342 /*
2343 * No clients interested in the evaluation's result,
2344 * skip it.
2345 */
2346 continue;
2347 }
2348
2349 ret = evaluate_condition(condition, &evaluation, state,
2350 previous_sample_available ? &previous_sample : NULL,
2351 &latest_sample, channel_info->capacity);
2352 if (ret) {
2353 goto end_unlock;
2354 }
2355
2356 if (!evaluation) {
2357 continue;
2358 }
2359
2360 /* Dispatch evaluation result to all clients. */
2361 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2362 evaluation, client_list, state,
2363 channel_info->uid, channel_info->gid);
2364 if (ret) {
2365 goto end_unlock;
2366 }
2367 }
2368end_unlock:
2369 rcu_read_unlock();
2370end:
2371 return ret;
2372}
This page took 0.126303 seconds and 5 git commands to generate.