Commit | Line | Data |
---|---|---|
00e2e675 DG |
1 | /* |
2 | * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com> | |
3 | * | |
4 | * This program is free software; you can redistribute it and/or modify it | |
5 | * under the terms of the GNU General Public License, version 2 only, as | |
6 | * published by the Free Software Foundation. | |
7 | * | |
8 | * This program is distributed in the hope that it will be useful, but WITHOUT | |
9 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
10 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for | |
11 | * more details. | |
12 | * | |
13 | * You should have received a copy of the GNU General Public License along with | |
14 | * this program; if not, write to the Free Software Foundation, Inc., 51 | |
15 | * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
16 | */ | |
17 | ||
18 | #define _GNU_SOURCE | |
19 | #include <assert.h> | |
20 | #include <stdio.h> | |
21 | #include <stdlib.h> | |
22 | #include <string.h> | |
23 | #include <sys/stat.h> | |
24 | #include <sys/types.h> | |
25 | #include <unistd.h> | |
26 | ||
27 | #include <common/common.h> | |
28 | #include <common/defaults.h> | |
29 | #include <common/uri.h> | |
30 | ||
31 | #include "consumer.h" | |
32 | ||
a4b92340 DG |
33 | /* |
34 | * From a consumer_data structure, allocate and add a consumer socket to the | |
35 | * consumer output. | |
36 | * | |
37 | * Return 0 on success, else negative value on error | |
38 | */ | |
39 | int consumer_create_socket(struct consumer_data *data, | |
40 | struct consumer_output *output) | |
41 | { | |
42 | int ret = 0; | |
43 | struct consumer_socket *socket; | |
44 | ||
45 | assert(data); | |
46 | ||
47 | if (output == NULL || data->cmd_sock < 0) { | |
48 | /* | |
49 | * Not an error. Possible there is simply not spawned consumer or it's | |
50 | * disabled for the tracing session asking the socket. | |
51 | */ | |
52 | goto error; | |
53 | } | |
54 | ||
55 | rcu_read_lock(); | |
56 | socket = consumer_find_socket(data->cmd_sock, output); | |
57 | rcu_read_unlock(); | |
58 | if (socket == NULL) { | |
59 | socket = consumer_allocate_socket(data->cmd_sock); | |
60 | if (socket == NULL) { | |
61 | ret = -1; | |
62 | goto error; | |
63 | } | |
64 | ||
65 | socket->lock = &data->lock; | |
66 | rcu_read_lock(); | |
67 | consumer_add_socket(socket, output); | |
68 | rcu_read_unlock(); | |
69 | } | |
70 | ||
71 | DBG3("Consumer socket created (fd: %d) and added to output", | |
72 | data->cmd_sock); | |
73 | ||
74 | error: | |
75 | return ret; | |
76 | } | |
77 | ||
173af62f DG |
78 | /* |
79 | * Find a consumer_socket in a consumer_output hashtable. Read side lock must | |
80 | * be acquired before calling this function and across use of the | |
81 | * returned consumer_socket. | |
82 | */ | |
83 | struct consumer_socket *consumer_find_socket(int key, | |
84 | struct consumer_output *consumer) | |
85 | { | |
86 | struct lttng_ht_iter iter; | |
87 | struct lttng_ht_node_ulong *node; | |
88 | struct consumer_socket *socket = NULL; | |
89 | ||
90 | /* Negative keys are lookup failures */ | |
a4b92340 | 91 | if (key < 0 || consumer == NULL) { |
173af62f DG |
92 | return NULL; |
93 | } | |
94 | ||
95 | lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key), | |
96 | &iter); | |
97 | node = lttng_ht_iter_get_node_ulong(&iter); | |
98 | if (node != NULL) { | |
99 | socket = caa_container_of(node, struct consumer_socket, node); | |
100 | } | |
101 | ||
102 | return socket; | |
103 | } | |
104 | ||
105 | /* | |
106 | * Allocate a new consumer_socket and return the pointer. | |
107 | */ | |
108 | struct consumer_socket *consumer_allocate_socket(int fd) | |
109 | { | |
110 | struct consumer_socket *socket = NULL; | |
111 | ||
112 | socket = zmalloc(sizeof(struct consumer_socket)); | |
113 | if (socket == NULL) { | |
114 | PERROR("zmalloc consumer socket"); | |
115 | goto error; | |
116 | } | |
117 | ||
118 | socket->fd = fd; | |
119 | lttng_ht_node_init_ulong(&socket->node, fd); | |
120 | ||
121 | error: | |
122 | return socket; | |
123 | } | |
124 | ||
125 | /* | |
126 | * Add consumer socket to consumer output object. Read side lock must be | |
127 | * acquired before calling this function. | |
128 | */ | |
129 | void consumer_add_socket(struct consumer_socket *sock, | |
130 | struct consumer_output *consumer) | |
131 | { | |
132 | assert(sock); | |
133 | assert(consumer); | |
134 | ||
135 | lttng_ht_add_unique_ulong(consumer->socks, &sock->node); | |
136 | } | |
137 | ||
138 | /* | |
139 | * Delte consumer socket to consumer output object. Read side lock must be | |
140 | * acquired before calling this function. | |
141 | */ | |
142 | void consumer_del_socket(struct consumer_socket *sock, | |
143 | struct consumer_output *consumer) | |
144 | { | |
145 | int ret; | |
146 | struct lttng_ht_iter iter; | |
147 | ||
148 | assert(sock); | |
149 | assert(consumer); | |
150 | ||
151 | iter.iter.node = &sock->node.node; | |
152 | ret = lttng_ht_del(consumer->socks, &iter); | |
153 | assert(!ret); | |
154 | } | |
155 | ||
156 | /* | |
157 | * RCU destroy call function. | |
158 | */ | |
159 | static void destroy_socket_rcu(struct rcu_head *head) | |
160 | { | |
161 | struct lttng_ht_node_ulong *node = | |
162 | caa_container_of(head, struct lttng_ht_node_ulong, head); | |
163 | struct consumer_socket *socket = | |
164 | caa_container_of(node, struct consumer_socket, node); | |
165 | ||
166 | free(socket); | |
167 | } | |
168 | ||
169 | /* | |
170 | * Destroy and free socket pointer in a call RCU. Read side lock must be | |
171 | * acquired before calling this function. | |
172 | */ | |
173 | void consumer_destroy_socket(struct consumer_socket *sock) | |
174 | { | |
175 | assert(sock); | |
176 | ||
177 | /* | |
178 | * We DO NOT close the file descriptor here since it is global to the | |
179 | * session daemon and is closed only if the consumer dies. | |
180 | */ | |
181 | ||
182 | call_rcu(&sock->node.head, destroy_socket_rcu); | |
183 | } | |
184 | ||
00e2e675 DG |
185 | /* |
186 | * Allocate and assign data to a consumer_output object. | |
187 | * | |
188 | * Return pointer to structure. | |
189 | */ | |
190 | struct consumer_output *consumer_create_output(enum consumer_dst_type type) | |
191 | { | |
192 | struct consumer_output *output = NULL; | |
193 | ||
194 | output = zmalloc(sizeof(struct consumer_output)); | |
195 | if (output == NULL) { | |
196 | PERROR("zmalloc consumer_output"); | |
197 | goto error; | |
198 | } | |
199 | ||
200 | /* By default, consumer output is enabled */ | |
201 | output->enabled = 1; | |
202 | output->type = type; | |
203 | output->net_seq_index = -1; | |
173af62f DG |
204 | |
205 | output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); | |
00e2e675 DG |
206 | |
207 | error: | |
208 | return output; | |
209 | } | |
210 | ||
211 | /* | |
212 | * Delete the consumer_output object from the list and free the ptr. | |
213 | */ | |
214 | void consumer_destroy_output(struct consumer_output *obj) | |
215 | { | |
216 | if (obj == NULL) { | |
217 | return; | |
218 | } | |
219 | ||
173af62f DG |
220 | if (obj->socks) { |
221 | struct lttng_ht_iter iter; | |
222 | struct consumer_socket *socket; | |
223 | ||
224 | cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) { | |
225 | consumer_destroy_socket(socket); | |
226 | } | |
00e2e675 | 227 | } |
173af62f | 228 | |
00e2e675 DG |
229 | free(obj); |
230 | } | |
231 | ||
232 | /* | |
233 | * Copy consumer output and returned the newly allocated copy. | |
234 | */ | |
235 | struct consumer_output *consumer_copy_output(struct consumer_output *obj) | |
236 | { | |
173af62f DG |
237 | struct lttng_ht_iter iter; |
238 | struct consumer_socket *socket, *copy_sock; | |
00e2e675 DG |
239 | struct consumer_output *output; |
240 | ||
241 | assert(obj); | |
242 | ||
243 | output = consumer_create_output(obj->type); | |
244 | if (output == NULL) { | |
245 | goto error; | |
246 | } | |
247 | ||
248 | memcpy(output, obj, sizeof(struct consumer_output)); | |
249 | ||
173af62f DG |
250 | /* Copy sockets */ |
251 | output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); | |
252 | ||
253 | cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) { | |
254 | /* Create new socket object. */ | |
255 | copy_sock = consumer_allocate_socket(socket->fd); | |
256 | if (copy_sock == NULL) { | |
257 | goto malloc_error; | |
258 | } | |
259 | ||
260 | copy_sock->lock = socket->lock; | |
261 | consumer_add_socket(copy_sock, output); | |
262 | } | |
263 | ||
00e2e675 DG |
264 | error: |
265 | return output; | |
173af62f DG |
266 | |
267 | malloc_error: | |
268 | consumer_destroy_output(output); | |
269 | return NULL; | |
00e2e675 DG |
270 | } |
271 | ||
272 | /* | |
273 | * Set network URI to the consumer output object. | |
274 | * | |
275 | * Return 0 on success. Negative value on error. | |
276 | */ | |
277 | int consumer_set_network_uri(struct consumer_output *obj, | |
278 | struct lttng_uri *uri) | |
279 | { | |
280 | int ret; | |
281 | char tmp_path[PATH_MAX]; | |
282 | char hostname[HOST_NAME_MAX]; | |
283 | struct lttng_uri *dst_uri = NULL; | |
284 | ||
285 | /* Code flow error safety net. */ | |
286 | assert(obj); | |
287 | assert(uri); | |
288 | ||
289 | switch (uri->stype) { | |
290 | case LTTNG_STREAM_CONTROL: | |
291 | dst_uri = &obj->dst.net.control; | |
292 | obj->dst.net.control_isset = 1; | |
293 | if (uri->port == 0) { | |
294 | /* Assign default port. */ | |
295 | uri->port = DEFAULT_NETWORK_CONTROL_PORT; | |
296 | } | |
297 | break; | |
298 | case LTTNG_STREAM_DATA: | |
299 | dst_uri = &obj->dst.net.data; | |
300 | obj->dst.net.data_isset = 1; | |
301 | if (uri->port == 0) { | |
302 | /* Assign default port. */ | |
303 | uri->port = DEFAULT_NETWORK_DATA_PORT; | |
304 | } | |
305 | break; | |
306 | default: | |
307 | ERR("Set network uri type unknown %d", uri->stype); | |
308 | goto error; | |
309 | } | |
310 | ||
311 | ret = uri_compare(dst_uri, uri); | |
312 | if (!ret) { | |
313 | /* Same URI, don't touch it and return success. */ | |
314 | DBG3("URI network compare are the same"); | |
315 | goto end; | |
316 | } | |
317 | ||
318 | /* URIs were not equal, replacing it. */ | |
319 | memset(dst_uri, 0, sizeof(struct lttng_uri)); | |
320 | memcpy(dst_uri, uri, sizeof(struct lttng_uri)); | |
321 | obj->type = CONSUMER_DST_NET; | |
322 | ||
323 | /* Handle subdir and add hostname in front. */ | |
324 | if (dst_uri->stype == LTTNG_STREAM_CONTROL) { | |
325 | /* Get hostname to append it in the pathname */ | |
326 | ret = gethostname(hostname, sizeof(hostname)); | |
327 | if (ret < 0) { | |
328 | PERROR("gethostname. Fallback on default localhost"); | |
329 | strncpy(hostname, "localhost", sizeof(hostname)); | |
330 | } | |
331 | hostname[sizeof(hostname) - 1] = '\0'; | |
332 | ||
333 | /* Setup consumer subdir if none present in the control URI */ | |
334 | if (strlen(dst_uri->subdir) == 0) { | |
335 | ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", | |
336 | hostname, obj->subdir); | |
337 | } else { | |
338 | ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", | |
339 | hostname, dst_uri->subdir); | |
340 | } | |
341 | if (ret < 0) { | |
342 | PERROR("snprintf set consumer uri subdir"); | |
343 | goto error; | |
344 | } | |
345 | ||
346 | strncpy(obj->subdir, tmp_path, sizeof(obj->subdir)); | |
347 | DBG3("Consumer set network uri subdir path %s", tmp_path); | |
348 | } | |
349 | ||
350 | end: | |
351 | return 0; | |
352 | ||
353 | error: | |
354 | return -1; | |
355 | } | |
356 | ||
/*
 * Send nb_fd file descriptors from the fds array to the consumer over the
 * given unix socket.
 *
 * Return the value of lttcomm_send_fds_unix_sock(), which is negative on
 * error.
 */
int consumer_send_fds(int sock, int *fds, size_t nb_fd)
{
	int ret;

	assert(fds);
	assert(nb_fd > 0);

	ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd);
	if (ret < 0) {
		PERROR("send consumer fds");
	}

	return ret;
}
376 | ||
377 | /* | |
378 | * Consumer send channel communication message structure to consumer. | |
379 | */ | |
380 | int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg) | |
381 | { | |
382 | int ret; | |
383 | ||
384 | assert(msg); | |
385 | assert(sock >= 0); | |
386 | ||
387 | ret = lttcomm_send_unix_sock(sock, msg, | |
388 | sizeof(struct lttcomm_consumer_msg)); | |
389 | if (ret < 0) { | |
390 | PERROR("send consumer channel"); | |
391 | goto error; | |
392 | } | |
393 | ||
394 | error: | |
395 | return ret; | |
396 | } | |
397 | ||
398 | /* | |
399 | * Init channel communication message structure. | |
400 | */ | |
401 | void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, | |
402 | enum lttng_consumer_command cmd, | |
403 | int channel_key, | |
404 | uint64_t max_sb_size, | |
405 | uint64_t mmap_len, | |
406 | const char *name) | |
407 | { | |
408 | assert(msg); | |
409 | ||
410 | /* TODO: Args validation */ | |
411 | ||
412 | /* Zeroed structure */ | |
413 | memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); | |
414 | ||
415 | /* Send channel */ | |
416 | msg->cmd_type = cmd; | |
417 | msg->u.channel.channel_key = channel_key; | |
418 | msg->u.channel.max_sb_size = max_sb_size; | |
419 | msg->u.channel.mmap_len = mmap_len; | |
420 | } | |
421 | ||
422 | /* | |
423 | * Init stream communication message structure. | |
424 | */ | |
425 | void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, | |
426 | enum lttng_consumer_command cmd, | |
427 | int channel_key, | |
428 | int stream_key, | |
429 | uint32_t state, | |
430 | enum lttng_event_output output, | |
431 | uint64_t mmap_len, | |
432 | uid_t uid, | |
433 | gid_t gid, | |
434 | int net_index, | |
435 | unsigned int metadata_flag, | |
436 | const char *name, | |
437 | const char *pathname) | |
438 | { | |
439 | assert(msg); | |
440 | ||
441 | memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); | |
442 | ||
443 | /* TODO: Args validation */ | |
444 | ||
445 | msg->cmd_type = cmd; | |
446 | msg->u.stream.channel_key = channel_key; | |
447 | msg->u.stream.stream_key = stream_key; | |
448 | msg->u.stream.state = state; | |
449 | msg->u.stream.output = output; | |
450 | msg->u.stream.mmap_len = mmap_len; | |
451 | msg->u.stream.uid = uid; | |
452 | msg->u.stream.gid = gid; | |
453 | msg->u.stream.net_index = net_index; | |
454 | msg->u.stream.metadata_flag = metadata_flag; | |
455 | strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name)); | |
456 | msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0'; | |
457 | strncpy(msg->u.stream.path_name, pathname, | |
458 | sizeof(msg->u.stream.path_name)); | |
459 | msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0'; | |
460 | } | |
461 | ||
462 | /* | |
463 | * Send stream communication structure to the consumer. | |
464 | */ | |
465 | int consumer_send_stream(int sock, struct consumer_output *dst, | |
466 | struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd) | |
467 | { | |
468 | int ret; | |
469 | ||
470 | assert(msg); | |
471 | assert(dst); | |
472 | ||
473 | switch (dst->type) { | |
474 | case CONSUMER_DST_NET: | |
475 | /* Consumer should send the stream on the network. */ | |
476 | msg->u.stream.net_index = dst->net_seq_index; | |
477 | break; | |
478 | case CONSUMER_DST_LOCAL: | |
479 | /* Add stream file name to stream path */ | |
480 | strncat(msg->u.stream.path_name, "/", sizeof(msg->u.stream.path_name)); | |
481 | strncat(msg->u.stream.path_name, msg->u.stream.name, | |
482 | sizeof(msg->u.stream.path_name)); | |
483 | msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0'; | |
484 | /* Indicate that the stream is NOT network */ | |
485 | msg->u.stream.net_index = -1; | |
486 | break; | |
487 | default: | |
488 | ERR("Consumer unknown output type (%d)", dst->type); | |
489 | ret = -1; | |
490 | goto error; | |
491 | } | |
492 | ||
493 | /* Send on socket */ | |
494 | ret = lttcomm_send_unix_sock(sock, msg, | |
495 | sizeof(struct lttcomm_consumer_msg)); | |
496 | if (ret < 0) { | |
497 | PERROR("send consumer stream"); | |
498 | goto error; | |
499 | } | |
500 | ||
501 | ret = consumer_send_fds(sock, fds, nb_fd); | |
502 | if (ret < 0) { | |
503 | goto error; | |
504 | } | |
505 | ||
506 | error: | |
507 | return ret; | |
508 | } | |
37278a1e DG |
509 | |
510 | /* | |
511 | * Send relayd socket to consumer associated with a session name. | |
512 | * | |
513 | * On success return positive value. On error, negative value. | |
514 | */ | |
515 | int consumer_send_relayd_socket(int consumer_sock, | |
516 | struct lttcomm_sock *sock, struct consumer_output *consumer, | |
517 | enum lttng_stream_type type) | |
518 | { | |
519 | int ret; | |
520 | struct lttcomm_consumer_msg msg; | |
521 | ||
522 | /* Code flow error. Safety net. */ | |
523 | assert(sock); | |
524 | assert(consumer); | |
525 | ||
526 | /* Bail out if consumer is disabled */ | |
527 | if (!consumer->enabled) { | |
528 | ret = LTTCOMM_OK; | |
529 | goto error; | |
530 | } | |
531 | ||
532 | msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; | |
533 | /* | |
534 | * Assign network consumer output index using the temporary consumer since | |
535 | * this call should only be made from within a set_consumer_uri() function | |
536 | * call in the session daemon. | |
537 | */ | |
538 | msg.u.relayd_sock.net_index = consumer->net_seq_index; | |
539 | msg.u.relayd_sock.type = type; | |
540 | memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock)); | |
541 | ||
173af62f | 542 | DBG3("Sending relayd sock info to consumer on %d", consumer_sock); |
37278a1e DG |
543 | ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg)); |
544 | if (ret < 0) { | |
545 | PERROR("send consumer relayd socket info"); | |
546 | goto error; | |
547 | } | |
548 | ||
549 | DBG3("Sending relayd socket file descriptor to consumer"); | |
550 | ret = consumer_send_fds(consumer_sock, &sock->fd, 1); | |
551 | if (ret < 0) { | |
552 | goto error; | |
553 | } | |
554 | ||
555 | DBG2("Consumer relayd socket sent"); | |
556 | ||
557 | error: | |
558 | return ret; | |
559 | } | |
173af62f DG |
560 | |
561 | /* | |
562 | * Send destroy relayd command to consumer. | |
563 | * | |
564 | * On success return positive value. On error, negative value. | |
565 | */ | |
566 | int consumer_send_destroy_relayd(struct consumer_socket *sock, | |
567 | struct consumer_output *consumer) | |
568 | { | |
569 | int ret; | |
570 | struct lttcomm_consumer_msg msg; | |
571 | ||
572 | assert(consumer); | |
573 | assert(sock); | |
574 | ||
575 | DBG2("Sending destroy relayd command to consumer..."); | |
576 | ||
577 | /* Bail out if consumer is disabled */ | |
578 | if (!consumer->enabled) { | |
579 | ret = LTTCOMM_OK; | |
580 | DBG3("Consumer is disabled"); | |
581 | goto error; | |
582 | } | |
583 | ||
584 | msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD; | |
585 | msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index; | |
586 | ||
587 | pthread_mutex_lock(sock->lock); | |
588 | ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg)); | |
589 | pthread_mutex_unlock(sock->lock); | |
590 | if (ret < 0) { | |
591 | PERROR("send consumer destroy relayd command"); | |
592 | goto error; | |
593 | } | |
594 | ||
595 | DBG2("Consumer send destroy relayd command done"); | |
596 | ||
597 | error: | |
598 | return ret; | |
599 | } |