2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
32 #include <sys/socket.h>
34 #include <sys/types.h>
35 #include <urcu/list.h>
41 #include "libkernelctl.h"
42 #include "liblttsessiondcomm.h"
43 #include "kconsumerd.h"
45 /* Init the list of FDs */
/*
 * Global list of traced-buffer fds shared between the receive thread and
 * the poll thread; all traversal/mutation visible below is done while
 * holding kconsumerd_lock_fds.
 */
46 static struct ltt_kconsumerd_fd_list kconsumerd_fd_list
= {
47 .head
= CDS_LIST_HEAD_INIT(kconsumerd_fd_list
.head
),
50 /* Number of element for the list below. */
51 static unsigned int fds_count
;
53 /* If the local array of FDs needs update in the poll function */
/*
 * NOTE(review): written by one thread and polled by another without any
 * atomic/volatile qualification visible here — potential data race; an
 * atomic type would be safer. Confirm against full source.
 */
54 static unsigned int update_fd_array
= 1;
56 /* lock the fd array and structures */
57 static pthread_mutex_t kconsumerd_lock_fds
;
59 /* the two threads (receive fd and poll) */
/* threads[0] = thread_receive_fds, threads[1] = thread_poll_fds (see main) */
60 static pthread_t threads
[2];
62 /* communication with splice */
63 static int thread_pipe
[2];
65 /* pipe to wake the poll thread when necessary */
66 static int poll_pipe
[2];
68 /* socket to communicate errors with sessiond */
69 static int error_socket
= -1;
71 /* to count the number of time the user pressed ctrl+c */
72 static int sigintcount
= 0;
74 /* flag to inform the polling thread to quit when all fd hung up */
/*
 * NOTE(review): the declaration of the `quit` flag described by the
 * comment above is missing from this extract (it is referenced later in
 * thread_poll_fds).
 */
77 /* Argument variables */
80 static int opt_daemon
;
81 static const char *progname
;
82 static char command_sock_path
[PATH_MAX
]; /* Global command socket path */
83 static char error_sock_path
[PATH_MAX
]; /* Global error path */
/*
 * del_fd — remove one ltt_kconsumerd_fd entry from the global list.
 * Takes kconsumerd_lock_fds around the list unlink, then closes the
 * consumer-side fd. NOTE(review): several source lines are missing from
 * this extract (gaps in numbering), e.g. the fds_count bookkeeping and
 * the free() of lcf presumably sit in the gaps — confirm in full source.
 */
88 * Remove a fd from the global list protected by a mutex
90 static void del_fd(struct ltt_kconsumerd_fd
*lcf
)
92 DBG("Removing %d", lcf
->consumerd_fd
);
93 pthread_mutex_lock(&kconsumerd_lock_fds
);
94 cds_list_del(&lcf
->list
);
97 DBG("Removed ltt_kconsumerd_fd");
/* close the kernel-consumer fd for this buffer */
100 close(lcf
->consumerd_fd
);
105 pthread_mutex_unlock(&kconsumerd_lock_fds
);
/*
 * cleanup — exit path: unlink the command socket, cancel both worker
 * threads, then walk the fd list to close every output fd.
 * NOTE(review): the body of the cds_list_for_each_entry loop and the end
 * of the function are missing from this extract.
 */
111 * Cleanup the daemon's socket on exit
113 static void cleanup()
115 struct ltt_kconsumerd_fd
*iter
;
117 /* remove the socket file */
118 unlink(command_sock_path
);
120 /* unblock the threads */
121 WARN("Terminating the threads before exiting");
122 pthread_cancel(threads
[0]);
123 pthread_cancel(threads
[1]);
125 /* close all outfd */
126 cds_list_for_each_entry(iter
, &kconsumerd_fd_list
.head
, list
) {
/*
 * send_error — report a return code to ltt-sessiond over the error
 * socket; only sends if the socket was successfully connected (> 0).
 * NOTE(review): `cmd` is an enum lttcomm_return_code but the byte count
 * sent is sizeof(enum lttcomm_sessiond_command) — the two enum types may
 * well have the same size, but the mismatch looks like a bug; verify.
 * The else branch / final return are missing from this extract.
 */
134 * send return code to ltt-sessiond
136 static int send_error(enum lttcomm_return_code cmd
)
138 if (error_socket
> 0) {
139 return lttcomm_send_unix_sock(error_socket
, &cmd
,
140 sizeof(enum lttcomm_sessiond_command
));
/*
 * add_fd — allocate a new ltt_kconsumerd_fd from a received
 * lttcomm_kconsumerd_msg, open its trace output file, and insert it at
 * the head of the global list under kconsumerd_lock_fds.
 *
 * NOTE(review), visible defects in this extract:
 *  - malloc() result is dereferenced on the very next source line
 *    (157 follows 156 with no gap), so there is no NULL check;
 *  - strncpy(path_name, ..., PATH_MAX) does not guarantee NUL
 *    termination when the source is PATH_MAX bytes long;
 *  - the tracefile is created with S_IRWXU|S_IRWXG|S_IRWXO (0777),
 *    which is unusually permissive for trace data.
 * The open() error-check line(s) and the function's return are missing
 * from this extract (numbering gaps).
 */
149 * Add a fd to the global list protected by a mutex
151 static int add_fd(struct lttcomm_kconsumerd_msg
*buf
, int consumerd_fd
)
153 struct ltt_kconsumerd_fd
*tmp_fd
;
156 tmp_fd
= malloc(sizeof(struct ltt_kconsumerd_fd
));
157 tmp_fd
->sessiond_fd
= buf
->fd
;
158 tmp_fd
->consumerd_fd
= consumerd_fd
;
159 tmp_fd
->state
= buf
->state
;
160 tmp_fd
->max_sb_size
= buf
->max_sb_size
;
161 strncpy(tmp_fd
->path_name
, buf
->path_name
, PATH_MAX
);
163 /* Opening the tracefile in write mode */
164 DBG("Opening %s for writing", tmp_fd
->path_name
);
165 ret
= open(tmp_fd
->path_name
,
166 O_WRONLY
|O_CREAT
|O_TRUNC
, S_IRWXU
|S_IRWXG
|S_IRWXO
);
168 ERR("Opening %s", tmp_fd
->path_name
);
172 tmp_fd
->out_fd
= ret
;
173 tmp_fd
->out_fd_offset
= 0;
175 DBG("Adding %s (%d, %d, %d)", tmp_fd
->path_name
,
176 tmp_fd
->sessiond_fd
, tmp_fd
->consumerd_fd
, tmp_fd
->out_fd
);
178 pthread_mutex_lock(&kconsumerd_lock_fds
);
179 cds_list_add(&tmp_fd
->list
, &kconsumerd_fd_list
.head
);
181 pthread_mutex_unlock(&kconsumerd_lock_fds
);
/*
 * sighandler — daemon signal handler. The first SIGINT is deliberately
 * ignored (sigintcount lets the second ctrl+c actually terminate).
 * NOTE(review): the rest of the handler (cleanup/exit path) is missing
 * from this extract.
 */
191 * Signal handler for the daemon
193 static void sighandler(int sig
)
195 if (sig
== SIGINT
&& sigintcount
++ == 0) {
196 DBG("ignoring first SIGINT");
/*
 * set_signal_handler — install sighandler for SIGTERM, SIGINT and
 * SIGPIPE using a common struct sigaction. Each sigaction() failure is
 * reported; the error-path bodies and the sa_mask/sa_flags setup lines
 * are missing from this extract (numbering gaps).
 */
208 * Setup signal handler for :
209 * SIGINT, SIGTERM, SIGPIPE
211 static int set_signal_handler(void)
217 if ((ret
= sigemptyset(&sigset
)) < 0) {
218 perror("sigemptyset");
222 sa
.sa_handler
= sighandler
;
225 if ((ret
= sigaction(SIGTERM
, &sa
, NULL
)) < 0) {
230 if ((ret
= sigaction(SIGINT
, &sa
, NULL
)) < 0) {
235 if ((ret
= sigaction(SIGPIPE
, &sa
, NULL
)) < 0) {
/*
 * on_read_subbuffer_mmap — mmap-mode consumption path: mmap the current
 * sub-buffer read region of the kernel buffer fd, write() `len` bytes
 * to the tracefile, then write zero padding up to the padded sub-buffer
 * size, and give the kernel writeback/page-cache hints
 * (sync_file_range + posix_fadvise) on the previous sub-buffer.
 *
 * NOTE(review), visible issues in this extract:
 *  - padding = malloc(...) is used by memset with no NULL check visible
 *    (line 272 directly follows 271);
 *  - `ret`, `len` and `mmap_base` declarations, all error-check
 *    branches, the munmap() of mmap_base and the final return are in
 *    the numbering gaps — confirm they exist in the full source;
 *  - short write()s are not looped on in the lines shown.
 */
244 * on_read_subbuffer_mmap
246 * mmap the ring buffer, read it and write the data to the tracefile.
247 * Returns the number of bytes written
249 static int on_read_subbuffer_mmap(struct ltt_kconsumerd_fd
*kconsumerd_fd
,
252 unsigned long mmap_len
;
253 unsigned long mmap_offset
;
254 unsigned long padded_len
;
255 unsigned long padding_len
;
257 char *padding
= NULL
;
259 off_t orig_offset
= kconsumerd_fd
->out_fd_offset
;
260 int fd
= kconsumerd_fd
->consumerd_fd
;
261 int outfd
= kconsumerd_fd
->out_fd
;
263 /* get the padded subbuffer size to know the padding required */
264 ret
= kernctl_get_padded_subbuf_size(fd
, &padded_len
);
267 perror("kernctl_get_padded_subbuf_size");
270 padding_len
= padded_len
- len
;
271 padding
= malloc(padding_len
* sizeof(char));
272 memset(padding
, '\0', padding_len
);
274 /* get the len of the mmap region */
275 ret
= kernctl_get_mmap_len(fd
, &mmap_len
);
278 perror("kernctl_get_mmap_len");
282 /* get the offset inside the fd to mmap */
283 ret
= kernctl_get_mmap_read_offset(fd
, &mmap_offset
);
286 perror("kernctl_get_mmap_read_offset");
290 mmap_base
= mmap(NULL
, mmap_len
, PROT_READ
, MAP_PRIVATE
, fd
, mmap_offset
);
291 if (mmap_base
== MAP_FAILED
) {
292 perror("Error mmaping");
/* copy the sub-buffer payload into the tracefile */
298 ret
= write(outfd
, mmap_base
, len
);
301 } else if (ret
< 0) {
303 perror("Error in file write");
306 /* This won't block, but will start writeout asynchronously */
307 sync_file_range(outfd
, kconsumerd_fd
->out_fd_offset
, ret
,
308 SYNC_FILE_RANGE_WRITE
);
309 kconsumerd_fd
->out_fd_offset
+= ret
;
312 /* once all the data is written, write the padding to disk */
313 ret
= write(outfd
, padding
, padding_len
);
316 perror("Error writing padding to file");
321 * This does a blocking write-and-wait on any page that belongs to the
322 * subbuffer prior to the one we just wrote.
323 * Don't care about error values, as these are just hints and ways to
324 * limit the amount of page cache used.
326 if (orig_offset
>= kconsumerd_fd
->max_sb_size
) {
327 sync_file_range(outfd
, orig_offset
- kconsumerd_fd
->max_sb_size
,
328 kconsumerd_fd
->max_sb_size
,
329 SYNC_FILE_RANGE_WAIT_BEFORE
330 | SYNC_FILE_RANGE_WRITE
331 | SYNC_FILE_RANGE_WAIT_AFTER
);
333 * Give hints to the kernel about how we access the file:
334 * POSIX_FADV_DONTNEED : we won't re-access data in a near
335 * future after we write it.
336 * We need to call fadvise again after the file grows because
337 * the kernel does not seem to apply fadvise to non-existing
339 * Call fadvise _after_ having waited for the page writeback to
340 * complete because the dirty page writeback semantic is not
341 * well defined. So it can be expected to lead to lower
342 * throughput in streaming.
344 posix_fadvise(outfd
, orig_offset
- kconsumerd_fd
->max_sb_size
,
345 kconsumerd_fd
->max_sb_size
, POSIX_FADV_DONTNEED
);
/* free the padding buffer if it was allocated */
350 if (padding
!= NULL
) {
/*
 * on_read_subbuffer — splice-mode consumption path: splice `len` bytes
 * from the kernel buffer fd into thread_pipe[1], then splice them out
 * of thread_pipe[0] into the tracefile, updating out_fd_offset. After
 * writing, issue the same sync_file_range/posix_fadvise hints as the
 * mmap path, and on splice failure map errno to a KCONSUMERD_SPLICE_*
 * error code for ltt-sessiond.
 *
 * NOTE(review): the declarations of ret/len/offset, the error-check
 * branches, the errno switch scaffolding and the final return are in
 * the numbering gaps of this extract. Short splices are not looped on
 * in the lines shown.
 */
359 * Splice the data from the ring buffer to the tracefile.
360 * Returns the number of bytes spliced
362 static int on_read_subbuffer(struct ltt_kconsumerd_fd
*kconsumerd_fd
,
367 off_t orig_offset
= kconsumerd_fd
->out_fd_offset
;
368 int fd
= kconsumerd_fd
->consumerd_fd
;
369 int outfd
= kconsumerd_fd
->out_fd
;
372 DBG("splice chan to pipe offset %lu (fd : %d)",
373 (unsigned long)offset
, fd
);
/* stage 1: kernel buffer -> pipe */
374 ret
= splice(fd
, &offset
, thread_pipe
[1], NULL
, len
,
375 SPLICE_F_MOVE
| SPLICE_F_MORE
);
376 DBG("splice chan to pipe ret %ld", ret
);
379 perror("Error in relay splice");
/* stage 2: pipe -> tracefile (ret bytes from stage 1) */
383 ret
= splice(thread_pipe
[0], NULL
, outfd
, NULL
, ret
,
384 SPLICE_F_MOVE
| SPLICE_F_MORE
);
385 DBG("splice pipe to file %ld", ret
);
388 perror("Error in file splice");
394 /* This won't block, but will start writeout asynchronously */
395 sync_file_range(outfd
, kconsumerd_fd
->out_fd_offset
, ret
,
396 SYNC_FILE_RANGE_WRITE
);
397 kconsumerd_fd
->out_fd_offset
+= ret
;
401 * This does a blocking write-and-wait on any page that belongs to the
402 * subbuffer prior to the one we just wrote.
403 * Don't care about error values, as these are just hints and ways to
404 * limit the amount of page cache used.
406 if (orig_offset
>= kconsumerd_fd
->max_sb_size
) {
407 sync_file_range(outfd
, orig_offset
- kconsumerd_fd
->max_sb_size
,
408 kconsumerd_fd
->max_sb_size
,
409 SYNC_FILE_RANGE_WAIT_BEFORE
410 | SYNC_FILE_RANGE_WRITE
411 | SYNC_FILE_RANGE_WAIT_AFTER
);
413 * Give hints to the kernel about how we access the file:
414 * POSIX_FADV_DONTNEED : we won't re-access data in a near
415 * future after we write it.
416 * We need to call fadvise again after the file grows because
417 * the kernel does not seem to apply fadvise to non-existing
419 * Call fadvise _after_ having waited for the page writeback to
420 * complete because the dirty page writeback semantic is not
421 * well defined. So it can be expected to lead to lower
422 * throughput in streaming.
424 posix_fadvise(outfd
, orig_offset
- kconsumerd_fd
->max_sb_size
,
425 kconsumerd_fd
->max_sb_size
, POSIX_FADV_DONTNEED
);
430 /* send the appropriate error description to sessiond */
/* errno -> error-code mapping (switch scaffolding in numbering gaps) */
433 send_error(KCONSUMERD_SPLICE_EBADF
);
436 send_error(KCONSUMERD_SPLICE_EINVAL
);
439 send_error(KCONSUMERD_SPLICE_ENOMEM
);
442 send_error(KCONSUMERD_SPLICE_ESPIPE
);
/*
 * read_subbuffer — consume one sub-buffer from a kernel buffer fd:
 * reserve it with kernctl_get_next_subbuf(), dispatch to the splice or
 * mmap write path depending on the compile-time DEFAULT_CHANNEL_OUTPUT,
 * then release it with kernctl_put_next_subbuf(). A failed reservation
 * is treated as normal concurrency, and a write-path error is logged
 * but processing continues so the sub-buffer still gets released.
 *
 * NOTE(review): declarations of err/ret/len, the if(err)/if(ret) guard
 * lines around each call and the final return are missing from this
 * extract (numbering gaps).
 */
453 * Consume data on a file descriptor and write it on a trace file
455 static int read_subbuffer(struct ltt_kconsumerd_fd
*kconsumerd_fd
)
460 int infd
= kconsumerd_fd
->consumerd_fd
;
462 DBG("In read_subbuffer (infd : %d)", infd
);
463 /* Get the next subbuffer */
464 err
= kernctl_get_next_subbuf(infd
);
467 perror("Reserving sub buffer failed (everything is normal, "
468 "it is due to concurrency)");
/* compile-time channel output selection: splice vs mmap */
472 if (DEFAULT_CHANNEL_OUTPUT
== LTTNG_KERNEL_SPLICE
) {
473 /* read the whole subbuffer */
474 err
= kernctl_get_padded_subbuf_size(infd
, &len
);
477 perror("Getting sub-buffer len failed.");
481 /* splice the subbuffer to the tracefile */
482 ret
= on_read_subbuffer(kconsumerd_fd
, len
);
485 * display the error but continue processing to try
486 * to release the subbuffer
488 ERR("Error splicing to tracefile");
490 } else if (DEFAULT_CHANNEL_OUTPUT
== LTTNG_KERNEL_MMAP
) {
491 /* read the used subbuffer size */
492 err
= kernctl_get_subbuf_size(infd
, &len
);
495 perror("Getting sub-buffer len failed.");
499 /* write the subbuffer to the tracefile */
500 ret
= on_read_subbuffer_mmap(kconsumerd_fd
, len
);
503 * display the error but continue processing to try
504 * to release the subbuffer
506 ERR("Error writing to tracefile");
509 ERR("Unknown output method");
/* release the sub-buffer back to the kernel writer */
514 err
= kernctl_put_next_subbuf(infd
);
517 if (errno
== EFAULT
) {
518 perror("Error in unreserving sub buffer\n");
519 } else if (errno
== EIO
) {
520 /* Should never happen with newer LTTng versions */
521 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
/*
 * change_fd_state — find the entry whose sessiond_fd matches and update
 * its state. NOTE(review): the list walk shown here is not visibly
 * protected by kconsumerd_lock_fds (unlike del_fd/add_fd) — the lock
 * calls may sit in the numbering gaps; confirm in full source. The
 * assignment body of the if and the function end are also missing.
 */
533 * Update a fd according to what we just received
535 static void change_fd_state(int sessiond_fd
,
536 enum kconsumerd_fd_state state
)
538 struct ltt_kconsumerd_fd
*iter
;
539 cds_list_for_each_entry(iter
, &kconsumerd_fd_list
.head
, list
) {
540 if (iter
->sessiond_fd
== sessiond_fd
) {
/*
 * consumerd_recv_fd — receive `size / sizeof(msg)` fd-bearing messages
 * on unix socket sfd. Each recvmsg() carries a lttcomm_kconsumerd_msg
 * payload plus an SCM_RIGHTS control message holding the actual fd.
 * Depending on cmd_type the fd is either added (add_fd) or its state
 * updated (change_fd_state), and the poll thread is woken through
 * poll_pipe. Errors are reported to sessiond via send_error().
 *
 * NOTE(review): msg/iov declarations, msg_iov wiring, several if/else
 * guards, the cmd_type switch scaffolding and the return are missing
 * from this extract (numbering gaps). The `ret != size / nb_fd` check
 * prints `size` (total) rather than `size / nb_fd` (per-message
 * expectation) in its "expected %d" message — slightly misleading log.
 */
550 * Receives an array of file descriptors and the associated
551 * structures describing each fd (path name).
552 * Returns the size of received data
554 static int consumerd_recv_fd(int sfd
, int size
,
555 enum kconsumerd_command cmd_type
)
559 int ret
= 0, i
, tmp2
;
560 struct cmsghdr
*cmsg
;
562 char recv_fd
[CMSG_SPACE(sizeof(int))];
563 struct lttcomm_kconsumerd_msg lkm
;
565 /* the number of fds we are about to receive */
566 nb_fd
= size
/ sizeof(struct lttcomm_kconsumerd_msg
);
568 for (i
= 0; i
< nb_fd
; i
++) {
569 memset(&msg
, 0, sizeof(msg
));
571 /* Prepare to receive the structures */
572 iov
[0].iov_base
= &lkm
;
573 iov
[0].iov_len
= sizeof(lkm
);
/* control buffer sized for exactly one SCM_RIGHTS fd */
577 msg
.msg_control
= recv_fd
;
578 msg
.msg_controllen
= sizeof(recv_fd
);
580 DBG("Waiting to receive fd");
581 if ((ret
= recvmsg(sfd
, &msg
, 0)) < 0) {
586 if (ret
!= (size
/ nb_fd
)) {
587 ERR("Received only %d, expected %d", ret
, size
);
588 send_error(KCONSUMERD_ERROR_RECV_FD
);
592 cmsg
= CMSG_FIRSTHDR(&msg
);
594 ERR("Invalid control message header");
596 send_error(KCONSUMERD_ERROR_RECV_FD
);
600 /* if we received fds */
601 if (cmsg
->cmsg_level
== SOL_SOCKET
&& cmsg
->cmsg_type
== SCM_RIGHTS
) {
/* NOTE(review): CMSG_DATA is read bytewise ([0]) rather than via a
 * memcpy of an int — works for the low byte only; confirm against
 * full source how the fd is actually extracted. */
604 DBG("add_fd %s (%d)", lkm
.path_name
, (CMSG_DATA(cmsg
)[0]));
605 ret
= add_fd(&lkm
, (CMSG_DATA(cmsg
)[0]));
607 send_error(KCONSUMERD_OUTFD_ERROR
);
612 change_fd_state(lkm
.fd
, lkm
.state
);
617 /* flag to tell the polling thread to update its fd array */
619 /* signal the poll thread */
620 tmp2
= write(poll_pipe
[1], "4", 1);
622 ERR("Didn't received any fd");
623 send_error(KCONSUMERD_ERROR_RECV_FD
);
630 DBG("consumerd_recv_fd thread exiting");
/*
 * thread_receive_fds — command-socket thread. Creates and listens on
 * the unix command socket, tells sessiond it is ready
 * (KCONSUMERD_COMMAND_SOCK_READY), accepts one connection, then loops:
 * read a lttcomm_kconsumerd_header, stop on STOP, otherwise hand the
 * payload to consumerd_recv_fd(). On exit it wakes the poll thread via
 * poll_pipe.
 *
 * NOTE(review): the loop construct, several error-path bodies, socket
 * close calls and the return statement are missing from this extract
 * (numbering gaps). `data` is unused in the lines shown.
 */
637 * This thread listens on the consumerd socket and
638 * receives the file descriptors from ltt-sessiond
640 static void *thread_receive_fds(void *data
)
642 int sock
, client_socket
, ret
;
643 struct lttcomm_kconsumerd_header tmp
;
645 DBG("Creating command socket %s", command_sock_path
);
646 unlink(command_sock_path
);
647 client_socket
= lttcomm_create_unix_sock(command_sock_path
);
648 if (client_socket
< 0) {
649 ERR("Cannot create command socket");
653 ret
= lttcomm_listen_unix_sock(client_socket
);
658 DBG("Sending ready command to ltt-sessiond");
659 ret
= send_error(KCONSUMERD_COMMAND_SOCK_READY
);
661 ERR("Error sending ready command to ltt-sessiond");
665 /* Blocking call, waiting for transmission */
666 sock
= lttcomm_accept_unix_sock(client_socket
);
672 /* We first get the number of fd we are about to receive */
673 ret
= lttcomm_recv_unix_sock(sock
, &tmp
,
674 sizeof(struct lttcomm_kconsumerd_header
));
676 ERR("Communication interrupted on command socket");
679 if (tmp
.cmd_type
== STOP
) {
680 DBG("Received STOP command");
683 /* we received a command to add or update fds */
684 ret
= consumerd_recv_fd(sock
, tmp
.payload_size
, tmp
.cmd_type
);
686 ERR("Receiving the FD, exiting");
692 DBG("thread_receive_fds exiting");
/* wake the poll thread so it can notice we are gone */
694 ret
= write(poll_pipe
[1], "4", 1);
696 perror("poll pipe write");
/*
 * update_poll_array — under kconsumerd_lock_fds, fill the
 * caller-allocated (*pollfd) array and the parallel
 * local_kconsumerd_fd[] array with every ACTIVE_FD entry of the global
 * list, then append poll_pipe[0] (POLLIN) as the extra last slot
 * without counting it, so the returned count is the number of real fds.
 *
 * NOTE(review): the declaration/initialization of the index `i`, its
 * increment inside the loop, and the return statement are missing from
 * this extract (numbering gaps).
 */
704 * Allocate the pollfd structure and the local view of the out fds
705 * to avoid doing a lookup in the linked list and concurrency issues
706 * when writing is needed.
707 * Returns the number of fds in the structures
709 static int update_poll_array(struct pollfd
**pollfd
,
710 struct ltt_kconsumerd_fd
**local_kconsumerd_fd
)
712 struct ltt_kconsumerd_fd
*iter
;
716 DBG("Updating poll fd array");
717 pthread_mutex_lock(&kconsumerd_lock_fds
);
719 cds_list_for_each_entry(iter
, &kconsumerd_fd_list
.head
, list
) {
720 DBG("Inside for each");
721 if (iter
->state
== ACTIVE_FD
) {
722 DBG("Active FD %d", iter
->consumerd_fd
);
723 (*pollfd
)[i
].fd
= iter
->consumerd_fd
;
724 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
725 local_kconsumerd_fd
[i
] = iter
;
730 * insert the poll_pipe at the end of the array and don't increment i
731 * so nb_fd is the number of real FD
733 (*pollfd
)[i
].fd
= poll_pipe
[0];
734 (*pollfd
)[i
].events
= POLLIN
;
737 pthread_mutex_unlock(&kconsumerd_lock_fds
);
/*
 * thread_poll_fds — main consumption thread. Creates thread_pipe (used
 * by the splice path), then loops: when update_fd_array is set,
 * (re)allocate the pollfd array and the local fd view and refresh them
 * from the global list; poll() on all buffer fds plus poll_pipe[0];
 * handle POLLERR/POLLHUP/POLLNVAL by removing the fd, POLLPRI by an
 * urgent read_subbuffer(), and — only when no high-priority data was
 * seen — plain POLLIN by a normal read_subbuffer(). Exits when all
 * buffer fds have hung up (or nb_fd == 0 with quit set).
 *
 * NOTE(review), visible issues in this extract:
 *  - local_kconsumerd_fd is a struct ltt_kconsumerd_fd ** (an array of
 *    pointers) but both mallocs size it in whole struct elements
 *    (sizeof(struct ltt_kconsumerd_fd)) instead of pointer size —
 *    over-allocation, harmless but wasteful/misleading;
 *  - update_fd_array/quit are plain globals shared with other threads,
 *    read here without atomics — potential data race, confirm;
 *  - the while(1) loop construct, the declarations of nb_fd/tmp/tmp2,
 *    the switch case labels, error-path bodies and the return are
 *    missing from this extract (numbering gaps).
 */
745 * This thread polls the fds in the ltt_fd_list to consume the data
746 * and write it to tracefile if necessary.
748 static void *thread_poll_fds(void *data
)
750 int num_rdy
, num_hup
, high_prio
, ret
, i
;
751 struct pollfd
*pollfd
= NULL
;
752 /* local view of the fds */
753 struct ltt_kconsumerd_fd
**local_kconsumerd_fd
= NULL
;
754 /* local view of fds_count */
759 ret
= pipe(thread_pipe
);
761 perror("Error creating pipe");
765 local_kconsumerd_fd
= malloc(sizeof(struct ltt_kconsumerd_fd
));
772 * the ltt_fd_list has been updated, we need to update our
773 * local array as well
775 if (update_fd_array
== 1) {
776 if (pollfd
!= NULL
) {
780 if (local_kconsumerd_fd
!= NULL
) {
781 free(local_kconsumerd_fd
);
782 local_kconsumerd_fd
= NULL
;
784 /* allocate for all fds + 1 for the poll_pipe */
785 pollfd
= malloc((fds_count
+ 1) * sizeof(struct pollfd
));
786 if (pollfd
== NULL
) {
787 perror("pollfd malloc");
790 /* allocate for all fds + 1 for the poll_pipe */
791 local_kconsumerd_fd
= malloc((fds_count
+ 1) * sizeof(struct ltt_kconsumerd_fd
));
792 if (local_kconsumerd_fd
== NULL
) {
793 perror("local_kconsumerd_fd malloc");
797 ret
= update_poll_array(&pollfd
, local_kconsumerd_fd
);
799 ERR("Error in allocating pollfd or local_outfds");
800 send_error(KCONSUMERD_POLL_ERROR
);
806 /* poll on the array of fds */
807 DBG("polling on %d fd", nb_fd
+ 1);
/* -1 timeout: block until at least one fd is ready */
808 num_rdy
= poll(pollfd
, nb_fd
+ 1, -1);
809 DBG("poll num_rdy : %d", num_rdy
);
811 perror("Poll error");
812 send_error(KCONSUMERD_POLL_ERROR
);
816 /* No FDs and quit, cleanup the thread */
817 if (nb_fd
== 0 && quit
== 1) {
822 * if only the poll_pipe triggered poll to return just return to the
823 * beginning of the loop to update the array
825 if (num_rdy
== 1 && pollfd
[nb_fd
].revents
== POLLIN
) {
826 DBG("poll_pipe wake up");
/* drain the single wake-up byte */
827 tmp2
= read(poll_pipe
[0], &tmp
, 1);
831 /* Take care of high priority channels first. */
832 for (i
= 0; i
< nb_fd
; i
++) {
833 switch(pollfd
[i
].revents
) {
835 ERR("Error returned in polling fd %d.", pollfd
[i
].fd
);
836 del_fd(local_kconsumerd_fd
[i
]);
841 ERR("Polling fd %d tells it has hung up.", pollfd
[i
].fd
);
842 del_fd(local_kconsumerd_fd
[i
]);
847 ERR("Polling fd %d tells fd is not open.", pollfd
[i
].fd
);
848 del_fd(local_kconsumerd_fd
[i
]);
853 DBG("Urgent read on fd %d", pollfd
[i
].fd
);
855 ret
= read_subbuffer(local_kconsumerd_fd
[i
]);
856 /* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */
864 /* If every buffer FD has hung up, we end the read loop here */
865 if (nb_fd
> 0 && num_hup
== nb_fd
) {
866 DBG("every buffer FD has hung up\n")
873 /* Take care of low priority channels. */
874 if (high_prio
== 0) {
875 for (i
= 0; i
< nb_fd
; i
++) {
876 if (pollfd
[i
].revents
== POLLIN
) {
877 DBG("Normal read on fd %d", pollfd
[i
].fd
);
878 ret
= read_subbuffer(local_kconsumerd_fd
[i
]);
879 /* it's ok to have an unavailable subbuffer (FIXME : is it ?) */
888 DBG("polling thread exiting");
889 if (pollfd
!= NULL
) {
893 if (local_kconsumerd_fd
!= NULL
) {
894 free(local_kconsumerd_fd
);
895 local_kconsumerd_fd
= NULL
;
/*
 * usage — print command-line help to stderr (one fprintf per option;
 * each option line is split across two string literals). Opening and
 * closing braces are missing from this extract (numbering gaps).
 */
902 * usage function on stderr
904 static void usage(void)
906 fprintf(stderr
, "Usage: %s OPTIONS\n\nOptions:\n", progname
);
907 fprintf(stderr
, " -h, --help "
908 "Display this usage.\n");
909 fprintf(stderr
, " -c, --kconsumerd-cmd-sock PATH "
910 "Specify path for the command socket\n");
911 fprintf(stderr
, " -e, --kconsumerd-err-sock PATH "
912 "Specify path for the error socket\n");
913 fprintf(stderr
, " -d, --daemonize "
914 "Start as a daemon.\n");
915 fprintf(stderr
, " -q, --quiet "
916 "No output at all.\n");
917 fprintf(stderr
, " -v, --verbose "
918 "Verbose mode. Activate DBG() macro.\n");
919 fprintf(stderr
, " -V, --version "
920 "Show version number.\n");
/*
 * parse_args — getopt_long argument parsing. Long options map to the
 * short flags "dhqvV" plus "c:"/"e:" (command/error socket paths written
 * into command_sock_path / error_sock_path via snprintf, which
 * guarantees NUL termination). -V prints VERSION to stdout.
 *
 * NOTE(review): the getopt loop construct, the switch scaffolding, the
 * opt_daemon/quiet/verbose cases and the sentinel row of long_options[]
 * are missing from this extract (numbering gaps).
 */
924 * daemon argument parsing
926 static void parse_args(int argc
, char **argv
)
930 static struct option long_options
[] = {
931 { "kconsumerd-cmd-sock", 1, 0, 'c' },
932 { "kconsumerd-err-sock", 1, 0, 'e' },
933 { "daemonize", 0, 0, 'd' },
934 { "help", 0, 0, 'h' },
935 { "quiet", 0, 0, 'q' },
936 { "verbose", 0, 0, 'v' },
937 { "version", 0, 0, 'V' },
942 int option_index
= 0;
943 c
= getopt_long(argc
, argv
, "dhqvV" "c:e:", long_options
, &option_index
);
950 fprintf(stderr
, "option %s", long_options
[option_index
].name
);
952 fprintf(stderr
, " with arg %s\n", optarg
);
956 snprintf(command_sock_path
, PATH_MAX
, "%s", optarg
);
959 snprintf(error_sock_path
, PATH_MAX
, "%s", optarg
);
974 fprintf(stdout
, "%s\n", VERSION
);
/*
 * main — daemon entry point: parse arguments, fall back to the default
 * KCONSUMERD_*_SOCK_PATH socket paths when none were given, install
 * signal handlers, create poll_pipe, connect the (non-fatal) error
 * socket to ltt-sessiond, start the receive-fds and poll threads, join
 * both, and report KCONSUMERD_EXIT_SUCCESS / KCONSUMERD_EXIT_FAILURE.
 *
 * NOTE(review): declarations of ret/i/status, daemonization handling
 * for opt_daemon, error-path gotos/returns and the closing brace are
 * missing from this extract (numbering gaps).
 */
987 int main(int argc
, char **argv
)
993 /* Parse arguments */
995 parse_args(argc
, argv
);
/* default socket paths when not overridden by -c / -e */
1006 if (strlen(command_sock_path
) == 0) {
1007 snprintf(command_sock_path
, PATH_MAX
,
1008 KCONSUMERD_CMD_SOCK_PATH
);
1010 if (strlen(error_sock_path
) == 0) {
1011 snprintf(error_sock_path
, PATH_MAX
,
1012 KCONSUMERD_ERR_SOCK_PATH
);
1015 if (set_signal_handler() < 0) {
1019 /* create the pipe to wake to polling thread when needed */
1020 ret
= pipe(poll_pipe
);
1022 perror("Error creating poll pipe");
1026 /* Connect to the socket created by ltt-sessiond to report errors */
1027 DBG("Connecting to error socket %s", error_sock_path
);
1028 error_socket
= lttcomm_connect_unix_sock(error_sock_path
);
1029 /* not a fatal error, but all communication with ltt-sessiond will fail */
1030 if (error_socket
< 0) {
1031 WARN("Cannot connect to error socket, is ltt-sessiond started ?");
1034 /* Create the thread to manage the receive of fd */
1035 ret
= pthread_create(&threads
[0], NULL
, thread_receive_fds
, (void *) NULL
);
1037 perror("pthread_create");
1041 /* Create thread to manage the polling/writing of traces */
1042 ret
= pthread_create(&threads
[1], NULL
, thread_poll_fds
, (void *) NULL
);
1044 perror("pthread_create");
/* wait for both worker threads to finish */
1048 for (i
= 0; i
< 2; i
++) {
1049 ret
= pthread_join(threads
[i
], &status
);
1051 perror("pthread_join");
1056 send_error(KCONSUMERD_EXIT_SUCCESS
);
1061 send_error(KCONSUMERD_EXIT_FAILURE
);