2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27 #include <sys/socket.h>
28 #include <sys/types.h>
32 #include <common/common.h>
33 #include <common/utils.h>
34 #include <common/compat/poll.h>
35 #include <common/kernel-ctl/kernel-ctl.h>
36 #include <common/sessiond-comm/relayd.h>
37 #include <common/sessiond-comm/sessiond-comm.h>
38 #include <common/kernel-consumer/kernel-consumer.h>
39 #include <common/relayd/relayd.h>
40 #include <common/ust-consumer/ust-consumer.h>
44 struct lttng_consumer_global_data consumer_data
= {
47 .type
= LTTNG_CONSUMER_UNKNOWN
,
50 /*
 * Timeout parameter, used to control the polling thread grace period.
 * Initialized to -1. NOTE(review): presumably handed to poll(), where a
 * negative timeout means "wait indefinitely" -- confirm against the polling
 * thread before relying on this.
 */
51 int consumer_poll_timeout
= -1;
54 * Flag to inform the polling thread to quit when all fds have hung up. Updated by
55 * consumer_thread_receive_fds when it notices that all fds have hung up.
56 * Also updated by the signal handler (consumer_should_exit()). Read by the
59 volatile int consumer_quit
= 0;
62 * Find a stream. The consumer_data.lock must be locked during this
65 static struct lttng_consumer_stream
*consumer_find_stream(int key
)
67 struct lttng_ht_iter iter
;
68 struct lttng_ht_node_ulong
*node
;
69 struct lttng_consumer_stream
*stream
= NULL
;
71 /* Negative keys are lookup failures */
77 lttng_ht_lookup(consumer_data
.stream_ht
, (void *)((unsigned long) key
),
79 node
= lttng_ht_iter_get_node_ulong(&iter
);
81 stream
= caa_container_of(node
, struct lttng_consumer_stream
, node
);
89 static void consumer_steal_stream_key(int key
)
91 struct lttng_consumer_stream
*stream
;
94 stream
= consumer_find_stream(key
);
98 * We don't want the lookup to match, but we still need
99 * to iterate on this stream when iterating over the hash table. Just
100 * change the node key.
102 stream
->node
.key
= -1;
107 static struct lttng_consumer_channel
*consumer_find_channel(int key
)
109 struct lttng_ht_iter iter
;
110 struct lttng_ht_node_ulong
*node
;
111 struct lttng_consumer_channel
*channel
= NULL
;
113 /* Negative keys are lookup failures */
119 lttng_ht_lookup(consumer_data
.channel_ht
, (void *)((unsigned long) key
),
121 node
= lttng_ht_iter_get_node_ulong(&iter
);
123 channel
= caa_container_of(node
, struct lttng_consumer_channel
, node
);
131 static void consumer_steal_channel_key(int key
)
133 struct lttng_consumer_channel
*channel
;
136 channel
= consumer_find_channel(key
);
140 * We don't want the lookup to match, but we still need
141 * to iterate on this channel when iterating over the hash table. Just
142 * change the node key.
144 channel
->node
.key
= -1;
150 void consumer_free_stream(struct rcu_head
*head
)
152 struct lttng_ht_node_ulong
*node
=
153 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
154 struct lttng_consumer_stream
*stream
=
155 caa_container_of(node
, struct lttng_consumer_stream
, node
);
161 * RCU protected relayd socket pair free.
163 static void consumer_rcu_free_relayd(struct rcu_head
*head
)
165 struct lttng_ht_node_ulong
*node
=
166 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
167 struct consumer_relayd_sock_pair
*relayd
=
168 caa_container_of(node
, struct consumer_relayd_sock_pair
, node
);
174 * Destroy and free relayd socket pair object.
176 * This function MUST be called with the consumer_data lock acquired.
178 void consumer_destroy_relayd(struct consumer_relayd_sock_pair
*relayd
)
181 struct lttng_ht_iter iter
;
183 if (relayd
== NULL
) {
187 DBG("Consumer destroy and close relayd socket pair");
189 iter
.iter
.node
= &relayd
->node
.node
;
190 ret
= lttng_ht_del(consumer_data
.relayd_ht
, &iter
);
192 /* We assume the relayd was already destroyed */
196 /* Close all sockets */
197 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
198 (void) relayd_close(&relayd
->control_sock
);
199 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
200 (void) relayd_close(&relayd
->data_sock
);
202 /* RCU free() call */
203 call_rcu(&relayd
->node
.head
, consumer_rcu_free_relayd
);
207 * Flag a relayd socket pair for destruction. Destroy it if the refcount
210 * The RCU read-side lock MUST be acquired before calling this function.
212 void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair
*relayd
)
216 /* Set destroy flag for this object */
217 uatomic_set(&relayd
->destroy_flag
, 1);
219 /* Destroy the relayd if refcount is 0 */
220 if (uatomic_read(&relayd
->refcount
) == 0) {
221 consumer_destroy_relayd(relayd
);
226 * Remove a stream from the global hash table protected by a mutex. This
227 * function is also responsible for freeing its data structures.
229 void consumer_del_stream(struct lttng_consumer_stream
*stream
)
232 struct lttng_ht_iter iter
;
233 struct lttng_consumer_channel
*free_chan
= NULL
;
234 struct consumer_relayd_sock_pair
*relayd
;
238 pthread_mutex_lock(&consumer_data
.lock
);
240 switch (consumer_data
.type
) {
241 case LTTNG_CONSUMER_KERNEL
:
242 if (stream
->mmap_base
!= NULL
) {
243 ret
= munmap(stream
->mmap_base
, stream
->mmap_len
);
249 case LTTNG_CONSUMER32_UST
:
250 case LTTNG_CONSUMER64_UST
:
251 lttng_ustconsumer_del_stream(stream
);
254 ERR("Unknown consumer_data type");
260 iter
.iter
.node
= &stream
->node
.node
;
261 ret
= lttng_ht_del(consumer_data
.stream_ht
, &iter
);
266 if (consumer_data
.stream_count
<= 0) {
269 consumer_data
.stream_count
--;
273 if (stream
->out_fd
>= 0) {
274 ret
= close(stream
->out_fd
);
279 if (stream
->wait_fd
>= 0 && !stream
->wait_fd_is_copy
) {
280 ret
= close(stream
->wait_fd
);
285 if (stream
->shm_fd
>= 0 && stream
->wait_fd
!= stream
->shm_fd
) {
286 ret
= close(stream
->shm_fd
);
292 /* Check and cleanup relayd */
294 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
295 if (relayd
!= NULL
) {
296 uatomic_dec(&relayd
->refcount
);
297 assert(uatomic_read(&relayd
->refcount
) >= 0);
299 /* Closing streams requires to lock the control socket. */
300 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
301 ret
= relayd_send_close_stream(&relayd
->control_sock
,
302 stream
->relayd_stream_id
,
303 stream
->next_net_seq_num
- 1);
304 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
306 DBG("Unable to close stream on the relayd. Continuing");
308 * Continue here. There is nothing we can do for the relayd.
309 * Chances are that the relayd has closed the socket so we just
310 * continue cleaning up.
314 /* Both conditions are met, we destroy the relayd. */
315 if (uatomic_read(&relayd
->refcount
) == 0 &&
316 uatomic_read(&relayd
->destroy_flag
)) {
317 consumer_destroy_relayd(relayd
);
322 if (!--stream
->chan
->refcount
) {
323 free_chan
= stream
->chan
;
327 call_rcu(&stream
->node
.head
, consumer_free_stream
);
329 consumer_data
.need_update
= 1;
330 pthread_mutex_unlock(&consumer_data
.lock
);
333 consumer_del_channel(free_chan
);
336 struct lttng_consumer_stream
*consumer_allocate_stream(
337 int channel_key
, int stream_key
,
338 int shm_fd
, int wait_fd
,
339 enum lttng_consumer_stream_state state
,
341 enum lttng_event_output output
,
342 const char *path_name
,
348 struct lttng_consumer_stream
*stream
;
351 stream
= zmalloc(sizeof(*stream
));
352 if (stream
== NULL
) {
353 perror("malloc struct lttng_consumer_stream");
356 stream
->chan
= consumer_find_channel(channel_key
);
358 perror("Unable to find channel key");
361 stream
->chan
->refcount
++;
362 stream
->key
= stream_key
;
363 stream
->shm_fd
= shm_fd
;
364 stream
->wait_fd
= wait_fd
;
366 stream
->out_fd_offset
= 0;
367 stream
->state
= state
;
368 stream
->mmap_len
= mmap_len
;
369 stream
->mmap_base
= NULL
;
370 stream
->output
= output
;
373 stream
->net_seq_idx
= net_index
;
374 stream
->metadata_flag
= metadata_flag
;
375 strncpy(stream
->path_name
, path_name
, sizeof(stream
->path_name
));
376 stream
->path_name
[sizeof(stream
->path_name
) - 1] = '\0';
377 lttng_ht_node_init_ulong(&stream
->node
, stream
->key
);
378 lttng_ht_node_init_ulong(&stream
->waitfd_node
, stream
->wait_fd
);
380 switch (consumer_data
.type
) {
381 case LTTNG_CONSUMER_KERNEL
:
383 case LTTNG_CONSUMER32_UST
:
384 case LTTNG_CONSUMER64_UST
:
385 stream
->cpu
= stream
->chan
->cpucount
++;
386 ret
= lttng_ustconsumer_allocate_stream(stream
);
393 ERR("Unknown consumer_data type");
397 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
398 stream
->path_name
, stream
->key
,
401 (unsigned long long) stream
->mmap_len
,
403 stream
->net_seq_idx
);
409 * Add a stream to the global hash table protected by a mutex.
411 int consumer_add_stream(struct lttng_consumer_stream
*stream
)
414 struct lttng_ht_node_ulong
*node
;
415 struct lttng_ht_iter iter
;
416 struct consumer_relayd_sock_pair
*relayd
;
418 pthread_mutex_lock(&consumer_data
.lock
);
419 /* Steal stream identifier, for UST */
420 consumer_steal_stream_key(stream
->key
);
423 lttng_ht_lookup(consumer_data
.stream_ht
,
424 (void *)((unsigned long) stream
->key
), &iter
);
425 node
= lttng_ht_iter_get_node_ulong(&iter
);
428 /* Stream already exist. Ignore the insertion */
432 lttng_ht_add_unique_ulong(consumer_data
.stream_ht
, &stream
->node
);
434 /* Check and cleanup relayd */
435 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
436 if (relayd
!= NULL
) {
437 uatomic_inc(&relayd
->refcount
);
441 /* Update consumer data */
442 consumer_data
.stream_count
++;
443 consumer_data
.need_update
= 1;
446 pthread_mutex_unlock(&consumer_data
.lock
);
452 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
453 * be acquired before calling this.
456 int consumer_add_relayd(struct consumer_relayd_sock_pair
*relayd
)
459 struct lttng_ht_node_ulong
*node
;
460 struct lttng_ht_iter iter
;
462 if (relayd
== NULL
) {
467 lttng_ht_lookup(consumer_data
.relayd_ht
,
468 (void *)((unsigned long) relayd
->net_seq_idx
), &iter
);
469 node
= lttng_ht_iter_get_node_ulong(&iter
);
471 /* Relayd already exist. Ignore the insertion */
474 lttng_ht_add_unique_ulong(consumer_data
.relayd_ht
, &relayd
->node
);
481 * Allocate and return a consumer relayd socket.
483 struct consumer_relayd_sock_pair
*consumer_allocate_relayd_sock_pair(
486 struct consumer_relayd_sock_pair
*obj
= NULL
;
488 /* Negative net sequence index is a failure */
489 if (net_seq_idx
< 0) {
493 obj
= zmalloc(sizeof(struct consumer_relayd_sock_pair
));
495 PERROR("zmalloc relayd sock");
499 obj
->net_seq_idx
= net_seq_idx
;
501 obj
->destroy_flag
= 0;
502 lttng_ht_node_init_ulong(&obj
->node
, obj
->net_seq_idx
);
503 pthread_mutex_init(&obj
->ctrl_sock_mutex
, NULL
);
510 * Find a relayd socket pair in the global consumer data.
512 * Return the object if found else NULL.
513 * RCU read-side lock must be held across this call and while using the
516 struct consumer_relayd_sock_pair
*consumer_find_relayd(int key
)
518 struct lttng_ht_iter iter
;
519 struct lttng_ht_node_ulong
*node
;
520 struct consumer_relayd_sock_pair
*relayd
= NULL
;
522 /* Negative keys are lookup failures */
527 lttng_ht_lookup(consumer_data
.relayd_ht
, (void *)((unsigned long) key
),
529 node
= lttng_ht_iter_get_node_ulong(&iter
);
531 relayd
= caa_container_of(node
, struct consumer_relayd_sock_pair
, node
);
539 * Handle stream for relayd transmission if the stream applies for network
540 * streaming where the net sequence index is set.
542 * Return destination file descriptor or negative value on error.
544 static int write_relayd_stream_header(struct lttng_consumer_stream
*stream
,
545 size_t data_size
, struct consumer_relayd_sock_pair
*relayd
)
548 struct lttcomm_relayd_data_hdr data_hdr
;
554 /* Reset data header */
555 memset(&data_hdr
, 0, sizeof(data_hdr
));
557 if (stream
->metadata_flag
) {
558 /* Caller MUST acquire the relayd control socket lock */
559 ret
= relayd_send_metadata(&relayd
->control_sock
, data_size
);
564 /* Metadata are always sent on the control socket. */
565 outfd
= relayd
->control_sock
.fd
;
567 /* Set header with stream information */
568 data_hdr
.stream_id
= htobe64(stream
->relayd_stream_id
);
569 data_hdr
.data_size
= htobe32(data_size
);
570 data_hdr
.net_seq_num
= htobe64(stream
->next_net_seq_num
++);
571 /* Other fields are zeroed previously */
573 ret
= relayd_send_data_hdr(&relayd
->data_sock
, &data_hdr
,
579 /* Set to go on data socket */
580 outfd
= relayd
->data_sock
.fd
;
588 * Update a stream according to what we just received.
590 void consumer_change_stream_state(int stream_key
,
591 enum lttng_consumer_stream_state state
)
593 struct lttng_consumer_stream
*stream
;
595 pthread_mutex_lock(&consumer_data
.lock
);
596 stream
= consumer_find_stream(stream_key
);
598 stream
->state
= state
;
600 consumer_data
.need_update
= 1;
601 pthread_mutex_unlock(&consumer_data
.lock
);
605 void consumer_free_channel(struct rcu_head
*head
)
607 struct lttng_ht_node_ulong
*node
=
608 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
609 struct lttng_consumer_channel
*channel
=
610 caa_container_of(node
, struct lttng_consumer_channel
, node
);
616 * Remove a channel from the global hash table protected by a mutex. This
617 * function is also responsible for freeing its data structures.
619 void consumer_del_channel(struct lttng_consumer_channel
*channel
)
622 struct lttng_ht_iter iter
;
624 pthread_mutex_lock(&consumer_data
.lock
);
626 switch (consumer_data
.type
) {
627 case LTTNG_CONSUMER_KERNEL
:
629 case LTTNG_CONSUMER32_UST
:
630 case LTTNG_CONSUMER64_UST
:
631 lttng_ustconsumer_del_channel(channel
);
634 ERR("Unknown consumer_data type");
640 iter
.iter
.node
= &channel
->node
.node
;
641 ret
= lttng_ht_del(consumer_data
.channel_ht
, &iter
);
645 if (channel
->mmap_base
!= NULL
) {
646 ret
= munmap(channel
->mmap_base
, channel
->mmap_len
);
651 if (channel
->wait_fd
>= 0 && !channel
->wait_fd_is_copy
) {
652 ret
= close(channel
->wait_fd
);
657 if (channel
->shm_fd
>= 0 && channel
->wait_fd
!= channel
->shm_fd
) {
658 ret
= close(channel
->shm_fd
);
664 call_rcu(&channel
->node
.head
, consumer_free_channel
);
666 pthread_mutex_unlock(&consumer_data
.lock
);
669 struct lttng_consumer_channel
*consumer_allocate_channel(
671 int shm_fd
, int wait_fd
,
673 uint64_t max_sb_size
)
675 struct lttng_consumer_channel
*channel
;
678 channel
= zmalloc(sizeof(*channel
));
679 if (channel
== NULL
) {
680 perror("malloc struct lttng_consumer_channel");
683 channel
->key
= channel_key
;
684 channel
->shm_fd
= shm_fd
;
685 channel
->wait_fd
= wait_fd
;
686 channel
->mmap_len
= mmap_len
;
687 channel
->max_sb_size
= max_sb_size
;
688 channel
->refcount
= 0;
689 lttng_ht_node_init_ulong(&channel
->node
, channel
->key
);
691 switch (consumer_data
.type
) {
692 case LTTNG_CONSUMER_KERNEL
:
693 channel
->mmap_base
= NULL
;
694 channel
->mmap_len
= 0;
696 case LTTNG_CONSUMER32_UST
:
697 case LTTNG_CONSUMER64_UST
:
698 ret
= lttng_ustconsumer_allocate_channel(channel
);
705 ERR("Unknown consumer_data type");
709 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
710 channel
->key
, channel
->shm_fd
, channel
->wait_fd
,
711 (unsigned long long) channel
->mmap_len
,
712 (unsigned long long) channel
->max_sb_size
);
718 * Add a channel to the global hash table protected by a mutex.
720 int consumer_add_channel(struct lttng_consumer_channel
*channel
)
722 struct lttng_ht_node_ulong
*node
;
723 struct lttng_ht_iter iter
;
725 pthread_mutex_lock(&consumer_data
.lock
);
726 /* Steal channel identifier, for UST */
727 consumer_steal_channel_key(channel
->key
);
730 lttng_ht_lookup(consumer_data
.channel_ht
,
731 (void *)((unsigned long) channel
->key
), &iter
);
732 node
= lttng_ht_iter_get_node_ulong(&iter
);
734 /* Channel already exist. Ignore the insertion */
738 lttng_ht_add_unique_ulong(consumer_data
.channel_ht
, &channel
->node
);
742 pthread_mutex_unlock(&consumer_data
.lock
);
748 * Allocate the pollfd structure and the local view of the out fds to avoid
749 * doing a lookup in the stream hash table and concurrency issues when writing is
750 * needed. Called with consumer_data.lock held.
752 * Returns the number of fds in the structures.
754 int consumer_update_poll_array(
755 struct lttng_consumer_local_data
*ctx
, struct pollfd
**pollfd
,
756 struct lttng_consumer_stream
**local_stream
)
759 struct lttng_ht_iter iter
;
760 struct lttng_consumer_stream
*stream
;
762 DBG("Updating poll fd array");
764 cds_lfht_for_each_entry(consumer_data
.stream_ht
->ht
, &iter
.iter
, stream
,
766 if (stream
->state
!= LTTNG_CONSUMER_ACTIVE_STREAM
) {
769 DBG("Active FD %d", stream
->wait_fd
);
770 (*pollfd
)[i
].fd
= stream
->wait_fd
;
771 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
772 local_stream
[i
] = stream
;
778 * Insert the consumer_poll_pipe at the end of the array and don't
779 * increment i so nb_fd is the number of real FD.
781 (*pollfd
)[i
].fd
= ctx
->consumer_poll_pipe
[0];
782 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
787 * Poll on the should_quit pipe and the command socket. Return -1 on error and
788 * should exit, 0 if data is available on the command socket
790 int lttng_consumer_poll_socket(struct pollfd
*consumer_sockpoll
)
795 num_rdy
= poll(consumer_sockpoll
, 2, -1);
798 * Restart interrupted system call.
800 if (errno
== EINTR
) {
803 perror("Poll error");
806 if (consumer_sockpoll
[0].revents
& (POLLIN
| POLLPRI
)) {
807 DBG("consumer_should_quit wake up");
817 * Set the error socket.
819 void lttng_consumer_set_error_sock(
820 struct lttng_consumer_local_data
*ctx
, int sock
)
822 ctx
->consumer_error_socket
= sock
;
826 * Set the command socket path.
828 void lttng_consumer_set_command_sock_path(
829 struct lttng_consumer_local_data
*ctx
, char *sock
)
831 ctx
->consumer_command_sock_path
= sock
;
835 * Send return code to the session daemon.
836 * If the socket is not defined, we return 0 since it is not a fatal error
838 int lttng_consumer_send_error(
839 struct lttng_consumer_local_data
*ctx
, int cmd
)
841 if (ctx
->consumer_error_socket
> 0) {
842 return lttcomm_send_unix_sock(ctx
->consumer_error_socket
, &cmd
,
843 sizeof(enum lttcomm_sessiond_command
));
850 * Close all the tracefiles and stream fds. Should be called when all instances
853 void lttng_consumer_cleanup(void)
855 struct lttng_ht_iter iter
;
856 struct lttng_ht_node_ulong
*node
;
861 * close all outfd. Called when there are no more threads running (after
862 * joining on the threads), no need to protect list iteration with mutex.
864 cds_lfht_for_each_entry(consumer_data
.stream_ht
->ht
, &iter
.iter
, node
,
866 struct lttng_consumer_stream
*stream
=
867 caa_container_of(node
, struct lttng_consumer_stream
, node
);
868 consumer_del_stream(stream
);
871 cds_lfht_for_each_entry(consumer_data
.channel_ht
->ht
, &iter
.iter
, node
,
873 struct lttng_consumer_channel
*channel
=
874 caa_container_of(node
, struct lttng_consumer_channel
, node
);
875 consumer_del_channel(channel
);
880 lttng_ht_destroy(consumer_data
.stream_ht
);
881 lttng_ht_destroy(consumer_data
.channel_ht
);
885 * Called from signal handler.
887 void lttng_consumer_should_exit(struct lttng_consumer_local_data
*ctx
)
892 ret
= write(ctx
->consumer_should_quit
[1], "4", 1);
893 } while (ret
< 0 && errno
== EINTR
);
895 perror("write consumer quit");
899 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream
*stream
,
902 int outfd
= stream
->out_fd
;
905 * This does a blocking write-and-wait on any page that belongs to the
906 * subbuffer prior to the one we just wrote.
907 * Don't care about error values, as these are just hints and ways to
908 * limit the amount of page cache used.
910 if (orig_offset
< stream
->chan
->max_sb_size
) {
913 lttng_sync_file_range(outfd
, orig_offset
- stream
->chan
->max_sb_size
,
914 stream
->chan
->max_sb_size
,
915 SYNC_FILE_RANGE_WAIT_BEFORE
916 | SYNC_FILE_RANGE_WRITE
917 | SYNC_FILE_RANGE_WAIT_AFTER
);
919 * Give hints to the kernel about how we access the file:
920 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
923 * We need to call fadvise again after the file grows because the
924 * kernel does not seem to apply fadvise to non-existing parts of the
927 * Call fadvise _after_ having waited for the page writeback to
928 * complete because the dirty page writeback semantic is not well
929 * defined. So it can be expected to lead to lower throughput in
932 posix_fadvise(outfd
, orig_offset
- stream
->chan
->max_sb_size
,
933 stream
->chan
->max_sb_size
, POSIX_FADV_DONTNEED
);
937 * Initialise the necessary environment:
938 * - create a new context
939 * - create the poll_pipe
940 * - create the should_quit pipe (for signal handler)
941 * - create the thread pipe (for splice)
943 * Takes a function pointer as argument; this function is called when data is
944 * available on a buffer. This function is responsible for doing the
945 * kernctl_get_next_subbuf, reading the data with mmap or splice depending on the
946 * buffer configuration and then doing kernctl_put_next_subbuf at the end.
948 * Returns a pointer to the new context or NULL on error.
950 struct lttng_consumer_local_data
*lttng_consumer_create(
951 enum lttng_consumer_type type
,
952 ssize_t (*buffer_ready
)(struct lttng_consumer_stream
*stream
,
953 struct lttng_consumer_local_data
*ctx
),
954 int (*recv_channel
)(struct lttng_consumer_channel
*channel
),
955 int (*recv_stream
)(struct lttng_consumer_stream
*stream
),
956 int (*update_stream
)(int stream_key
, uint32_t state
))
959 struct lttng_consumer_local_data
*ctx
;
961 assert(consumer_data
.type
== LTTNG_CONSUMER_UNKNOWN
||
962 consumer_data
.type
== type
);
963 consumer_data
.type
= type
;
965 ctx
= zmalloc(sizeof(struct lttng_consumer_local_data
));
967 perror("allocating context");
971 ctx
->consumer_error_socket
= -1;
972 /* assign the callbacks */
973 ctx
->on_buffer_ready
= buffer_ready
;
974 ctx
->on_recv_channel
= recv_channel
;
975 ctx
->on_recv_stream
= recv_stream
;
976 ctx
->on_update_stream
= update_stream
;
978 ret
= pipe(ctx
->consumer_poll_pipe
);
980 perror("Error creating poll pipe");
981 goto error_poll_pipe
;
984 /* set read end of the pipe to non-blocking */
985 ret
= fcntl(ctx
->consumer_poll_pipe
[0], F_SETFL
, O_NONBLOCK
);
987 perror("fcntl O_NONBLOCK");
988 goto error_poll_fcntl
;
991 /* set write end of the pipe to non-blocking */
992 ret
= fcntl(ctx
->consumer_poll_pipe
[1], F_SETFL
, O_NONBLOCK
);
994 perror("fcntl O_NONBLOCK");
995 goto error_poll_fcntl
;
998 ret
= pipe(ctx
->consumer_should_quit
);
1000 perror("Error creating recv pipe");
1001 goto error_quit_pipe
;
1004 ret
= pipe(ctx
->consumer_thread_pipe
);
1006 perror("Error creating thread pipe");
1007 goto error_thread_pipe
;
1010 ret
= utils_create_pipe(ctx
->consumer_metadata_pipe
);
1012 goto error_metadata_pipe
;
1015 ret
= utils_create_pipe(ctx
->consumer_splice_metadata_pipe
);
1017 goto error_splice_pipe
;
1023 utils_close_pipe(ctx
->consumer_metadata_pipe
);
1024 error_metadata_pipe
:
1025 utils_close_pipe(ctx
->consumer_thread_pipe
);
1027 for (i
= 0; i
< 2; i
++) {
1030 err
= close(ctx
->consumer_should_quit
[i
]);
1037 for (i
= 0; i
< 2; i
++) {
1040 err
= close(ctx
->consumer_poll_pipe
[i
]);
1052 * Close all fds associated with the instance and free the context.
1054 void lttng_consumer_destroy(struct lttng_consumer_local_data
*ctx
)
1058 ret
= close(ctx
->consumer_error_socket
);
1062 ret
= close(ctx
->consumer_thread_pipe
[0]);
1066 ret
= close(ctx
->consumer_thread_pipe
[1]);
1070 ret
= close(ctx
->consumer_poll_pipe
[0]);
1074 ret
= close(ctx
->consumer_poll_pipe
[1]);
1078 ret
= close(ctx
->consumer_should_quit
[0]);
1082 ret
= close(ctx
->consumer_should_quit
[1]);
1086 utils_close_pipe(ctx
->consumer_splice_metadata_pipe
);
1088 unlink(ctx
->consumer_command_sock_path
);
1093 * Write the metadata stream id on the specified file descriptor.
1095 static int write_relayd_metadata_id(int fd
,
1096 struct lttng_consumer_stream
*stream
,
1097 struct consumer_relayd_sock_pair
*relayd
)
1100 uint64_t metadata_id
;
1102 metadata_id
= htobe64(stream
->relayd_stream_id
);
1104 ret
= write(fd
, (void *) &metadata_id
,
1105 sizeof(stream
->relayd_stream_id
));
1106 } while (ret
< 0 && errno
== EINTR
);
1108 PERROR("write metadata stream id");
1111 DBG("Metadata stream id %" PRIu64
" written before data",
1112 stream
->relayd_stream_id
);
1119 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1120 * core function for writing trace buffers to either the local filesystem or
1123 * Any change to this function MUST be carefully reviewed!
1125 * Returns the number of bytes written
1127 ssize_t
lttng_consumer_on_read_subbuffer_mmap(
1128 struct lttng_consumer_local_data
*ctx
,
1129 struct lttng_consumer_stream
*stream
, unsigned long len
)
1131 unsigned long mmap_offset
;
1132 ssize_t ret
= 0, written
= 0;
1133 off_t orig_offset
= stream
->out_fd_offset
;
1134 /* Default is on the disk */
1135 int outfd
= stream
->out_fd
;
1136 struct consumer_relayd_sock_pair
*relayd
= NULL
;
1138 /* RCU lock for the relayd pointer */
1141 /* Flag that the current stream if set for network streaming. */
1142 if (stream
->net_seq_idx
!= -1) {
1143 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1144 if (relayd
== NULL
) {
1149 /* get the offset inside the fd to mmap */
1150 switch (consumer_data
.type
) {
1151 case LTTNG_CONSUMER_KERNEL
:
1152 ret
= kernctl_get_mmap_read_offset(stream
->wait_fd
, &mmap_offset
);
1154 case LTTNG_CONSUMER32_UST
:
1155 case LTTNG_CONSUMER64_UST
:
1156 ret
= lttng_ustctl_get_mmap_read_offset(stream
->chan
->handle
,
1157 stream
->buf
, &mmap_offset
);
1160 ERR("Unknown consumer_data type");
1165 PERROR("tracer ctl get_mmap_read_offset");
1170 /* Handle stream on the relayd if the output is on the network */
1172 unsigned long netlen
= len
;
1175 * Lock the control socket for the complete duration of the function
1176 * since from this point on we will use the socket.
1178 if (stream
->metadata_flag
) {
1179 /* Metadata requires the control socket. */
1180 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
1181 netlen
+= sizeof(stream
->relayd_stream_id
);
1184 ret
= write_relayd_stream_header(stream
, netlen
, relayd
);
1186 /* Use the returned socket. */
1189 /* Write metadata stream id before payload */
1190 if (stream
->metadata_flag
) {
1191 ret
= write_relayd_metadata_id(outfd
, stream
, relayd
);
1198 /* Else, use the default set before which is the filesystem. */
1203 ret
= write(outfd
, stream
->mmap_base
+ mmap_offset
, len
);
1204 } while (ret
< 0 && errno
== EINTR
);
1206 PERROR("Error in file write");
1211 } else if (ret
> len
) {
1212 PERROR("Error in file write (ret %zd > len %lu)", ret
, len
);
1219 DBG("Consumer mmap write() ret %zd (len %lu)", ret
, len
);
1221 /* This call is useless on a socket so better save a syscall. */
1223 /* This won't block, but will start writeout asynchronously */
1224 lttng_sync_file_range(outfd
, stream
->out_fd_offset
, ret
,
1225 SYNC_FILE_RANGE_WRITE
);
1226 stream
->out_fd_offset
+= ret
;
1230 lttng_consumer_sync_trace_file(stream
, orig_offset
);
1233 /* Unlock only if ctrl socket used */
1234 if (relayd
&& stream
->metadata_flag
) {
1235 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
1243 * Splice the data from the ring buffer to the tracefile.
1245 * Returns the number of bytes spliced.
1247 ssize_t
lttng_consumer_on_read_subbuffer_splice(
1248 struct lttng_consumer_local_data
*ctx
,
1249 struct lttng_consumer_stream
*stream
, unsigned long len
)
1251 ssize_t ret
= 0, written
= 0, ret_splice
= 0;
1253 off_t orig_offset
= stream
->out_fd_offset
;
1254 int fd
= stream
->wait_fd
;
1255 /* Default is on the disk */
1256 int outfd
= stream
->out_fd
;
1257 struct consumer_relayd_sock_pair
*relayd
= NULL
;
1260 switch (consumer_data
.type
) {
1261 case LTTNG_CONSUMER_KERNEL
:
1263 case LTTNG_CONSUMER32_UST
:
1264 case LTTNG_CONSUMER64_UST
:
1265 /* Not supported for user space tracing */
1268 ERR("Unknown consumer_data type");
1272 /* RCU lock for the relayd pointer */
1275 /* Flag that the current stream if set for network streaming. */
1276 if (stream
->net_seq_idx
!= -1) {
1277 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1278 if (relayd
== NULL
) {
1284 * Choose right pipe for splice. Metadata and trace data are handled by
1285 * different threads hence the use of two pipes in order not to race or
1286 * corrupt the written data.
1288 if (stream
->metadata_flag
) {
1289 splice_pipe
= ctx
->consumer_splice_metadata_pipe
;
1291 splice_pipe
= ctx
->consumer_thread_pipe
;
1294 /* Write metadata stream id before payload */
1295 if (stream
->metadata_flag
&& relayd
) {
1297 * Lock the control socket for the complete duration of the function
1298 * since from this point on we will use the socket.
1300 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
1302 ret
= write_relayd_metadata_id(splice_pipe
[1], stream
, relayd
);
1310 DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
1311 (unsigned long)offset
, len
, fd
);
1312 ret_splice
= splice(fd
, &offset
, splice_pipe
[1], NULL
, len
,
1313 SPLICE_F_MOVE
| SPLICE_F_MORE
);
1314 DBG("splice chan to pipe, ret %zd", ret_splice
);
1315 if (ret_splice
< 0) {
1316 PERROR("Error in relay splice");
1318 written
= ret_splice
;
1324 /* Handle stream on the relayd if the output is on the network */
1326 if (stream
->metadata_flag
) {
1327 /* Update counter to fit the spliced data */
1328 ret_splice
+= sizeof(stream
->relayd_stream_id
);
1329 len
+= sizeof(stream
->relayd_stream_id
);
1331 * We do this so the return value can match the len passed as
1332 * argument to this function.
1334 written
-= sizeof(stream
->relayd_stream_id
);
1337 ret
= write_relayd_stream_header(stream
, ret_splice
, relayd
);
1339 /* Use the returned socket. */
1342 ERR("Remote relayd disconnected. Stopping");
1347 /* Splice data out */
1348 ret_splice
= splice(splice_pipe
[0], NULL
, outfd
, NULL
,
1349 ret_splice
, SPLICE_F_MOVE
| SPLICE_F_MORE
);
1350 DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice
);
1351 if (ret_splice
< 0) {
1352 PERROR("Error in file splice");
1354 written
= ret_splice
;
1358 } else if (ret_splice
> len
) {
1360 PERROR("Wrote more data than requested %zd (len: %lu)",
1362 written
+= ret_splice
;
1368 /* This call is useless on a socket so better save a syscall. */
1370 /* This won't block, but will start writeout asynchronously */
1371 lttng_sync_file_range(outfd
, stream
->out_fd_offset
, ret_splice
,
1372 SYNC_FILE_RANGE_WRITE
);
1373 stream
->out_fd_offset
+= ret_splice
;
1375 written
+= ret_splice
;
1377 lttng_consumer_sync_trace_file(stream
, orig_offset
);
1384 /* send the appropriate error description to sessiond */
1387 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_SPLICE_EBADF
);
1390 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_SPLICE_EINVAL
);
1393 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_SPLICE_ENOMEM
);
1396 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_SPLICE_ESPIPE
);
1401 if (relayd
&& stream
->metadata_flag
) {
1402 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
1410 * Take a snapshot for a specific fd
1412 * Returns 0 on success, < 0 on error
1414 int lttng_consumer_take_snapshot(struct lttng_consumer_local_data
*ctx
,
1415 struct lttng_consumer_stream
*stream
)
1417 switch (consumer_data
.type
) {
1418 case LTTNG_CONSUMER_KERNEL
:
1419 return lttng_kconsumer_take_snapshot(ctx
, stream
);
1420 case LTTNG_CONSUMER32_UST
:
1421 case LTTNG_CONSUMER64_UST
:
1422 return lttng_ustconsumer_take_snapshot(ctx
, stream
);
1424 ERR("Unknown consumer_data type");
1432 * Get the produced position
1434 * Returns 0 on success, < 0 on error
1436 int lttng_consumer_get_produced_snapshot(
1437 struct lttng_consumer_local_data
*ctx
,
1438 struct lttng_consumer_stream
*stream
,
1441 switch (consumer_data
.type
) {
1442 case LTTNG_CONSUMER_KERNEL
:
1443 return lttng_kconsumer_get_produced_snapshot(ctx
, stream
, pos
);
1444 case LTTNG_CONSUMER32_UST
:
1445 case LTTNG_CONSUMER64_UST
:
1446 return lttng_ustconsumer_get_produced_snapshot(ctx
, stream
, pos
);
1448 ERR("Unknown consumer_data type");
1454 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data
*ctx
,
1455 int sock
, struct pollfd
*consumer_sockpoll
)
1457 switch (consumer_data
.type
) {
1458 case LTTNG_CONSUMER_KERNEL
:
1459 return lttng_kconsumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1460 case LTTNG_CONSUMER32_UST
:
1461 case LTTNG_CONSUMER64_UST
:
1462 return lttng_ustconsumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1464 ERR("Unknown consumer_data type");
1471 * Iterate over all stream element of the hashtable and free them. This is race
1472 * free since the hashtable received MUST be in a race free synchronization
1473 * state. It's the caller responsability to make sure of that.
1475 static void destroy_stream_ht(struct lttng_ht
*ht
)
1478 struct lttng_ht_iter iter
;
1479 struct lttng_consumer_stream
*stream
;
1485 cds_lfht_for_each_entry(ht
->ht
, &iter
.iter
, stream
, node
.node
) {
1486 ret
= lttng_ht_del(ht
, &iter
);
1492 lttng_ht_destroy(ht
);
1496 * Clean up a metadata stream and free its memory.
1498 static void consumer_del_metadata_stream(struct lttng_consumer_stream
*stream
)
1501 struct lttng_consumer_channel
*free_chan
= NULL
;
1502 struct consumer_relayd_sock_pair
*relayd
;
1506 * This call should NEVER receive regular stream. It must always be
1507 * metadata stream and this is crucial for data structure synchronization.
1509 assert(stream
->metadata_flag
);
1511 pthread_mutex_lock(&consumer_data
.lock
);
1512 switch (consumer_data
.type
) {
1513 case LTTNG_CONSUMER_KERNEL
:
1514 if (stream
->mmap_base
!= NULL
) {
1515 ret
= munmap(stream
->mmap_base
, stream
->mmap_len
);
1517 PERROR("munmap metadata stream");
1521 case LTTNG_CONSUMER32_UST
:
1522 case LTTNG_CONSUMER64_UST
:
1523 lttng_ustconsumer_del_stream(stream
);
1526 ERR("Unknown consumer_data type");
1529 pthread_mutex_unlock(&consumer_data
.lock
);
1531 if (stream
->out_fd
>= 0) {
1532 ret
= close(stream
->out_fd
);
1538 if (stream
->wait_fd
>= 0 && !stream
->wait_fd_is_copy
) {
1539 ret
= close(stream
->wait_fd
);
1545 if (stream
->shm_fd
>= 0 && stream
->wait_fd
!= stream
->shm_fd
) {
1546 ret
= close(stream
->shm_fd
);
1552 /* Check and cleanup relayd */
1554 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1555 if (relayd
!= NULL
) {
1556 uatomic_dec(&relayd
->refcount
);
1557 assert(uatomic_read(&relayd
->refcount
) >= 0);
1559 /* Closing streams requires to lock the control socket. */
1560 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
1561 ret
= relayd_send_close_stream(&relayd
->control_sock
,
1562 stream
->relayd_stream_id
, stream
->next_net_seq_num
- 1);
1563 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
1565 DBG("Unable to close stream on the relayd. Continuing");
1567 * Continue here. There is nothing we can do for the relayd.
1568 * Chances are that the relayd has closed the socket so we just
1569 * continue cleaning up.
1573 /* Both conditions are met, we destroy the relayd. */
1574 if (uatomic_read(&relayd
->refcount
) == 0 &&
1575 uatomic_read(&relayd
->destroy_flag
)) {
1576 consumer_destroy_relayd(relayd
);
1581 /* Atomically decrement channel refcount since other threads can use it. */
1582 uatomic_dec(&stream
->chan
->refcount
);
1583 if (!uatomic_read(&stream
->chan
->refcount
)) {
1584 free_chan
= stream
->chan
;
1588 consumer_del_channel(free_chan
);
1595 * Action done with the metadata stream when adding it to the consumer internal
1596 * data structures to handle it.
1598 static void consumer_add_metadata_stream(struct lttng_consumer_stream
*stream
)
1600 struct consumer_relayd_sock_pair
*relayd
;
1602 /* Find relayd and, if one is found, increment refcount. */
1604 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1605 if (relayd
!= NULL
) {
1606 uatomic_inc(&relayd
->refcount
);
1612 * Thread polls on metadata file descriptor and write them on disk or on the
1615 void *lttng_consumer_thread_poll_metadata(void *data
)
1618 uint32_t revents
, nb_fd
;
1619 struct lttng_consumer_stream
*stream
;
1620 struct lttng_ht_iter iter
;
1621 struct lttng_ht_node_ulong
*node
;
1622 struct lttng_ht
*metadata_ht
= NULL
;
1623 struct lttng_poll_event events
;
1624 struct lttng_consumer_local_data
*ctx
= data
;
1627 rcu_register_thread();
1629 DBG("Thread metadata poll started");
1631 metadata_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
1632 if (metadata_ht
== NULL
) {
1636 /* Size is set to 1 for the consumer_metadata pipe */
1637 ret
= lttng_poll_create(&events
, 2, LTTNG_CLOEXEC
);
1639 ERR("Poll set creation failed");
1643 ret
= lttng_poll_add(&events
, ctx
->consumer_metadata_pipe
[0], LPOLLIN
);
1649 DBG("Metadata main loop started");
1652 lttng_poll_reset(&events
);
1654 nb_fd
= LTTNG_POLL_GETNB(&events
);
1656 /* Only the metadata pipe is set */
1657 if (nb_fd
== 0 && consumer_quit
== 1) {
1662 DBG("Metadata poll wait with %d fd(s)", nb_fd
);
1663 ret
= lttng_poll_wait(&events
, -1);
1664 DBG("Metadata event catched in thread");
1666 if (errno
== EINTR
) {
1672 for (i
= 0; i
< nb_fd
; i
++) {
1673 revents
= LTTNG_POLL_GETEV(&events
, i
);
1674 pollfd
= LTTNG_POLL_GETFD(&events
, i
);
1676 /* Check the metadata pipe for incoming metadata. */
1677 if (pollfd
== ctx
->consumer_metadata_pipe
[0]) {
1678 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLNVAL
)) {
1679 DBG("Metadata thread pipe hung up");
1681 * Remove the pipe from the poll set and continue the loop
1682 * since their might be data to consume.
1684 lttng_poll_del(&events
, ctx
->consumer_metadata_pipe
[0]);
1685 close(ctx
->consumer_metadata_pipe
[0]);
1687 } else if (revents
& LPOLLIN
) {
1688 stream
= zmalloc(sizeof(struct lttng_consumer_stream
));
1689 if (stream
== NULL
) {
1690 PERROR("zmalloc metadata consumer stream");
1695 /* Get the stream and add it to the local hash table */
1696 ret
= read(pollfd
, stream
,
1697 sizeof(struct lttng_consumer_stream
));
1698 } while (ret
< 0 && errno
== EINTR
);
1699 if (ret
< 0 || ret
< sizeof(struct lttng_consumer_stream
)) {
1700 PERROR("read metadata stream");
1703 * Let's continue here and hope we can still work
1704 * without stopping the consumer. XXX: Should we?
1709 DBG("Adding metadata stream %d to poll set",
1712 /* The node should be init at this point */
1713 lttng_ht_add_unique_ulong(metadata_ht
,
1714 &stream
->waitfd_node
);
1716 /* Add metadata stream to the global poll events list */
1717 lttng_poll_add(&events
, stream
->wait_fd
,
1718 LPOLLIN
| LPOLLPRI
);
1720 consumer_add_metadata_stream(stream
);
1723 /* Metadata pipe handled. Continue handling the others */
1727 /* From here, the event is a metadata wait fd */
1729 lttng_ht_lookup(metadata_ht
, (void *)((unsigned long) pollfd
),
1731 node
= lttng_ht_iter_get_node_ulong(&iter
);
1733 /* FD not found, continue loop */
1737 stream
= caa_container_of(node
, struct lttng_consumer_stream
,
1740 /* Get the data out of the metadata file descriptor */
1741 if (revents
& (LPOLLIN
| LPOLLPRI
)) {
1742 DBG("Metadata available on fd %d", pollfd
);
1743 assert(stream
->wait_fd
== pollfd
);
1745 len
= ctx
->on_buffer_ready(stream
, ctx
);
1746 /* It's ok to have an unavailable sub-buffer */
1747 if (len
< 0 && len
!= -EAGAIN
) {
1749 } else if (len
> 0) {
1750 stream
->data_read
= 1;
1755 * Remove the stream from the hash table since there is no data
1756 * left on the fd because we previously did a read on the buffer.
1758 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLNVAL
)) {
1759 DBG("Metadata fd %d is hup|err|nval.", pollfd
);
1760 if (!stream
->hangup_flush_done
1761 && (consumer_data
.type
== LTTNG_CONSUMER32_UST
1762 || consumer_data
.type
== LTTNG_CONSUMER64_UST
)) {
1763 DBG("Attempting to flush and consume the UST buffers");
1764 lttng_ustconsumer_on_stream_hangup(stream
);
1766 /* We just flushed the stream now read it. */
1767 len
= ctx
->on_buffer_ready(stream
, ctx
);
1768 /* It's ok to have an unavailable sub-buffer */
1769 if (len
< 0 && len
!= -EAGAIN
) {
1774 /* Removing it from hash table, poll set and free memory */
1775 lttng_ht_del(metadata_ht
, &iter
);
1776 lttng_poll_del(&events
, stream
->wait_fd
);
1777 consumer_del_metadata_stream(stream
);
1784 DBG("Metadata poll thread exiting");
1785 lttng_poll_clean(&events
);
1788 destroy_stream_ht(metadata_ht
);
1791 rcu_unregister_thread();
1796 * This thread polls the fds in the set to consume the data and write
1797 * it to tracefile if necessary.
1799 void *lttng_consumer_thread_poll_fds(void *data
)
1801 int num_rdy
, num_hup
, high_prio
, ret
, i
;
1802 struct pollfd
*pollfd
= NULL
;
1803 /* local view of the streams */
1804 struct lttng_consumer_stream
**local_stream
= NULL
;
1805 /* local view of consumer_data.fds_count */
1807 struct lttng_consumer_local_data
*ctx
= data
;
1809 pthread_t metadata_thread
;
1812 rcu_register_thread();
1814 /* Start metadata polling thread */
1815 ret
= pthread_create(&metadata_thread
, NULL
,
1816 lttng_consumer_thread_poll_metadata
, (void *) ctx
);
1818 PERROR("pthread_create metadata thread");
1822 local_stream
= zmalloc(sizeof(struct lttng_consumer_stream
));
1829 * the fds set has been updated, we need to update our
1830 * local array as well
1832 pthread_mutex_lock(&consumer_data
.lock
);
1833 if (consumer_data
.need_update
) {
1834 if (pollfd
!= NULL
) {
1838 if (local_stream
!= NULL
) {
1840 local_stream
= NULL
;
1843 /* allocate for all fds + 1 for the consumer_poll_pipe */
1844 pollfd
= zmalloc((consumer_data
.stream_count
+ 1) * sizeof(struct pollfd
));
1845 if (pollfd
== NULL
) {
1846 perror("pollfd malloc");
1847 pthread_mutex_unlock(&consumer_data
.lock
);
1851 /* allocate for all fds + 1 for the consumer_poll_pipe */
1852 local_stream
= zmalloc((consumer_data
.stream_count
+ 1) *
1853 sizeof(struct lttng_consumer_stream
));
1854 if (local_stream
== NULL
) {
1855 perror("local_stream malloc");
1856 pthread_mutex_unlock(&consumer_data
.lock
);
1859 ret
= consumer_update_poll_array(ctx
, &pollfd
, local_stream
);
1861 ERR("Error in allocating pollfd or local_outfds");
1862 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_POLL_ERROR
);
1863 pthread_mutex_unlock(&consumer_data
.lock
);
1867 consumer_data
.need_update
= 0;
1869 pthread_mutex_unlock(&consumer_data
.lock
);
1871 /* No FDs and consumer_quit, consumer_cleanup the thread */
1872 if (nb_fd
== 0 && consumer_quit
== 1) {
1875 /* poll on the array of fds */
1877 DBG("polling on %d fd", nb_fd
+ 1);
1878 num_rdy
= poll(pollfd
, nb_fd
+ 1, consumer_poll_timeout
);
1879 DBG("poll num_rdy : %d", num_rdy
);
1880 if (num_rdy
== -1) {
1882 * Restart interrupted system call.
1884 if (errno
== EINTR
) {
1887 perror("Poll error");
1888 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_POLL_ERROR
);
1890 } else if (num_rdy
== 0) {
1891 DBG("Polling thread timed out");
1896 * If the consumer_poll_pipe triggered poll go directly to the
1897 * beginning of the loop to update the array. We want to prioritize
1898 * array update over low-priority reads.
1900 if (pollfd
[nb_fd
].revents
& (POLLIN
| POLLPRI
)) {
1901 size_t pipe_readlen
;
1904 DBG("consumer_poll_pipe wake up");
1905 /* Consume 1 byte of pipe data */
1907 pipe_readlen
= read(ctx
->consumer_poll_pipe
[0], &tmp
, 1);
1908 } while (pipe_readlen
== -1 && errno
== EINTR
);
1912 /* Take care of high priority channels first. */
1913 for (i
= 0; i
< nb_fd
; i
++) {
1914 if (pollfd
[i
].revents
& POLLPRI
) {
1915 DBG("Urgent read on fd %d", pollfd
[i
].fd
);
1917 len
= ctx
->on_buffer_ready(local_stream
[i
], ctx
);
1918 /* it's ok to have an unavailable sub-buffer */
1919 if (len
< 0 && len
!= -EAGAIN
) {
1921 } else if (len
> 0) {
1922 local_stream
[i
]->data_read
= 1;
1928 * If we read high prio channel in this loop, try again
1929 * for more high prio data.
1935 /* Take care of low priority channels. */
1936 for (i
= 0; i
< nb_fd
; i
++) {
1937 if ((pollfd
[i
].revents
& POLLIN
) ||
1938 local_stream
[i
]->hangup_flush_done
) {
1939 DBG("Normal read on fd %d", pollfd
[i
].fd
);
1940 len
= ctx
->on_buffer_ready(local_stream
[i
], ctx
);
1941 /* it's ok to have an unavailable sub-buffer */
1942 if (len
< 0 && len
!= -EAGAIN
) {
1944 } else if (len
> 0) {
1945 local_stream
[i
]->data_read
= 1;
1950 /* Handle hangup and errors */
1951 for (i
= 0; i
< nb_fd
; i
++) {
1952 if (!local_stream
[i
]->hangup_flush_done
1953 && (pollfd
[i
].revents
& (POLLHUP
| POLLERR
| POLLNVAL
))
1954 && (consumer_data
.type
== LTTNG_CONSUMER32_UST
1955 || consumer_data
.type
== LTTNG_CONSUMER64_UST
)) {
1956 DBG("fd %d is hup|err|nval. Attempting flush and read.",
1958 lttng_ustconsumer_on_stream_hangup(local_stream
[i
]);
1959 /* Attempt read again, for the data we just flushed. */
1960 local_stream
[i
]->data_read
= 1;
1963 * If the poll flag is HUP/ERR/NVAL and we have
1964 * read no data in this pass, we can remove the
1965 * stream from its hash table.
1967 if ((pollfd
[i
].revents
& POLLHUP
)) {
1968 DBG("Polling fd %d tells it has hung up.", pollfd
[i
].fd
);
1969 if (!local_stream
[i
]->data_read
) {
1970 consumer_del_stream(local_stream
[i
]);
1973 } else if (pollfd
[i
].revents
& POLLERR
) {
1974 ERR("Error returned in polling fd %d.", pollfd
[i
].fd
);
1975 if (!local_stream
[i
]->data_read
) {
1976 consumer_del_stream(local_stream
[i
]);
1979 } else if (pollfd
[i
].revents
& POLLNVAL
) {
1980 ERR("Polling fd %d tells fd is not open.", pollfd
[i
].fd
);
1981 if (!local_stream
[i
]->data_read
) {
1982 consumer_del_stream(local_stream
[i
]);
1986 local_stream
[i
]->data_read
= 0;
1990 DBG("polling thread exiting");
1991 if (pollfd
!= NULL
) {
1995 if (local_stream
!= NULL
) {
1997 local_stream
= NULL
;
2001 * Close the write side of the pipe so epoll_wait() in
2002 * lttng_consumer_thread_poll_metadata can catch it. The thread is
2003 * monitoring the read side of the pipe. If we close them both, epoll_wait
2004 * strangely does not return and could create a endless wait period if the
2005 * pipe is the only tracked fd in the poll set. The thread will take care
2006 * of closing the read side.
2008 close(ctx
->consumer_metadata_pipe
[1]);
2010 ret
= pthread_join(metadata_thread
, &status
);
2012 PERROR("pthread_join metadata thread");
2016 rcu_unregister_thread();
2021 * This thread listens on the consumerd socket and receives the file
2022 * descriptors from the session daemon.
2024 void *lttng_consumer_thread_receive_fds(void *data
)
2026 int sock
, client_socket
, ret
;
2028 * structure to poll for incoming data on communication socket avoids
2029 * making blocking sockets.
2031 struct pollfd consumer_sockpoll
[2];
2032 struct lttng_consumer_local_data
*ctx
= data
;
2034 rcu_register_thread();
2036 DBG("Creating command socket %s", ctx
->consumer_command_sock_path
);
2037 unlink(ctx
->consumer_command_sock_path
);
2038 client_socket
= lttcomm_create_unix_sock(ctx
->consumer_command_sock_path
);
2039 if (client_socket
< 0) {
2040 ERR("Cannot create command socket");
2044 ret
= lttcomm_listen_unix_sock(client_socket
);
2049 DBG("Sending ready command to lttng-sessiond");
2050 ret
= lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY
);
2051 /* return < 0 on error, but == 0 is not fatal */
2053 ERR("Error sending ready command to lttng-sessiond");
2057 ret
= fcntl(client_socket
, F_SETFL
, O_NONBLOCK
);
2059 perror("fcntl O_NONBLOCK");
2063 /* prepare the FDs to poll : to client socket and the should_quit pipe */
2064 consumer_sockpoll
[0].fd
= ctx
->consumer_should_quit
[0];
2065 consumer_sockpoll
[0].events
= POLLIN
| POLLPRI
;
2066 consumer_sockpoll
[1].fd
= client_socket
;
2067 consumer_sockpoll
[1].events
= POLLIN
| POLLPRI
;
2069 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
2072 DBG("Connection on client_socket");
2074 /* Blocking call, waiting for transmission */
2075 sock
= lttcomm_accept_unix_sock(client_socket
);
2080 ret
= fcntl(sock
, F_SETFL
, O_NONBLOCK
);
2082 perror("fcntl O_NONBLOCK");
2086 /* update the polling structure to poll on the established socket */
2087 consumer_sockpoll
[1].fd
= sock
;
2088 consumer_sockpoll
[1].events
= POLLIN
| POLLPRI
;
2091 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
2094 DBG("Incoming command on sock");
2095 ret
= lttng_consumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
2096 if (ret
== -ENOENT
) {
2097 DBG("Received STOP command");
2102 * This could simply be a session daemon quitting. Don't output
2105 DBG("Communication interrupted on command socket");
2108 if (consumer_quit
) {
2109 DBG("consumer_thread_receive_fds received quit from signal");
2112 DBG("received fds on sock");
2115 DBG("consumer_thread_receive_fds exiting");
2118 * when all fds have hung up, the polling thread
2124 * 2s of grace period, if no polling events occur during
2125 * this period, the polling thread will exit even if there
2126 * are still open FDs (should not happen, but safety mechanism).
2128 consumer_poll_timeout
= LTTNG_CONSUMER_POLL_TIMEOUT
;
2131 * Wake-up the other end by writing a null byte in the pipe
2132 * (non-blocking). Important note: Because writing into the
2133 * pipe is non-blocking (and therefore we allow dropping wakeup
2134 * data, as long as there is wakeup data present in the pipe
2135 * buffer to wake up the other end), the other end should
2136 * perform the following sequence for waiting:
2137 * 1) empty the pipe (reads).
2138 * 2) perform update operation.
2139 * 3) wait on the pipe (poll).
2142 ret
= write(ctx
->consumer_poll_pipe
[1], "", 1);
2143 } while (ret
< 0 && errno
== EINTR
);
2144 rcu_unregister_thread();
2148 ssize_t
lttng_consumer_read_subbuffer(struct lttng_consumer_stream
*stream
,
2149 struct lttng_consumer_local_data
*ctx
)
2151 switch (consumer_data
.type
) {
2152 case LTTNG_CONSUMER_KERNEL
:
2153 return lttng_kconsumer_read_subbuffer(stream
, ctx
);
2154 case LTTNG_CONSUMER32_UST
:
2155 case LTTNG_CONSUMER64_UST
:
2156 return lttng_ustconsumer_read_subbuffer(stream
, ctx
);
2158 ERR("Unknown consumer_data type");
2164 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream
*stream
)
2166 switch (consumer_data
.type
) {
2167 case LTTNG_CONSUMER_KERNEL
:
2168 return lttng_kconsumer_on_recv_stream(stream
);
2169 case LTTNG_CONSUMER32_UST
:
2170 case LTTNG_CONSUMER64_UST
:
2171 return lttng_ustconsumer_on_recv_stream(stream
);
2173 ERR("Unknown consumer_data type");
2180 * Allocate and set consumer data hash tables.
2182 void lttng_consumer_init(void)
2184 consumer_data
.stream_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
2185 consumer_data
.channel_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
2186 consumer_data
.relayd_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
2190 * Process the ADD_RELAYD command receive by a consumer.
2192 * This will create a relayd socket pair and add it to the relayd hash table.
2193 * The caller MUST acquire a RCU read side lock before calling it.
2195 int consumer_add_relayd_socket(int net_seq_idx
, int sock_type
,
2196 struct lttng_consumer_local_data
*ctx
, int sock
,
2197 struct pollfd
*consumer_sockpoll
, struct lttcomm_sock
*relayd_sock
)
2200 struct consumer_relayd_sock_pair
*relayd
;
2202 DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx
);
2204 /* Get relayd reference if exists. */
2205 relayd
= consumer_find_relayd(net_seq_idx
);
2206 if (relayd
== NULL
) {
2207 /* Not found. Allocate one. */
2208 relayd
= consumer_allocate_relayd_sock_pair(net_seq_idx
);
2209 if (relayd
== NULL
) {
2210 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_OUTFD_ERROR
);
2215 /* Poll on consumer socket. */
2216 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
2221 /* Get relayd socket from session daemon */
2222 ret
= lttcomm_recv_fds_unix_sock(sock
, &fd
, 1);
2223 if (ret
!= sizeof(fd
)) {
2224 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_ERROR_RECV_FD
);
2229 /* Copy socket information and received FD */
2230 switch (sock_type
) {
2231 case LTTNG_STREAM_CONTROL
:
2232 /* Copy received lttcomm socket */
2233 lttcomm_copy_sock(&relayd
->control_sock
, relayd_sock
);
2234 ret
= lttcomm_create_sock(&relayd
->control_sock
);
2239 /* Close the created socket fd which is useless */
2240 close(relayd
->control_sock
.fd
);
2242 /* Assign new file descriptor */
2243 relayd
->control_sock
.fd
= fd
;
2245 case LTTNG_STREAM_DATA
:
2246 /* Copy received lttcomm socket */
2247 lttcomm_copy_sock(&relayd
->data_sock
, relayd_sock
);
2248 ret
= lttcomm_create_sock(&relayd
->data_sock
);
2253 /* Close the created socket fd which is useless */
2254 close(relayd
->data_sock
.fd
);
2256 /* Assign new file descriptor */
2257 relayd
->data_sock
.fd
= fd
;
2260 ERR("Unknown relayd socket type (%d)", sock_type
);
2264 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
2265 sock_type
== LTTNG_STREAM_CONTROL
? "control" : "data",
2266 relayd
->net_seq_idx
, fd
);
2269 * Add relayd socket pair to consumer data hashtable. If object already
2270 * exists or on error, the function gracefully returns.
2272 consumer_add_relayd(relayd
);