2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * 2019 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License, version 2 only, as
9 * published by the Free Software Foundation.
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
16 * You should have received a copy of the GNU General Public License along with
17 * this program; if not, write to the Free Software Foundation, Inc., 51
18 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 #include <common/common.h>
23 #include <common/utils.h>
24 #include <common/defaults.h>
25 #include <common/sessiond-comm/relayd.h>
26 #include <urcu/rculist.h>
29 #include "lttng-relayd.h"
32 #include "viewer-stream.h"
34 #include <sys/types.h>
37 #define FILE_IO_STACK_BUFFER_SIZE 65536
39 /* Should be called with RCU read-side lock held. */
40 bool stream_get(struct relay_stream
*stream
)
42 return urcu_ref_get_unless_zero(&stream
->ref
);
/*
 * stream_get_by_id: look up a stream in the global relay_streams_ht.
 *
 * Entries are keyed by stream handle (stream_create() initializes
 * stream->node with stream->stream_handle before inserting it), so
 * stream_id is a stream handle. A found node is only handed out if
 * stream_get() manages to take a reference on it; as stated below, NULL is
 * returned otherwise and the caller must stream_put() a returned stream.
 */
46 * Get stream from stream id from the streams hash table. Return stream
47 * if found else NULL. A stream reference is taken when a stream is
48 * returned. stream_put() must be called on that stream.
50 struct relay_stream
*stream_get_by_id(uint64_t stream_id
)
52 struct lttng_ht_node_u64
*node
;
53 struct lttng_ht_iter iter
;
54 struct relay_stream
*stream
= NULL
;
57 lttng_ht_lookup(relay_streams_ht
, &stream_id
, &iter
);
58 node
= lttng_ht_iter_get_node_u64(&iter
);
/* Lookup miss: nothing to return. */
60 DBG("Relay stream %" PRIu64
" not found", stream_id
);
63 stream
= caa_container_of(node
, struct relay_stream
, node
);
/* A stream whose refcount already reached zero is being torn down. */
64 if (!stream_get(stream
)) {
72 static void stream_complete_rotation(struct relay_stream
*stream
)
74 DBG("Rotation completed for stream %" PRIu64
, stream
->stream_handle
);
75 lttng_trace_chunk_put(stream
->trace_chunk
);
76 stream
->trace_chunk
= stream
->ongoing_rotation
.value
.next_trace_chunk
;
77 stream
->ongoing_rotation
= (typeof(stream
->ongoing_rotation
)) {};
/*
 * rotate_truncate_stream: handle a rotation that arrives after the data
 * connection already wrote past the rotation point.
 *
 * diff = tracefile_size_current - pos_after_last_complete_data_index is the
 * number of bytes written after the last complete index. Those bytes are
 * copied, in chunks of at most sizeof(buf) (FILE_IO_STACK_BUFFER_SIZE), from
 * the current tracefile (rewound to pos_after_last_complete_data_index) into
 * a newly created tracefile. The current tracefile is then truncated to that
 * position and closed, pending indexes are switched to the new file, the new
 * descriptor is installed and the rotation is completed.
 *
 * Data streams only (asserts !stream->is_metadata).
 */
81 * If too much data has been written in a tracefile before we received the
82 * rotation command, we have to move the excess data to the new tracefile and
83 * perform the rotation. This can happen because the control and data
84 * connections are separate, the indexes as well as the commands arrive from
85 * the control connection and we have no control over the order so we could be
86 * in a situation where too much data has been received on the data connection
87 * before the rotation command on the control connection arrives.
89 static int rotate_truncate_stream(struct relay_stream
*stream
)
93 uint64_t diff
, pos
= 0;
94 char buf
[FILE_IO_STACK_BUFFER_SIZE
];
96 assert(!stream
->is_metadata
);
98 assert(stream
->tracefile_size_current
>
99 stream
->pos_after_last_complete_data_index
);
/* Number of excess bytes that belong in the next tracefile. */
100 diff
= stream
->tracefile_size_current
-
101 stream
->pos_after_last_complete_data_index
;
103 /* Create the new tracefile. */
104 new_fd
= utils_create_stream_file(stream
->path_name
,
105 stream
->channel_name
,
106 stream
->tracefile_size
, stream
->tracefile_count
,
107 /* uid */ -1, /* gid */ -1, /* suffix */ NULL
);
109 ERR("Failed to create new stream file at path %s for channel %s",
110 stream
->path_name
, stream
->channel_name
);
116 * Rewind the current tracefile to the position at which the rotation
117 * should have occurred.
119 lseek_ret
= lseek(stream
->stream_fd
->fd
,
120 stream
->pos_after_last_complete_data_index
, SEEK_SET
);
122 PERROR("seek truncate stream");
127 /* Move data from the old file to the new file. */
129 uint64_t count
, bytes_left
;
132 bytes_left
= diff
- pos
;
/* Copy at most one stack buffer per pass. */
133 count
= bytes_left
> sizeof(buf
) ? sizeof(buf
) : bytes_left
;
/* count <= sizeof(buf), so passing it as a size_t is lossless. */
134 assert(count
<= SIZE_MAX
);
136 io_ret
= lttng_read(stream
->stream_fd
->fd
, buf
, count
);
137 if (io_ret
< (ssize_t
) count
) {
138 char error_string
[256];
140 snprintf(error_string
, sizeof(error_string
),
141 "Failed to read %" PRIu64
" bytes from fd %i in rotate_truncate_stream(), returned %zi",
142 count
, stream
->stream_fd
->fd
, io_ret
);
/* PERROR vs ERR presumably depends on whether errno is meaningful (io_ret == -1). */
144 PERROR("%s", error_string
);
146 ERR("%s", error_string
);
152 io_ret
= lttng_write(new_fd
, buf
, count
);
153 if (io_ret
< (ssize_t
) count
) {
154 char error_string
[256];
156 snprintf(error_string
, sizeof(error_string
),
157 "Failed to write %" PRIu64
" bytes from fd %i in rotate_truncate_stream(), returned %zi",
158 count
, new_fd
, io_ret
);
160 PERROR("%s", error_string
);
162 ERR("%s", error_string
);
171 /* Truncate the file to get rid of the excess data. */
172 ret
= ftruncate(stream
->stream_fd
->fd
,
173 stream
->pos_after_last_complete_data_index
);
179 ret
= close(stream
->stream_fd
->fd
);
181 PERROR("Closing tracefile");
/*
 * NOTE(review): the old descriptor is already closed at this point, but
 * stream->stream_fd->fd is only replaced by new_fd further down. If the
 * relay_index_switch_all_files() failure path exits early, stream_fd->fd
 * still holds the closed descriptor number and new_fd is not recorded
 * anywhere — confirm that error path.
 */
186 * Update the offset and FD of all the eventual indexes created by the
187 * data connection before the rotation command arrived.
189 ret
= relay_index_switch_all_files(stream
);
191 ERR("Failed to rotate index file");
/*
 * The new descriptor is installed in place inside the existing stream_fd
 * object; the new file already holds the diff bytes moved above.
 */
195 stream
->stream_fd
->fd
= new_fd
;
196 stream
->tracefile_size_current
= diff
;
197 stream
->pos_after_last_complete_data_index
= 0;
198 stream_complete_rotation(stream
);
/*
 * stream_create_data_output_file_from_trace_chunk: open the stream's current
 * data file inside `trace_chunk` and wrap it in a new stream_fd.
 *
 * The path is built by utils_stream_file_path() from path_name,
 * channel_name, tracefile_size and tracefile_current_index. The file is
 * opened O_RDWR | O_CREAT | O_TRUNC with mode 0660 (S_IRUSR | S_IWUSR |
 * S_IRGRP | S_IWGRP). When the on-disk ring buffer has wrapped around, or
 * when the caller sets force_unlink, an existing file at that path is
 * unlinked first so live readers of the old file are not overwritten; a
 * missing file (ENOENT) is tolerated.
 *
 * On success the new stream_fd is stored in *out_stream_fd.
 * Requires stream->lock held and stream->trace_chunk set (both asserted).
 */
206 static int stream_create_data_output_file_from_trace_chunk(
207 struct relay_stream
*stream
,
208 struct lttng_trace_chunk
*trace_chunk
,
210 struct stream_fd
**out_stream_fd
)
213 char stream_path
[LTTNG_PATH_MAX
];
214 enum lttng_trace_chunk_status status
;
215 const int flags
= O_RDWR
| O_CREAT
| O_TRUNC
;
216 const mode_t mode
= S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IWGRP
;
218 ASSERT_LOCKED(stream
->lock
);
219 assert(stream
->trace_chunk
);
221 ret
= utils_stream_file_path(stream
->path_name
, stream
->channel_name
,
222 stream
->tracefile_size
, stream
->tracefile_current_index
,
223 NULL
, stream_path
, sizeof(stream_path
));
228 if (stream
->tracefile_wrapped_around
|| force_unlink
) {
230 * The on-disk ring-buffer has wrapped around.
231 * Newly created stream files will replace existing files. Since
232 * live clients may be consuming existing files, the file about
233 * to be replaced is unlinked in order to not overwrite its
236 status
= lttng_trace_chunk_unlink_file(trace_chunk
,
238 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
239 PERROR("Failed to unlink stream file \"%s\" during trace file rotation",
242 * Don't abort if the file doesn't exist, it is
243 * unexpected, but should not be a fatal error.
245 if (errno
!= ENOENT
) {
252 status
= lttng_trace_chunk_open_file(
253 trace_chunk
, stream_path
, flags
, mode
, &fd
);
254 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
/*
 * NOTE(review): the message says "stream file" but prints the channel
 * name; stream_path would identify the file that failed to open.
 */
255 ERR("Failed to open stream file \"%s\"", stream
->channel_name
);
260 *out_stream_fd
= stream_fd_create(fd
);
261 if (!*out_stream_fd
) {
/*
 * NOTE(review): on this path the descriptor to close and report is `fd`,
 * but the message prints `ret`. Confirm that the close() here targets
 * `fd`; closing `ret` would leak `fd` and could close an unrelated
 * descriptor.
 */
263 PERROR("Error closing stream file descriptor %d", ret
);
/*
 * stream_rotate_data_file: switch the stream's data output to the pending
 * rotation's next trace chunk.
 *
 * Releases the current data file and resets the wrap-around flag and the
 * tracefile index to 0. If the rotation carries a next trace chunk, the
 * stream's subdirectory is created in it and a new data file is opened
 * there; with a NULL next_trace_chunk no file is opened and
 * stream->stream_fd stays NULL. The size/position counters are then reset,
 * data_rotated is set, and the rotation is completed if the index side has
 * rotated too.
 */
272 static int stream_rotate_data_file(struct relay_stream
*stream
)
276 DBG("Rotating stream %" PRIu64
" data file",
277 stream
->stream_handle
);
279 if (stream
->stream_fd
) {
280 stream_fd_put(stream
->stream_fd
);
281 stream
->stream_fd
= NULL
;
284 stream
->tracefile_wrapped_around
= false;
285 stream
->tracefile_current_index
= 0;
287 if (stream
->ongoing_rotation
.value
.next_trace_chunk
) {
288 struct stream_fd
*new_stream_fd
= NULL
;
289 enum lttng_trace_chunk_status chunk_status
;
291 chunk_status
= lttng_trace_chunk_create_subdirectory(
292 stream
->ongoing_rotation
.value
.next_trace_chunk
,
294 if (chunk_status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
299 /* Rotate the data file. */
300 ret
= stream_create_data_output_file_from_trace_chunk(stream
,
301 stream
->ongoing_rotation
.value
.next_trace_chunk
,
302 false, &new_stream_fd
);
/* Assigned before the error check; new_stream_fd starts out NULL. */
303 stream
->stream_fd
= new_stream_fd
;
305 ERR("Failed to rotate stream data file");
309 stream
->tracefile_size_current
= 0;
310 stream
->pos_after_last_complete_data_index
= 0;
311 stream
->ongoing_rotation
.value
.data_rotated
= true;
313 if (stream
->ongoing_rotation
.value
.index_rotated
) {
314 /* Rotation completed; reset its state. */
315 stream_complete_rotation(stream
);
/*
 * try_rotate_stream_data: decide from prev_data_seq whether the data file
 * can be rotated now.
 *
 * - No pending rotation, or data side already rotated: nothing to do.
 * - prev_data_seq unset (-1ULL) or prev_data_seq + 1 < rotation seq_num:
 *   packets that precede the rotation point are still expected; wait.
 * - prev_data_seq > seq_num: data beyond the rotation point already went
 *   into the current file; rotate_truncate_stream() moves it to a new one.
 * - When the rotation point has been reached exactly,
 *   stream_rotate_data_file() performs the switch.
 */
322 * Check if a stream's data file (as opposed to index) should be rotated
323 * (for session rotation).
324 * Must be called with the stream lock held.
326 * Return 0 on success, a negative value on error.
328 static int try_rotate_stream_data(struct relay_stream
*stream
)
332 if (caa_likely(!stream
->ongoing_rotation
.is_set
)) {
333 /* No rotation expected. */
337 if (stream
->ongoing_rotation
.value
.data_rotated
) {
338 /* Rotation of the data file has already occurred. */
342 if (stream
->prev_data_seq
== -1ULL ||
343 stream
->prev_data_seq
+ 1 < stream
->ongoing_rotation
.value
.seq_num
) {
345 * The next packet that will be written is not part of the next
348 DBG("Stream %" PRIu64
" not yet ready for rotation (rotate_at_seq_num = %" PRIu64
349 ", prev_data_seq = %" PRIu64
")",
350 stream
->stream_handle
,
351 stream
->ongoing_rotation
.value
.seq_num
,
352 stream
->prev_data_seq
);
354 } else if (stream
->prev_data_seq
> stream
->ongoing_rotation
.value
.seq_num
) {
356 * prev_data_seq is checked here since indexes and rotation
357 * commands are serialized with respect to each other.
359 DBG("Rotation after too much data has been written in tracefile "
360 "for stream %" PRIu64
", need to truncate before "
361 "rotating", stream
->stream_handle
);
/* rotate_truncate_stream() also completes the rotation on its success path. */
362 ret
= rotate_truncate_stream(stream
);
364 ERR("Failed to truncate stream");
368 ret
= stream_rotate_data_file(stream
);
/*
 * create_index_file: replace the stream's index file with one created in
 * `chunk`.
 *
 * Drops the reference on the current index file (if any), creates in
 * `chunk` a subdirectory whose path is built with asprintf() from
 * stream->path_name (presumably the index directory), and creates the
 * index file for stream->tracefile_current_index. The index format version
 * comes from the session's protocol major/minor through
 * lttng_to_index_major()/lttng_to_index_minor().
 * Requires stream->lock held.
 * NOTE(review): index_subpath is heap-allocated by asprintf(); confirm it is
 * freed on every exit path.
 */
376 * Close the current index file if it is open, and create a new one.
378 * Return 0 on success, -1 on error.
380 static int create_index_file(struct relay_stream
*stream
,
381 struct lttng_trace_chunk
*chunk
)
384 uint32_t major
, minor
;
385 char *index_subpath
= NULL
;
387 ASSERT_LOCKED(stream
->lock
);
389 /* Put ref on previous index_file. */
390 if (stream
->index_file
) {
391 lttng_index_file_put(stream
->index_file
);
392 stream
->index_file
= NULL
;
394 major
= stream
->trace
->session
->major
;
395 minor
= stream
->trace
->session
->minor
;
401 ret
= asprintf(&index_subpath
, "%s/%s", stream
->path_name
,
407 ret
= lttng_trace_chunk_create_subdirectory(chunk
,
413 stream
->index_file
= lttng_index_file_create_from_trace_chunk(
414 chunk
, stream
->path_name
,
415 stream
->channel_name
, stream
->tracefile_size
,
416 stream
->tracefile_current_index
,
417 lttng_to_index_major(major
, minor
),
418 lttng_to_index_minor(major
, minor
), true);
419 if (!stream
->index_file
) {
/*
 * try_rotate_stream_index: decide from prev_index_seq whether the index file
 * can be rotated now.
 *
 * Mirrors try_rotate_stream_data(): nothing to do without a pending
 * rotation or once the index side has rotated, and wait while
 * prev_index_seq is unset (-1ULL) or prev_index_seq + 1 < seq_num.
 * Otherwise the next index must be exactly the rotation point (asserted;
 * unlike the data side there is no truncation path), so a new index file
 * is created in the rotation's next trace chunk.
 */
431 * Check if a stream's index file should be rotated (for session rotation).
432 * Must be called with the stream lock held.
434 * Return 0 on success, a negative value on error.
436 static int try_rotate_stream_index(struct relay_stream
*stream
)
440 if (!stream
->ongoing_rotation
.is_set
) {
441 /* No rotation expected. */
445 if (stream
->ongoing_rotation
.value
.index_rotated
) {
446 /* Rotation of the index has already occurred. */
450 if (stream
->prev_index_seq
== -1ULL ||
451 stream
->prev_index_seq
+ 1 < stream
->ongoing_rotation
.value
.seq_num
) {
452 DBG("Stream %" PRIu64
" index not yet ready for rotation (rotate_at_seq_num = %" PRIu64
", prev_index_seq = %" PRIu64
")",
453 stream
->stream_handle
,
454 stream
->ongoing_rotation
.value
.seq_num
,
455 stream
->prev_index_seq
);
458 /* The next index belongs to the new trace chunk; rotate. */
459 assert(stream
->prev_index_seq
+ 1 ==
460 stream
->ongoing_rotation
.value
.seq_num
);
461 DBG("Rotating stream %" PRIu64
" index file",
462 stream
->stream_handle
);
463 ret
= create_index_file(stream
,
464 stream
->ongoing_rotation
.value
.next_trace_chunk
);
/*
 * NOTE(review): index_rotated is set right after create_index_file() with
 * no check of `ret` in between, so a failed index file creation still lets
 * the rotation complete below. Confirm that this is intended.
 */
465 stream
->ongoing_rotation
.value
.index_rotated
= true;
/* index_rotated is always true here; only data_rotated decides. */
467 if (stream
->ongoing_rotation
.value
.data_rotated
&&
468 stream
->ongoing_rotation
.value
.index_rotated
) {
469 /* Rotation completed; reset its state. */
470 DBG("Rotation completed for stream %" PRIu64
,
471 stream
->stream_handle
);
472 stream_complete_rotation(stream
);
/*
 * stream_set_trace_chunk: make `chunk` the stream's current trace chunk.
 *
 * Creates the stream's subdirectory in `chunk`, releases the reference on
 * the previous chunk, takes a reference on `chunk` (asserted to succeed),
 * then closes the current data file and opens a new one in `chunk`.
 * stream_create_data_output_file_from_trace_chunk() asserts that
 * stream->lock is held, so callers must hold it (stream_create() does).
 */
480 static int stream_set_trace_chunk(struct relay_stream
*stream
,
481 struct lttng_trace_chunk
*chunk
)
484 enum lttng_trace_chunk_status status
;
485 bool acquired_reference
;
486 struct stream_fd
*new_stream_fd
= NULL
;
488 status
= lttng_trace_chunk_create_subdirectory(chunk
,
490 if (status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
/* Swap chunk references: the stream owns one reference on its chunk. */
495 lttng_trace_chunk_put(stream
->trace_chunk
);
496 acquired_reference
= lttng_trace_chunk_get(chunk
);
497 assert(acquired_reference
);
498 stream
->trace_chunk
= chunk
;
500 if (stream
->stream_fd
) {
501 stream_fd_put(stream
->stream_fd
);
502 stream
->stream_fd
= NULL
;
504 ret
= stream_create_data_output_file_from_trace_chunk(stream
, chunk
,
505 false, &new_stream_fd
);
506 stream
->stream_fd
= new_stream_fd
;
/*
 * stream_create: allocate and register a new relay stream for `trace`.
 *
 * Sequence numbers, ctf_stream_id and beacon_ts_end start at -1ULL, the
 * "not known yet" value tested elsewhere in this file. The stream takes a
 * reference on `trace` and on the session's current trace chunk (sampled
 * under the session lock); a session without a current trace chunk cannot
 * get new streams. The stream starts unpublished on the session's receive
 * list and is inserted in the global relay_streams_ht keyed by
 * stream_handle.
 * Ownership: path_name and channel_name are kept by the stream (freed by
 * stream_destroy()).
 */
512 * We keep ownership of path_name and channel_name.
514 struct relay_stream
*stream_create(struct ctf_trace
*trace
,
515 uint64_t stream_handle
, char *path_name
,
516 char *channel_name
, uint64_t tracefile_size
,
517 uint64_t tracefile_count
)
520 struct relay_stream
*stream
= NULL
;
521 struct relay_session
*session
= trace
->session
;
522 bool acquired_reference
= false;
523 struct lttng_trace_chunk
*current_trace_chunk
;
525 stream
= zmalloc(sizeof(struct relay_stream
));
526 if (stream
== NULL
) {
527 PERROR("relay stream zmalloc");
531 stream
->stream_handle
= stream_handle
;
532 stream
->prev_data_seq
= -1ULL;
533 stream
->prev_index_seq
= -1ULL;
534 stream
->last_net_seq_num
= -1ULL;
535 stream
->ctf_stream_id
= -1ULL;
536 stream
->tracefile_size
= tracefile_size
;
537 stream
->tracefile_count
= tracefile_count
;
538 stream
->path_name
= path_name
;
539 stream
->channel_name
= channel_name
;
540 stream
->beacon_ts_end
= -1ULL;
541 lttng_ht_node_init_u64(&stream
->node
, stream
->stream_handle
);
542 pthread_mutex_init(&stream
->lock
, NULL
);
543 urcu_ref_init(&stream
->ref
);
544 ctf_trace_get(trace
);
545 stream
->trace
= trace
;
/* Sample the session's current chunk and pin it with a local reference. */
547 pthread_mutex_lock(&trace
->session
->lock
);
548 current_trace_chunk
= trace
->session
->current_trace_chunk
;
549 if (current_trace_chunk
) {
550 acquired_reference
= lttng_trace_chunk_get(current_trace_chunk
);
552 pthread_mutex_unlock(&trace
->session
->lock
);
/* Also false when the session has no current trace chunk. */
553 if (!acquired_reference
) {
554 ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
560 stream
->indexes_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_U64
);
561 if (!stream
->indexes_ht
) {
/* NOTE(review): "Cannot created" is a typo in this runtime message. */
562 ERR("Cannot created indexes_ht");
567 pthread_mutex_lock(&stream
->lock
);
568 ret
= stream_set_trace_chunk(stream
, current_trace_chunk
);
569 pthread_mutex_unlock(&stream
->lock
);
571 ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
572 trace
->session
->session_name
,
573 stream
->channel_name
);
577 stream
->tfa
= tracefile_array_create(stream
->tracefile_count
);
583 stream
->is_metadata
= !strcmp(stream
->channel_name
,
584 DEFAULT_METADATA_NAME
);
585 stream
->in_recv_list
= true;
588 * Add the stream in the recv list of the session. Once the end stream
589 * message is received, all session streams are published.
591 pthread_mutex_lock(&session
->recv_list_lock
);
592 cds_list_add_rcu(&stream
->recv_node
, &session
->recv_list
);
593 session
->stream_count
++;
594 pthread_mutex_unlock(&session
->recv_list_lock
);
597 * Both in the ctf_trace object and the global stream ht since the data
598 * side of the relayd does not have the concept of session.
600 lttng_ht_add_unique_u64(relay_streams_ht
, &stream
->node
);
601 stream
->in_stream_ht
= true;
603 DBG("Relay new stream added %s with ID %" PRIu64
, stream
->channel_name
,
604 stream
->stream_handle
);
609 if (stream
->stream_fd
) {
610 stream_fd_put(stream
->stream_fd
);
611 stream
->stream_fd
= NULL
;
/*
 * Release the local reference taken under the session lock; the stream
 * keeps its own reference through stream_set_trace_chunk().
 */
616 lttng_trace_chunk_put(current_trace_chunk
);
621 * path_name and channel_name need to be freed explicitly here
622 * because we cannot rely on stream_put().
/*
 * stream_publish: move the stream from its session's receive list to its
 * trace's stream list and mark it published.
 *
 * An already-published stream presumably skips the list manipulation
 * (published is only set at the end). Locking: stream->lock is held
 * throughout; session->recv_list_lock and trace->stream_list_lock are each
 * taken under it, one after the other, never both at once.
 */
630 * Called with the session lock held.
632 void stream_publish(struct relay_stream
*stream
)
634 struct relay_session
*session
;
636 pthread_mutex_lock(&stream
->lock
);
637 if (stream
->published
) {
641 session
= stream
->trace
->session
;
643 pthread_mutex_lock(&session
->recv_list_lock
);
644 if (stream
->in_recv_list
) {
645 cds_list_del_rcu(&stream
->recv_node
);
646 stream
->in_recv_list
= false;
648 pthread_mutex_unlock(&session
->recv_list_lock
);
650 pthread_mutex_lock(&stream
->trace
->stream_list_lock
);
651 cds_list_add_rcu(&stream
->stream_node
, &stream
->trace
->stream_list
);
652 pthread_mutex_unlock(&stream
->trace
->stream_list_lock
);
654 stream
->published
= true;
656 pthread_mutex_unlock(&stream
->lock
);
/*
 * stream_unpublish: remove the stream from the global relay_streams_ht
 * and, if published, from its trace's stream list. The in_stream_ht and
 * published flags are cleared, so repeated calls do nothing further.
 * NOTE(review): the comment below mentions stream_destroy, but the callers
 * visible in this file are stream_release() (final put) and
 * try_stream_close() (with stream->lock held).
 */
660 * Stream must be protected by holding the stream lock or by virtue of being
661 * called from stream_destroy.
663 static void stream_unpublish(struct relay_stream
*stream
)
665 if (stream
->in_stream_ht
) {
666 struct lttng_ht_iter iter
;
/* Delete by node: point the iterator directly at the stream's ht node. */
669 iter
.iter
.node
= &stream
->node
.node
;
670 ret
= lttng_ht_del(relay_streams_ht
, &iter
);
672 stream
->in_stream_ht
= false;
674 if (stream
->published
) {
675 pthread_mutex_lock(&stream
->trace
->stream_list_lock
);
676 cds_list_del_rcu(&stream
->stream_node
);
677 pthread_mutex_unlock(&stream
->trace
->stream_list_lock
);
678 stream
->published
= false;
/*
 * stream_destroy: free what the stream still owns once no RCU reader can
 * reach it. It runs from stream_destroy_rcu(), the call_rcu() callback
 * queued by stream_release(). It destroys the indexes hash table and the
 * tracefile array and frees path_name and channel_name.
 */
682 static void stream_destroy(struct relay_stream
*stream
)
684 if (stream
->indexes_ht
) {
686 * Calling lttng_ht_destroy in call_rcu worker thread so
687 * we don't hold the RCU read-side lock while calling
690 lttng_ht_destroy(stream
->indexes_ht
);
693 tracefile_array_destroy(stream
->tfa
);
695 free(stream
->path_name
);
696 free(stream
->channel_name
);
700 static void stream_destroy_rcu(struct rcu_head
*rcu_head
)
702 struct relay_stream
*stream
=
703 caa_container_of(rcu_head
, struct relay_stream
, rcu_node
);
705 stream_destroy(stream
);
709 * No need to take stream->lock since this is only called on the final
710 * stream_put which ensures that a single thread may act on the stream.
712 static void stream_release(struct urcu_ref
*ref
)
714 struct relay_stream
*stream
=
715 caa_container_of(ref
, struct relay_stream
, ref
);
716 struct relay_session
*session
;
718 session
= stream
->trace
->session
;
720 DBG("Releasing stream id %" PRIu64
, stream
->stream_handle
);
722 pthread_mutex_lock(&session
->recv_list_lock
);
723 session
->stream_count
--;
724 if (stream
->in_recv_list
) {
725 cds_list_del_rcu(&stream
->recv_node
);
726 stream
->in_recv_list
= false;
728 pthread_mutex_unlock(&session
->recv_list_lock
);
730 stream_unpublish(stream
);
732 if (stream
->stream_fd
) {
733 stream_fd_put(stream
->stream_fd
);
734 stream
->stream_fd
= NULL
;
736 if (stream
->index_file
) {
737 lttng_index_file_put(stream
->index_file
);
738 stream
->index_file
= NULL
;
741 ctf_trace_put(stream
->trace
);
742 stream
->trace
= NULL
;
744 stream_complete_rotation(stream
);
745 lttng_trace_chunk_put(stream
->trace_chunk
);
746 stream
->trace_chunk
= NULL
;
748 call_rcu(&stream
->rcu_node
, stream_destroy_rcu
);
/*
 * stream_put: release one reference on the stream (taken by stream_get(),
 * stream_get_by_id() or the initial urcu_ref_init() in stream_create()).
 * When the last reference goes away, urcu_ref_put() invokes
 * stream_release(). The assert catches a put on an already-zero refcount.
 */
751 void stream_put(struct relay_stream
*stream
)
754 assert(stream
->ref
.refcount
!= 0);
756 * Wait until we have processed all the stream packets before
757 * actually putting our last stream reference.
759 urcu_ref_put(&stream
->ref
, stream_release
);
/*
 * stream_set_pending_rotation: record that the stream must switch to
 * next_trace_chunk starting at sequence number rotation_sequence_number.
 *
 * Setting a rotation while one is already pending is reported as a
 * protocol error. When next_trace_chunk is non-NULL a reference is taken
 * on it. That reference lives in ongoing_rotation until
 * stream_complete_rotation() hands it over to stream->trace_chunk.
 * Metadata streams have no index: their index side is marked rotated and
 * the data file is rotated immediately. Data streams attempt both data and
 * index rotation right away, in case the rotation point was already
 * reached.
 */
763 int stream_set_pending_rotation(struct relay_stream
*stream
,
764 struct lttng_trace_chunk
*next_trace_chunk
,
765 uint64_t rotation_sequence_number
)
768 const struct relay_stream_rotation rotation
= {
769 .seq_num
= rotation_sequence_number
,
770 .next_trace_chunk
= next_trace_chunk
,
773 if (stream
->ongoing_rotation
.is_set
) {
774 ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
779 if (next_trace_chunk
) {
780 const bool reference_acquired
=
781 lttng_trace_chunk_get(next_trace_chunk
);
783 assert(reference_acquired
);
785 LTTNG_OPTIONAL_SET(&stream
->ongoing_rotation
, rotation
);
787 DBG("Setting pending rotation: stream_id = %" PRIu64
", rotation_seq_num = %" PRIu64
,
788 stream
->stream_handle
, rotation_sequence_number
);
789 if (stream
->is_metadata
) {
791 * A metadata stream has no index; consider it already rotated.
793 stream
->ongoing_rotation
.value
.index_rotated
= true;
794 ret
= stream_rotate_data_file(stream
);
796 ret
= try_rotate_stream_data(stream
);
801 ret
= try_rotate_stream_index(stream
);
/*
 * try_stream_close: close the stream if every expected packet has arrived;
 * otherwise only record that a close was requested (close_requested).
 *
 * If last_net_seq_num is still unknown (-1ULL: connection closed without an
 * explicit stream close), partially received indexes are dropped and the
 * highest pending index sequence number becomes the end-of-stream marker.
 * The stream then stays open while prev_data_seq is behind last_net_seq_num,
 * unless the session was aborted. The session lock is only held to sample
 * session->aborted and is released before stream->lock is taken.
 */
810 void try_stream_close(struct relay_stream
*stream
)
812 bool session_aborted
;
813 struct relay_session
*session
= stream
->trace
->session
;
815 DBG("Trying to close stream %" PRIu64
, stream
->stream_handle
);
817 pthread_mutex_lock(&session
->lock
);
818 session_aborted
= session
->aborted
;
819 pthread_mutex_unlock(&session
->lock
);
821 pthread_mutex_lock(&stream
->lock
);
823 * Can be called concurently by connection close and reception of last
826 if (stream
->closed
) {
827 pthread_mutex_unlock(&stream
->lock
);
828 DBG("closing stream %" PRIu64
" aborted since it is already marked as closed", stream
->stream_handle
);
832 stream
->close_requested
= true;
834 if (stream
->last_net_seq_num
== -1ULL) {
836 * Handle connection close without explicit stream close
839 * We can be clever about indexes partially received in
840 * cases where we received the data socket part, but not
841 * the control socket part: since we're currently closing
842 * the stream on behalf of the control socket, we *know*
843 * there won't be any more control information for this
844 * socket. Therefore, we can destroy all indexes for
845 * which we have received only the file descriptor (from
846 * data socket). This takes care of consumerd crashes
847 * between sending the data and control information for
848 * a packet. Since those are sent in that order, we take
849 * care of consumerd crashes.
851 DBG("relay_index_close_partial_fd");
852 relay_index_close_partial_fd(stream
);
854 * Use the highest net_seq_num we currently have pending
855 * As end of stream indicator. Leave last_net_seq_num
856 * at -1ULL if we cannot find any index.
858 stream
->last_net_seq_num
= relay_index_find_last(stream
);
859 DBG("Updating stream->last_net_seq_num to %" PRIu64
, stream
->last_net_seq_num
);
860 /* Fall-through into the next check. */
/*
 * The unsigned difference is reinterpreted as signed so that "behind"
 * (negative) is detected the same way as in serial-number arithmetic.
 */
863 if (stream
->last_net_seq_num
!= -1ULL &&
864 ((int64_t) (stream
->prev_data_seq
- stream
->last_net_seq_num
)) < 0
865 && !session_aborted
) {
867 * Don't close since we still have data pending. This
868 * handles cases where an explicit close command has
869 * been received for this stream, and cases where the
870 * connection has been closed, and we are awaiting for
871 * index information from the data socket. It is
872 * therefore expected that all the index fd information
873 * we need has already been received on the control
874 * socket. Matching index information from data socket
875 * should be Expected Soon(TM).
877 * TODO: We should implement a timer to garbage collect
878 * streams after a timeout to be resilient against a
879 * consumerd implementation that would not match this
882 pthread_mutex_unlock(&stream
->lock
);
883 DBG("closing stream %" PRIu64
" aborted since it still has data pending", stream
->stream_handle
);
887 * We received all the indexes we can expect.
889 stream_unpublish(stream
);
890 stream
->closed
= true;
891 /* Relay indexes are only used by the "consumer/sessiond" end. */
892 relay_index_close_all(stream
);
893 pthread_mutex_unlock(&stream
->lock
);
894 DBG("Succeeded in closing stream %" PRIu64
, stream
->stream_handle
);
/*
 * stream_init_packet: prepare the stream for a packet of packet_size bytes.
 *
 * When a tracefile size limit is configured (tracefile_size != 0) and the
 * packet would overflow the current file, the stream moves to the next file
 * index modulo tracefile_count, setting tracefile_wrapped_around when the
 * index wraps. It then rotates the tracefile array and opens the new data
 * file in the current trace chunk. *file_rotated reports whether a file
 * switch happened. Requires stream->lock held.
 * NOTE(review): new_file_index uses "% stream->tracefile_count"; if
 * tracefile_count can be 0 while tracefile_size != 0 this divides by zero —
 * confirm callers guarantee tracefile_count > 0.
 * NOTE(review): packet_size is a size_t but is printed with PRIu64 below,
 * which only matches where size_t is 64 bits wide.
 */
898 int stream_init_packet(struct relay_stream
*stream
, size_t packet_size
,
903 ASSERT_LOCKED(stream
->lock
);
904 if (caa_likely(stream
->tracefile_size
== 0)) {
905 /* No size limit set; nothing to check. */
910 * Check if writing the new packet would exceed the maximal file size.
912 if (caa_unlikely((stream
->tracefile_size_current
+ packet_size
) >
913 stream
->tracefile_size
)) {
914 const uint64_t new_file_index
=
915 (stream
->tracefile_current_index
+ 1) %
916 stream
->tracefile_count
;
/* A smaller index means the on-disk ring buffer wrapped around. */
918 if (new_file_index
< stream
->tracefile_current_index
) {
919 stream
->tracefile_wrapped_around
= true;
921 DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
922 ", current_file_size = %" PRIu64
923 ", packet_size = %" PRIu64
", current_file_index = %" PRIu64
924 " new_file_index = %" PRIu64
,
925 stream
->stream_handle
,
926 stream
->tracefile_size_current
, packet_size
,
927 stream
->tracefile_current_index
, new_file_index
);
928 tracefile_array_file_rotate(stream
->tfa
);
929 stream
->tracefile_current_index
= new_file_index
;
931 if (stream
->stream_fd
) {
932 stream_fd_put(stream
->stream_fd
);
933 stream
->stream_fd
= NULL
;
935 ret
= stream_create_data_output_file_from_trace_chunk(stream
,
936 stream
->trace_chunk
, false, &stream
->stream_fd
);
938 ERR("Failed to perform trace file rotation of stream %" PRIu64
,
939 stream
->stream_handle
);
944 * Reset current size because we just performed a stream
947 stream
->tracefile_size_current
= 0;
948 *file_rotated
= true;
950 *file_rotated
= false;
/*
 * stream_write: append a packet's payload, then padding_len zero bytes, to
 * the stream's current data file.
 *
 * Padding comes from a stack buffer of FILE_IO_STACK_BUFFER_SIZE bytes,
 * written in as many passes as needed; only the part of the buffer that
 * will be used is zeroed. For metadata streams, metadata_received grows by
 * payload + padding. Requires stream->lock held.
 * NOTE(review): the DBG at the end treats `packet` as possibly NULL, while
 * the payload write and the metadata_received update dereference it;
 * confirm those accesses are guarded whenever packet can be NULL.
 * NOTE(review): padding_len is a size_t but is printed with PRIu64.
 */
956 /* Note that the packet is not necessarily complete. */
957 int stream_write(struct relay_stream
*stream
,
958 const struct lttng_buffer_view
*packet
, size_t padding_len
)
962 size_t padding_to_write
= padding_len
;
963 char padding_buffer
[FILE_IO_STACK_BUFFER_SIZE
];
965 ASSERT_LOCKED(stream
->lock
);
/* Only the bytes that will actually be written as padding are cleared. */
966 memset(padding_buffer
, 0,
967 min(sizeof(padding_buffer
), padding_to_write
));
970 write_ret
= lttng_write(stream
->stream_fd
->fd
,
971 packet
->data
, packet
->size
);
972 if (write_ret
!= packet
->size
) {
973 PERROR("Failed to write to stream file of %sstream %" PRIu64
,
974 stream
->is_metadata
? "metadata " : "",
975 stream
->stream_handle
);
981 while (padding_to_write
> 0) {
982 const size_t padding_to_write_this_pass
=
983 min(padding_to_write
, sizeof(padding_buffer
));
985 write_ret
= lttng_write(stream
->stream_fd
->fd
,
986 padding_buffer
, padding_to_write_this_pass
);
987 if (write_ret
!= padding_to_write_this_pass
) {
988 PERROR("Failed to write padding to file of %sstream %" PRIu64
,
989 stream
->is_metadata
? "metadata " : "",
990 stream
->stream_handle
);
994 padding_to_write
-= padding_to_write_this_pass
;
997 if (stream
->is_metadata
) {
998 stream
->metadata_received
+= packet
->size
+ padding_len
;
1001 DBG("Wrote to %sstream %" PRIu64
": data_length = %" PRIu64
", padding_length = %" PRIu64
,
1002 stream
->is_metadata
? "metadata " : "",
1003 stream
->stream_handle
,
1004 packet
? packet
->size
: 0, padding_len
);
/*
 * stream_update_index: data-side index update for packet net_seq_num.
 *
 * Records the packet's offset in the current data file
 * (tracefile_size_current, stored big-endian through htobe64()) in the
 * index entry for net_seq_num, which is created if needed. The index file
 * is (re)created in the current trace chunk when rotate_index is set or
 * when no index file exists yet. The index is then flushed if possible:
 * - ret == 0: the tracefile array sequence is committed and
 *   index_received_seqcount advances.
 * - ret > 0: the index is not flushed yet, and total_size is stored on it.
 *   stream_add_index() later adds index->total_size to
 *   pos_after_last_complete_data_index when it flushes the entry.
 */
1010 * Update index after receiving a packet for a data stream.
1012 * Called with the stream lock held.
1014 * Return 0 on success else a negative value.
1016 int stream_update_index(struct relay_stream
*stream
, uint64_t net_seq_num
,
1017 bool rotate_index
, bool *flushed
, uint64_t total_size
)
1020 uint64_t data_offset
;
1021 struct relay_index
*index
;
1023 ASSERT_LOCKED(stream
->lock
);
1024 /* Get data offset because we are about to update the index. */
1025 data_offset
= htobe64(stream
->tracefile_size_current
);
1027 DBG("handle_index_data: stream %" PRIu64
" net_seq_num %" PRIu64
" data offset %" PRIu64
,
1028 stream
->stream_handle
, net_seq_num
, stream
->tracefile_size_current
);
1031 * Lookup for an existing index for that stream id/sequence
1032 * number. If it exists, the control thread has already received the
1033 * data for it, thus we need to write it to disk.
1035 index
= relay_index_get_by_id_or_create(stream
, net_seq_num
);
1041 if (rotate_index
|| !stream
->index_file
) {
1042 ret
= create_index_file(stream
, stream
->trace_chunk
);
1044 ERR("Failed to create index file for stream %" PRIu64
,
1045 stream
->stream_handle
);
1046 /* Put self-ref for this index due to error. */
1047 relay_index_put(index
);
1053 if (relay_index_set_file(index
, stream
->index_file
, data_offset
)) {
1055 /* Put self-ref for this index due to error. */
1056 relay_index_put(index
);
1061 ret
= relay_index_try_flush(index
);
1063 tracefile_array_commit_seq(stream
->tfa
);
1064 stream
->index_received_seqcount
++;
1066 } else if (ret
> 0) {
1067 index
->total_size
= total_size
;
1074 * relay_index_try_flush is responsible for the self-reference
1075 * put of the index object on error.
1077 ERR("relay_index_try_flush error %d", ret
);
/*
 * stream_complete_packet: account for a fully received packet.
 *
 * Advances tracefile_size_current by packet_total_size. If the packet's
 * index was already flushed, the end of this packet becomes the last
 * position covered by a complete index (what rotate_truncate_stream()
 * rewinds to), prev_index_seq is updated and the index side may rotate.
 * prev_data_seq is then updated and the data side may rotate.
 * Requires stream->lock held.
 */
1084 int stream_complete_packet(struct relay_stream
*stream
, size_t packet_total_size
,
1085 uint64_t sequence_number
, bool index_flushed
)
1089 ASSERT_LOCKED(stream
->lock
);
1091 stream
->tracefile_size_current
+= packet_total_size
;
1092 if (index_flushed
) {
1093 stream
->pos_after_last_complete_data_index
=
1094 stream
->tracefile_size_current
;
1095 stream
->prev_index_seq
= sequence_number
;
1096 ret
= try_rotate_stream_index(stream
);
1102 stream
->prev_data_seq
= sequence_number
;
1103 ret
= try_rotate_stream_data(stream
);
/*
 * stream_add_index: control-side index update from index_info.
 *
 * A packet_size of 0 is a live beacon. beacon_ts_end is set from
 * index_info->timestamp_end only when the stream already received data and
 * has no index in flight. For a real index, beacon_ts_end is reset to
 * -1ULL (after the beacon branch), ctf_stream_id is latched on first use
 * and the index's control data is filled in. If the index can then be
 * flushed:
 * - the tracefile array is committed;
 * - the complete-index position advances by the data-side total_size;
 * - prev_index_seq is updated;
 * - the index side may rotate.
 * Requires stream->lock held.
 * NOTE(review): index->total_size is read after relay_index_try_flush();
 * confirm a successful flush does not drop the last reference on index.
 */
1111 int stream_add_index(struct relay_stream
*stream
,
1112 const struct lttcomm_relayd_index
*index_info
)
1115 struct relay_index
*index
;
1117 ASSERT_LOCKED(stream
->lock
);
1119 /* Live beacon handling */
1120 if (index_info
->packet_size
== 0) {
1121 DBG("Received live beacon for stream %" PRIu64
,
1122 stream
->stream_handle
);
1125 * Only flag a stream inactive when it has already
1126 * received data and no indexes are in flight.
1128 if (stream
->index_received_seqcount
> 0
1129 && stream
->indexes_in_flight
== 0) {
1130 stream
->beacon_ts_end
= index_info
->timestamp_end
;
1135 stream
->beacon_ts_end
= -1ULL;
/* The CTF stream id is learned from the first index received. */
1138 if (stream
->ctf_stream_id
== -1ULL) {
1139 stream
->ctf_stream_id
= index_info
->stream_id
;
1142 index
= relay_index_get_by_id_or_create(stream
, index_info
->net_seq_num
);
1145 ERR("Failed to get or create index %" PRIu64
,
1146 index_info
->net_seq_num
);
1149 if (relay_index_set_control_data(index
, index_info
,
1150 stream
->trace
->session
->minor
)) {
1151 ERR("set_index_control_data error");
1152 relay_index_put(index
);
1156 ret
= relay_index_try_flush(index
);
1158 tracefile_array_commit_seq(stream
->tfa
);
1159 stream
->index_received_seqcount
++;
/* total_size was stored by stream_update_index() on the data side. */
1160 stream
->pos_after_last_complete_data_index
+= index
->total_size
;
1161 stream
->prev_index_seq
= index_info
->net_seq_num
;
1163 ret
= try_rotate_stream_index(stream
);
1167 } else if (ret
> 0) {
1174 * relay_index_try_flush is responsible for the self-reference
1175 * put of the index object on error.
1177 ERR("relay_index_try_flush error %d", ret
);
/*
 * print_stream_indexes: debug helper logging each entry of the stream's
 * indexes_ht together with its stream handle, trace id and session id.
 * NOTE(review): the "refcount" value printed is stream->ref.refcount (the
 * stream's refcount), not the index's own refcount.
 */
1184 static void print_stream_indexes(struct relay_stream
*stream
)
1186 struct lttng_ht_iter iter
;
1187 struct relay_index
*index
;
1190 cds_lfht_for_each_entry(stream
->indexes_ht
->ht
, &iter
.iter
, index
,
1192 DBG("index %p net_seq_num %" PRIu64
" refcount %ld"
1193 " stream %" PRIu64
" trace %" PRIu64
1194 " session %" PRIu64
,
1197 stream
->ref
.refcount
,
1198 index
->stream
->stream_handle
,
1199 index
->stream
->trace
->id
,
1200 index
->stream
->trace
->session
->id
);
1205 int stream_reset_file(struct relay_stream
*stream
)
1207 ASSERT_LOCKED(stream
->lock
);
1209 if (stream
->stream_fd
) {
1210 stream_fd_put(stream
->stream_fd
);
1211 stream
->stream_fd
= NULL
;
1214 stream
->tracefile_size_current
= 0;
1215 stream
->prev_data_seq
= 0;
1216 stream
->prev_index_seq
= 0;
1217 /* Note that this does not reset the tracefile array. */
1218 stream
->tracefile_current_index
= 0;
1219 stream
->pos_after_last_complete_data_index
= 0;
1221 return stream_create_data_output_file_from_trace_chunk(stream
,
1222 stream
->trace_chunk
, true, &stream
->stream_fd
);
1225 void print_relay_streams(void)
1227 struct lttng_ht_iter iter
;
1228 struct relay_stream
*stream
;
1230 if (!relay_streams_ht
) {
1235 cds_lfht_for_each_entry(relay_streams_ht
->ht
, &iter
.iter
, stream
,
1237 if (!stream_get(stream
)) {
1240 DBG("stream %p refcount %ld stream %" PRIu64
" trace %" PRIu64
1241 " session %" PRIu64
,
1243 stream
->ref
.refcount
,
1244 stream
->stream_handle
,
1246 stream
->trace
->session
->id
);
1247 print_stream_indexes(stream
);