/*
 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 */
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <urcu/list.h>

#include "libkernelctl.h"
#include "liblttkconsumerd.h"
struct kconsumerd_global_data {
	/*
	 * kconsumerd_data.lock protects kconsumerd_data.fd_list,
	 * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It
	 * ensures the count matches the number of items in the fd_list.
	 * It ensures the list updates *always* trigger an fd_array
	 * update (therefore need to make list update vs
	 * kconsumerd_data.need_update flag update atomic, and also flag
	 * read, fd array and flag clear atomic).
	 */
	pthread_mutex_t lock;
	/*
	 * Number of elements in the list below. Protected by
	 * kconsumerd_data.lock.
	 */
	unsigned int fds_count;
	/*
	 * List of FDs. Protected by kconsumerd_data.lock.
	 */
	struct kconsumerd_fd_list fd_list;
	/*
	 * Flag specifying if the local array of FDs needs update in the
	 * poll function. Protected by kconsumerd_data.lock.
	 */
	unsigned int need_update;
} kconsumerd_data = {
	.lock = PTHREAD_MUTEX_INITIALIZER,
	.fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
};
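/*
 * Sketch of the update protocol the comment above requires: a list
 * mutation, the matching fds_count adjustment and the need_update flip
 * all happen in one locked section, and the poll thread likewise reads
 * the flag, rebuilds its local fd array and clears the flag while
 * holding the same lock:
 *
 *	pthread_mutex_lock(&kconsumerd_data.lock);
 *	cds_list_add(&new_fd->list, &kconsumerd_data.fd_list.head);
 *	kconsumerd_data.fds_count++;
 *	kconsumerd_data.need_update = 1;
 *	pthread_mutex_unlock(&kconsumerd_data.lock);
 */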
/* communication with splice */
static int kconsumerd_thread_pipe[2];

/* pipe to wake the poll thread when necessary */
static int kconsumerd_poll_pipe[2];
/*
 * TODO: create a should_quit pipe to let the signal handler wake up the
 * fd receiver thread. It should be initialized before any signal can be
 * received by the library.
 */
/* timeout parameter, to control the polling thread grace period */
static int kconsumerd_poll_timeout = -1;
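/*
 * A negative timeout makes poll() block indefinitely; the fd receiver
 * thread replaces it with KCONSUMERD_POLL_GRACE_PERIOD on shutdown so
 * the polling thread eventually exits even with FDs still open.
 */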
/* socket to communicate errors with sessiond */
static int kconsumerd_error_socket;
/* socket to exchange commands with sessiond */
static char *kconsumerd_command_sock_path;
/*
 * Flag to inform the polling thread to quit when all fds have hung up.
 * Updated by kconsumerd_thread_receive_fds when it notices that all
 * fds have hung up. Also updated by the signal handler
 * (kconsumerd_should_exit()). Read by the polling threads.
 */
static volatile int kconsumerd_quit = 0;
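/*
 * volatile keeps the compiler from caching the flag in a register, so a
 * store from the signal handler or the receiving thread is observed by
 * the polling loop on its next read of the flag.
 */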
/*
 * kconsumerd_set_error_socket
 *
 * Set the error socket
 */
void kconsumerd_set_error_socket(int sock)
{
	kconsumerd_error_socket = sock;
}
/*
 * kconsumerd_set_command_socket_path
 *
 * Set the command socket path
 */
void kconsumerd_set_command_socket_path(char *sock)
{
	kconsumerd_command_sock_path = sock;
}
/*
 * kconsumerd_find_session_fd
 *
 * Find a session fd in the global list. Called with
 * kconsumerd_data.lock held, so the caller's check-then-insert
 * sequence stays atomic.
 *
 * Return 1 if found else 0
 */
static int kconsumerd_find_session_fd(int fd)
{
	struct kconsumerd_fd *iter;

	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		if (iter->sessiond_fd == fd) {
			DBG("Duplicate session fd %d", fd);
			return 1;
		}
	}
	return 0;
}
/*
 * kconsumerd_del_fd
 *
 * Remove a fd from the global list protected by a mutex
 */
static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
{
	pthread_mutex_lock(&kconsumerd_data.lock);
	cds_list_del(&lcf->list);
	if (kconsumerd_data.fds_count > 0) {
		kconsumerd_data.fds_count--;
		if (lcf != NULL) {
			close(lcf->out_fd);
			close(lcf->consumerd_fd);
			free(lcf);
			lcf = NULL;
		}
	}
	kconsumerd_data.need_update = 1;
	pthread_mutex_unlock(&kconsumerd_data.lock);
}
/*
 * kconsumerd_add_fd
 *
 * Add a fd to the global list protected by a mutex
 */
static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
{
	int ret;
	struct kconsumerd_fd *tmp_fd;

	pthread_mutex_lock(&kconsumerd_data.lock);
	/* Check if already exist */
	ret = kconsumerd_find_session_fd(buf->fd);
	if (ret == 1) {
		goto end;
	}

	tmp_fd = malloc(sizeof(struct kconsumerd_fd));
	if (tmp_fd == NULL) {
		perror("malloc struct kconsumerd_fd");
		ret = -1;
		goto end;
	}
	tmp_fd->sessiond_fd = buf->fd;
	tmp_fd->consumerd_fd = consumerd_fd;
	tmp_fd->state = buf->state;
	tmp_fd->max_sb_size = buf->max_sb_size;
	strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);

	/* Opening the tracefile in write mode */
	ret = open(tmp_fd->path_name,
			O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
	if (ret < 0) {
		ERR("Opening %s", tmp_fd->path_name);
		perror("open");
		free(tmp_fd);
		goto end;
	}
	tmp_fd->out_fd = ret;
	tmp_fd->out_fd_offset = 0;

	DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
			tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);

	cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
	kconsumerd_data.fds_count++;
	kconsumerd_data.need_update = 1;
end:
	pthread_mutex_unlock(&kconsumerd_data.lock);
	return ret;
}
/*
 * kconsumerd_change_fd_state
 *
 * Update a fd according to what we just received
 */
static void kconsumerd_change_fd_state(int sessiond_fd,
		enum kconsumerd_fd_state state)
{
	struct kconsumerd_fd *iter;

	pthread_mutex_lock(&kconsumerd_data.lock);
	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		if (iter->sessiond_fd == sessiond_fd) {
			iter->state = state;
			break;
		}
	}
	kconsumerd_data.need_update = 1;
	pthread_mutex_unlock(&kconsumerd_data.lock);
}
/*
 * kconsumerd_update_poll_array
 *
 * Allocate the pollfd structure and the local view of the out fds
 * to avoid doing a lookup in the linked list and concurrency issues
 * when writing is needed.
 * Returns the number of fds in the structures.
 * Called with kconsumerd_data.lock held.
 */
static int kconsumerd_update_poll_array(struct pollfd **pollfd,
		struct kconsumerd_fd **local_kconsumerd_fd)
{
	struct kconsumerd_fd *iter;
	int i = 0;

	DBG("Updating poll fd array");

	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		DBG("Inside for each");
		if (iter->state == ACTIVE_FD) {
			DBG("Active FD %d", iter->consumerd_fd);
			(*pollfd)[i].fd = iter->consumerd_fd;
			(*pollfd)[i].events = POLLIN | POLLPRI;
			local_kconsumerd_fd[i] = iter;
			i++;
		}
	}

	/*
	 * Insert the kconsumerd_poll_pipe at the end of the array and don't
	 * increment i so nb_fd is the number of real FD.
	 */
	(*pollfd)[i].fd = kconsumerd_poll_pipe[0];
	(*pollfd)[i].events = POLLIN;
	return i;
}
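/*
 * Note for callers: the returned count excludes the wake-up pipe slot,
 * so the poll thread below passes nb_fd + 1 entries to poll() and
 * treats index nb_fd as the kconsumerd_poll_pipe read end.
 */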
/*
 * kconsumerd_on_read_subbuffer_mmap
 *
 * mmap the ring buffer, read it and write the data to the tracefile.
 * Returns the number of bytes written
 */
static int kconsumerd_on_read_subbuffer_mmap(
		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
{
	unsigned long mmap_len, mmap_offset, padded_len, padding_len;
	char *mmap_base = NULL;
	char *write_ptr;
	char *padding = NULL;
	long ret = 0;
	off_t orig_offset = kconsumerd_fd->out_fd_offset;
	int fd = kconsumerd_fd->consumerd_fd;
	int outfd = kconsumerd_fd->out_fd;

	/* get the padded subbuffer size to know the padding required */
	ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
	if (ret != 0) {
		ret = -errno;
		perror("kernctl_get_padded_subbuf_size");
		goto end;
	}
	padding_len = padded_len - len;
	padding = malloc(padding_len * sizeof(char));
	if (padding == NULL) {
		ret = -ENOMEM;
		perror("malloc padding");
		goto end;
	}
	memset(padding, '\0', padding_len);

	/* get the len of the mmap region */
	ret = kernctl_get_mmap_len(fd, &mmap_len);
	if (ret != 0) {
		ret = -errno;
		perror("kernctl_get_mmap_len");
		goto end;
	}

	/* get the offset inside the fd to mmap */
	ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
	if (ret != 0) {
		ret = -errno;
		perror("kernctl_get_mmap_read_offset");
		goto end;
	}

	mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
	if (mmap_base == MAP_FAILED) {
		perror("Error mmaping");
		mmap_base = NULL;
		ret = -1;
		goto end;
	}
	write_ptr = mmap_base;

	while (len > 0) {
		ret = write(outfd, write_ptr, len);
		if (ret < 0) {
			ret = -errno;
			perror("Error in file write");
			goto end;
		} else if (ret >= len) {
			len = 0;
		} else {
			/* partial write: advance into the mapping and retry */
			len -= ret;
			write_ptr += ret;
		}
		/* This won't block, but will start writeout asynchronously */
		sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
				SYNC_FILE_RANGE_WRITE);
		kconsumerd_fd->out_fd_offset += ret;
	}

	/* once all the data is written, write the padding to disk */
	ret = write(outfd, padding, padding_len);
	if (ret < 0) {
		ret = -errno;
		perror("Error writing padding to file");
		goto end;
	}

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset >= kconsumerd_fd->max_sb_size) {
		sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size,
				SYNC_FILE_RANGE_WAIT_BEFORE
				| SYNC_FILE_RANGE_WRITE
				| SYNC_FILE_RANGE_WAIT_AFTER);

		/*
		 * Give hints to the kernel about how we access the file:
		 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
		 * we write it.
		 *
		 * We need to call fadvise again after the file grows because the
		 * kernel does not seem to apply fadvise to non-existing parts of the
		 * file.
		 *
		 * Call fadvise _after_ having waited for the page writeback to
		 * complete because the dirty page writeback semantic is not well
		 * defined. So it can be expected to lead to lower throughput in
		 * streaming.
		 */
		posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
	}

end:
	/* release the read-side view of the subbuffer */
	if (mmap_base != NULL) {
		munmap(mmap_base, mmap_len);
	}
	if (padding != NULL) {
		free(padding);
	}
	return ret;
}
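/*
 * splice(2) can only move data when one end of the transfer is a pipe,
 * which is why the function below splices twice: once from the channel
 * fd into kconsumerd_thread_pipe, then from the pipe into the
 * tracefile, keeping the transfer in-kernel with no copy to user space.
 */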
/*
 * kconsumerd_on_read_subbuffer
 *
 * Splice the data from the ring buffer to the tracefile.
 * Returns the number of bytes spliced
 */
static int kconsumerd_on_read_subbuffer(
		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
{
	long ret = 0;
	loff_t offset = 0;
	off_t orig_offset = kconsumerd_fd->out_fd_offset;
	int fd = kconsumerd_fd->consumerd_fd;
	int outfd = kconsumerd_fd->out_fd;

	while (len > 0) {
		DBG("splice chan to pipe offset %lu (fd : %d)",
				(unsigned long)offset, fd);
		ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice chan to pipe ret %ld", ret);
		if (ret < 0) {
			perror("Error in relay splice");
			goto splice_error;
		}

		ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice pipe to file %ld", ret);
		if (ret < 0) {
			perror("Error in file splice");
			goto splice_error;
		}
		if (ret >= len) {
			len = 0;
		} else {
			/* partial splice: account for what was moved and retry */
			len -= ret;
		}

		/* This won't block, but will start writeout asynchronously */
		sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
				SYNC_FILE_RANGE_WRITE);
		kconsumerd_fd->out_fd_offset += ret;
	}

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset >= kconsumerd_fd->max_sb_size) {
		sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size,
				SYNC_FILE_RANGE_WAIT_BEFORE
				| SYNC_FILE_RANGE_WRITE
				| SYNC_FILE_RANGE_WAIT_AFTER);

		/*
		 * Give hints to the kernel about how we access the file:
		 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
		 * we write it.
		 *
		 * We need to call fadvise again after the file grows because the
		 * kernel does not seem to apply fadvise to non-existing parts of the
		 * file.
		 *
		 * Call fadvise _after_ having waited for the page writeback to
		 * complete because the dirty page writeback semantic is not well
		 * defined. So it can be expected to lead to lower throughput in
		 * streaming.
		 */
		posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
	}
	goto end;

splice_error:
	/* send the appropriate error description to sessiond */
	switch (errno) {
	case EBADF:
		kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF);
		break;
	case EINVAL:
		kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL);
		break;
	case ENOMEM:
		kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM);
		break;
	case ESPIPE:
		kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE);
		break;
	default:
		break;
	}

end:
	return ret;
}
/*
 * kconsumerd_read_subbuffer
 *
 * Consume data on a file descriptor and write it to a trace file
 */
static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
{
	unsigned long len;
	int err;
	int ret = 0;
	int infd = kconsumerd_fd->consumerd_fd;

	DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
	/* Get the next subbuffer */
	err = kernctl_get_next_subbuf(infd);
	if (err != 0) {
		ret = errno;
		perror("Reserving sub buffer failed (everything is normal, "
				"it is due to concurrency)");
		goto end;
	}

	switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
	case LTTNG_KERNEL_SPLICE:
		/* read the whole subbuffer */
		err = kernctl_get_padded_subbuf_size(infd, &len);
		if (err != 0) {
			ret = errno;
			perror("Getting sub-buffer len failed.");
			break;
		}

		/* splice the subbuffer to the tracefile */
		ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
		if (ret < 0) {
			/*
			 * display the error but continue processing to try
			 * to release the subbuffer
			 */
			ERR("Error splicing to tracefile");
		}
		break;
	case LTTNG_KERNEL_MMAP:
		/* read the used subbuffer size */
		err = kernctl_get_subbuf_size(infd, &len);
		if (err != 0) {
			ret = errno;
			perror("Getting sub-buffer len failed.");
			break;
		}
		/* write the subbuffer to the tracefile */
		ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
		if (ret < 0) {
			/*
			 * display the error but continue processing to try
			 * to release the subbuffer
			 */
			ERR("Error writing to tracefile");
		}
		break;
	default:
		ERR("Unknown output method");
		ret = -1;
		break;
	}

	err = kernctl_put_next_subbuf(infd);
	if (err != 0) {
		if (errno == EFAULT) {
			perror("Error in unreserving sub buffer\n");
		} else if (errno == EIO) {
			/* Should never happen with newer LTTng versions */
			perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
		}
		ret = errno;
	}

end:
	return ret;
}
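/*
 * kernctl_get_next_subbuf() reserves a sub-buffer for reading and
 * kernctl_put_next_subbuf() releases it back to the tracer. The put
 * must run even when the read or splice fails, which is why the switch
 * above only logs errors and still falls through to the put.
 */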
/*
 * kconsumerd_consumerd_recv_fd
 *
 * Receives an array of file descriptors and the associated
 * structures describing each fd (path name).
 * Returns the size of received data
 */
static int kconsumerd_consumerd_recv_fd(int sfd, int size,
		enum kconsumerd_command cmd_type)
{
	struct msghdr msg;
	struct iovec iov[1];
	int ret = 0, i, tmp2;
	struct cmsghdr *cmsg;
	int nb_fd;
	char recv_fd[CMSG_SPACE(sizeof(int))];
	struct lttcomm_kconsumerd_msg lkm;

	/* the number of fds we are about to receive */
	nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);

	for (i = 0; i < nb_fd; i++) {
		memset(&msg, 0, sizeof(msg));

		/* Prepare to receive the structures */
		iov[0].iov_base = &lkm;
		iov[0].iov_len = sizeof(lkm);
		msg.msg_iov = iov;
		msg.msg_iovlen = 1;

		msg.msg_control = recv_fd;
		msg.msg_controllen = sizeof(recv_fd);

		DBG("Waiting to receive fd");
		if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
			perror("recvmsg");
			continue;
		}

		if (ret != (size / nb_fd)) {
			ERR("Received only %d, expected %d", ret, size);
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			goto end;
		}

		cmsg = CMSG_FIRSTHDR(&msg);
		if (!cmsg) {
			ERR("Invalid control message header");
			ret = -1;
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			goto end;
		}

		/* if we received fds */
		if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
			switch (cmd_type) {
			case ADD_STREAM:
				DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
				ret = kconsumerd_add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
				if (ret < 0) {
					kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
					goto end;
				}
				break;
			case UPDATE_STREAM:
				kconsumerd_change_fd_state(lkm.fd, lkm.state);
				break;
			default:
				break;
			}

			/* signal the poll thread */
			tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
		} else {
			ERR("Didn't receive any fd");
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			ret = -1;
			goto end;
		}
	}

end:
	return ret;
}
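/*
 * The sessiond passes each stream fd through SCM_RIGHTS ancillary
 * data: the kernel installs a fresh descriptor in this process's fd
 * table on receipt, so the value read from CMSG_DATA() is only
 * meaningful locally, not in the sender.
 */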
/*
 * kconsumerd_thread_poll_fds
 *
 * This thread polls the fds in the ltt_fd_list to consume the data
 * and write it to tracefile if necessary.
 */
void *kconsumerd_thread_poll_fds(void *data)
{
	int num_rdy, num_hup, high_prio, ret, i;
	struct pollfd *pollfd = NULL;
	/* local view of the fds */
	struct kconsumerd_fd **local_kconsumerd_fd = NULL;
	/* local view of kconsumerd_data.fds_count */
	int nb_fd = 0;
	char tmp;
	int tmp2;

	ret = pipe(kconsumerd_thread_pipe);
	if (ret < 0) {
		perror("Error creating pipe");
		goto end;
	}

	local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd));

	while (1) {
		high_prio = 0;
		num_hup = 0;

		/*
		 * the ltt_fd_list has been updated, we need to update our
		 * local array as well
		 */
		pthread_mutex_lock(&kconsumerd_data.lock);
		if (kconsumerd_data.need_update) {
			if (pollfd != NULL) {
				free(pollfd);
				pollfd = NULL;
			}
			if (local_kconsumerd_fd != NULL) {
				free(local_kconsumerd_fd);
				local_kconsumerd_fd = NULL;
			}

			/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
			pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd));
			if (pollfd == NULL) {
				perror("pollfd malloc");
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}

			/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
			local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) *
					sizeof(struct kconsumerd_fd));
			if (local_kconsumerd_fd == NULL) {
				perror("local_kconsumerd_fd malloc");
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}
			ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
			if (ret < 0) {
				ERR("Error in allocating pollfd or local_outfds");
				kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}
			nb_fd = ret;
			kconsumerd_data.need_update = 0;
		}
		pthread_mutex_unlock(&kconsumerd_data.lock);

		/* poll on the array of fds */
		DBG("polling on %d fd", nb_fd + 1);
		num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout);
		DBG("poll num_rdy : %d", num_rdy);
		if (num_rdy == -1) {
			perror("Poll error");
			kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
			goto end;
		} else if (num_rdy == 0) {
			DBG("Polling thread timed out");
			goto end;
		}

		/* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */
		if (nb_fd == 0 && kconsumerd_quit == 1) {
			goto end;
		}

		/*
		 * If the kconsumerd_poll_pipe triggered poll go
		 * directly to the beginning of the loop to update the
		 * array. We want to prioritize array update over
		 * low-priority reads.
		 */
		if (pollfd[nb_fd].revents == POLLIN) {
			DBG("kconsumerd_poll_pipe wake up");
			tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
			continue;
		}

		/* Take care of high priority channels first. */
		for (i = 0; i < nb_fd; i++) {
			switch (pollfd[i].revents) {
			case POLLERR:
				ERR("Error returned in polling fd %d.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLHUP:
				DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLNVAL:
				ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLPRI:
				DBG("Urgent read on fd %d", pollfd[i].fd);
				high_prio = 1;
				ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
				/* it's ok to have an unavailable sub-buffer */
				if (ret == EAGAIN) {
					ret = 0;
				}
				break;
			}
		}

		/* If every buffer FD has hung up, we end the read loop here */
		if (nb_fd > 0 && num_hup == nb_fd) {
			DBG("every buffer FD has hung up\n");
			if (kconsumerd_quit == 1) {
				goto end;
			}
		}

		/* Take care of low priority channels. */
		if (high_prio == 0) {
			for (i = 0; i < nb_fd; i++) {
				if (pollfd[i].revents == POLLIN) {
					DBG("Normal read on fd %d", pollfd[i].fd);
					ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
					/* it's ok to have an unavailable subbuffer */
					if (ret == EAGAIN) {
						ret = 0;
					}
				}
			}
		}
	}

end:
	DBG("polling thread exiting");
	if (pollfd != NULL) {
		free(pollfd);
		pollfd = NULL;
	}
	if (local_kconsumerd_fd != NULL) {
		free(local_kconsumerd_fd);
		local_kconsumerd_fd = NULL;
	}
	return NULL;
}
/*
 * kconsumerd_create_poll_pipe
 *
 * Create the pipe to wake the polling thread when needed
 */
int kconsumerd_create_poll_pipe()
{
	return pipe(kconsumerd_poll_pipe);
}
/*
 * kconsumerd_thread_receive_fds
 *
 * This thread listens on the consumerd socket and
 * receives the file descriptors from ltt-sessiond
 */
void *kconsumerd_thread_receive_fds(void *data)
{
	int sock, client_socket, ret;
	struct lttcomm_kconsumerd_header tmp;

	DBG("Creating command socket %s", kconsumerd_command_sock_path);
	unlink(kconsumerd_command_sock_path);
	client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to ltt-sessiond");
	ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY);
	if (ret < 0) {
		ERR("Error sending ready command to ltt-sessiond");
		goto end;
	}

	/* TODO: poll on socket and "should_quit" fd pipe */
	/* TODO: change blocking call into non-blocking call */
	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock < 0) {
		ERR("On accept");
		goto end;
	}

	while (1) {
		/* We first get the number of fd we are about to receive */
		/* TODO: poll on sock and "should_quit" fd pipe */
		/* TODO: change recv into a non-blocking call */
		ret = lttcomm_recv_unix_sock(sock, &tmp,
				sizeof(struct lttcomm_kconsumerd_header));
		if (ret <= 0) {
			ERR("Communication interrupted on command socket");
			goto end;
		}
		if (tmp.cmd_type == STOP) {
			DBG("Received STOP command");
			goto end;
		}
		if (kconsumerd_quit) {
			DBG("kconsumerd_thread_receive_fds received quit from signal");
			goto end;
		}

		/* we received a command to add or update fds */
		ret = kconsumerd_consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
		if (ret <= 0) {
			ERR("Receiving the FD, exiting");
			goto end;
		}
	}

end:
	DBG("kconsumerd_thread_receive_fds exiting");

	/*
	 * when all fds have hung up, the polling thread
	 * can exit cleanly
	 */
	kconsumerd_quit = 1;

	/*
	 * 2s of grace period, if no polling events occur during
	 * this period, the polling thread will exit even if there
	 * are still open FDs (should not happen, but safety mechanism).
	 */
	kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;

	/* wake up the polling thread */
	ret = write(kconsumerd_poll_pipe[1], "4", 1);
	if (ret < 0) {
		perror("poll pipe write");
	}
	return NULL;
}
/*
 * kconsumerd_cleanup
 *
 * Cleanup the daemon's socket on exit
 */
void kconsumerd_cleanup(void)
{
	struct kconsumerd_fd *iter, *titer;

	/* remove the socket file */
	unlink(kconsumerd_command_sock_path);

	/*
	 * Close all outfd. Called when there are no more threads
	 * running (after joining on the threads), no need to protect
	 * list iteration with mutex. Use the _safe variant because
	 * kconsumerd_del_fd() unlinks and frees the node we stand on.
	 */
	cds_list_for_each_entry_safe(iter, titer, &kconsumerd_data.fd_list.head, list) {
		kconsumerd_del_fd(iter);
	}
}
/*
 * kconsumerd_should_exit
 *
 * Called from signal handler.
 */
void kconsumerd_should_exit(void)
{
	kconsumerd_quit = 1;
	/*
	 * TODO: write into a should_quit pipe to wake up the fd
	 * receiver thread.
	 */
}
/*
 * kconsumerd_send_error
 *
 * Send return code to ltt-sessiond
 */
int kconsumerd_send_error(enum lttcomm_return_code cmd)
{
	if (kconsumerd_error_socket > 0) {
		return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd,
				sizeof(enum lttcomm_return_code));
	}

	return 0;
}