/*
 * SPDX-License-Identifier: MIT
 *
 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
 */
/*
 * Logging configuration. These macros expand to members of a local
 * `fs_sink` variable and are defined ahead of the logging header.
 */
#define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
#define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
#define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
#include "logging/comp-logging.h"

#include <babeltrace2/babeltrace.h>
#include <glib.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include "common/assert.h"
#include "ctfser/ctfser.h"
#include "plugins/common/param-validation/param-validation.h"

#include "fs-sink.hpp"
#include "fs-sink-trace.hpp"
#include "fs-sink-stream.hpp"
#include "fs-sink-ctf-meta.hpp"
#include "translate-trace-ir-to-ctf-ir.hpp"
#include "translate-ctf-ir-to-tsdl.hpp"
/* Name of the input port added by ctf_fs_sink_init(). */
static
const char * const in_port_name = "in";
31 bt_component_class_initialize_method_status
ensure_output_dir_exists(
32 struct fs_sink_comp
*fs_sink
)
34 bt_component_class_initialize_method_status status
=
35 BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
38 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
40 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(fs_sink
->self_comp
,
41 "Cannot create directories for output directory",
42 ": output-dir-path=\"%s\"",
43 fs_sink
->output_dir_path
->str
);
44 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
52 static bt_param_validation_map_value_entry_descr fs_sink_params_descr
[] = {
53 { "path", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_MANDATORY
,
54 { bt_param_validation_value_descr::string_t
} },
55 { "assume-single-trace", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
56 { bt_param_validation_value_descr::bool_t
} },
57 { "ignore-discarded-events", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
58 { bt_param_validation_value_descr::bool_t
} },
59 { "ignore-discarded-packets", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
60 { bt_param_validation_value_descr::bool_t
} },
61 { "quiet", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL
,
62 { bt_param_validation_value_descr::bool_t
} },
63 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
67 bt_component_class_initialize_method_status
68 configure_component(struct fs_sink_comp
*fs_sink
, const bt_value
*params
)
70 bt_component_class_initialize_method_status status
;
71 const bt_value
*value
;
72 enum bt_param_validation_status validation_status
;
73 gchar
*validation_error
;
75 validation_status
= bt_param_validation_validate(params
,
76 fs_sink_params_descr
, &validation_error
);
77 if (validation_status
== BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR
) {
78 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
79 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
80 "%s", validation_error
);
82 } else if (validation_status
== BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR
) {
83 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
87 value
= bt_value_map_borrow_entry_value_const(params
, "path");
88 g_string_assign(fs_sink
->output_dir_path
,
89 bt_value_string_get(value
));
91 value
= bt_value_map_borrow_entry_value_const(params
,
92 "assume-single-trace");
94 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
97 value
= bt_value_map_borrow_entry_value_const(params
,
98 "ignore-discarded-events");
100 fs_sink
->ignore_discarded_events
=
101 (bool) bt_value_bool_get(value
);
104 value
= bt_value_map_borrow_entry_value_const(params
,
105 "ignore-discarded-packets");
107 fs_sink
->ignore_discarded_packets
=
108 (bool) bt_value_bool_get(value
);
111 value
= bt_value_map_borrow_entry_value_const(params
,
114 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
117 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
;
120 g_free(validation_error
);
125 void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
131 if (fs_sink
->output_dir_path
) {
132 g_string_free(fs_sink
->output_dir_path
, TRUE
);
133 fs_sink
->output_dir_path
= NULL
;
136 if (fs_sink
->traces
) {
137 g_hash_table_destroy(fs_sink
->traces
);
138 fs_sink
->traces
= NULL
;
141 BT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
142 fs_sink
->upstream_iter
);
150 bt_component_class_initialize_method_status
ctf_fs_sink_init(
151 bt_self_component_sink
*self_comp_sink
,
152 bt_self_component_sink_configuration
*config
,
153 const bt_value
*params
,
154 void *init_method_data
)
156 bt_component_class_initialize_method_status status
;
157 bt_self_component_add_port_status add_port_status
;
158 struct fs_sink_comp
*fs_sink
= NULL
;
159 bt_self_component
*self_comp
=
160 bt_self_component_sink_as_self_component(self_comp_sink
);
161 bt_logging_level log_level
= bt_component_get_logging_level(
162 bt_self_component_as_component(self_comp
));
164 fs_sink
= g_new0(struct fs_sink_comp
, 1);
166 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR
, log_level
, self_comp
,
167 "Failed to allocate one CTF FS sink structure.");
168 BT_CURRENT_THREAD_ERROR_APPEND_CAUSE_FROM_COMPONENT(
169 self_comp
, "Failed to allocate one CTF FS sink structure.");
170 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
174 fs_sink
->log_level
= log_level
;
175 fs_sink
->self_comp
= self_comp
;
176 fs_sink
->output_dir_path
= g_string_new(NULL
);
177 status
= configure_component(fs_sink
, params
);
178 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
179 /* configure_component() logs errors */
183 if (fs_sink
->assume_single_trace
&&
184 g_file_test(fs_sink
->output_dir_path
->str
,
185 G_FILE_TEST_EXISTS
)) {
186 BT_COMP_LOGE_APPEND_CAUSE(self_comp
,
187 "Single trace mode, but output path exists: output-path=\"%s\"",
188 fs_sink
->output_dir_path
->str
);
189 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR
;
193 status
= ensure_output_dir_exists(fs_sink
);
194 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
195 /* ensure_output_dir_exists() logs errors */
199 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
,
200 NULL
, (GDestroyNotify
) fs_sink_trace_destroy
);
201 if (!fs_sink
->traces
) {
202 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to allocate one GHashTable.");
203 status
= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR
;
207 add_port_status
= bt_self_component_sink_add_input_port(
208 self_comp_sink
, in_port_name
, NULL
, NULL
);
209 if (add_port_status
!= BT_SELF_COMPONENT_ADD_PORT_STATUS_OK
) {
210 status
= (bt_component_class_initialize_method_status
) add_port_status
;
211 BT_COMP_LOGE_APPEND_CAUSE(self_comp
, "Failed to add input port.");
215 bt_self_component_set_data(self_comp
, fs_sink
);
218 if (status
!= BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK
) {
219 destroy_fs_sink_comp(fs_sink
);
226 struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
227 const bt_stream
*ir_stream
)
229 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
230 struct fs_sink_trace
*trace
;
231 struct fs_sink_stream
*stream
= NULL
;
233 trace
= (fs_sink_trace
*) g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
234 if (G_UNLIKELY(!trace
)) {
235 if (fs_sink
->assume_single_trace
&&
236 g_hash_table_size(fs_sink
->traces
) > 0) {
237 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
238 "Single trace mode, but getting more than one trace: "
239 "stream-name=\"%s\"",
240 bt_stream_get_name(ir_stream
));
244 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
250 stream
= (fs_sink_stream
*) g_hash_table_lookup(trace
->streams
, ir_stream
);
251 if (G_UNLIKELY(!stream
)) {
252 stream
= fs_sink_stream_create(trace
, ir_stream
);
263 bt_component_class_sink_consume_method_status
handle_event_msg(
264 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
267 bt_component_class_sink_consume_method_status status
=
268 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
269 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
270 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
271 struct fs_sink_stream
*stream
;
272 struct fs_sink_ctf_event_class
*ec
= NULL
;
273 const bt_clock_snapshot
*cs
= NULL
;
275 stream
= borrow_stream(fs_sink
, ir_stream
);
276 if (G_UNLIKELY(!stream
)) {
277 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
278 "Failed to borrow stream.");
279 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
283 ret
= try_translate_event_class_trace_ir_to_ctf_ir(fs_sink
,
284 stream
->sc
, bt_event_borrow_class_const(ir_event
), &ec
);
286 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
287 "Failed to translate event class to CTF IR.");
288 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
294 if (stream
->sc
->default_clock_class
) {
295 cs
= bt_message_event_borrow_default_clock_snapshot_const(
300 * If this event's stream does not support packets, then we
301 * lazily create artificial packets.
303 * The size of an artificial packet is arbitrarily at least
304 * 4 MiB (it usually is greater because we close it when
305 * comes the time to write a new event and the packet's content
306 * size is >= 4 MiB), except the last one which can be smaller.
308 if (G_UNLIKELY(!stream
->sc
->has_packets
)) {
309 if (stream
->packet_state
.is_open
&&
310 bt_ctfser_get_offset_in_current_packet_bits(&stream
->ctfser
) / 8 >=
313 * Stream's current packet is larger than 4 MiB:
314 * close it. A new packet will be opened just
317 ret
= fs_sink_stream_close_packet(stream
, NULL
);
319 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
320 "Failed to close packet.");
321 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
326 if (!stream
->packet_state
.is_open
) {
327 /* Stream's packet is not currently opened: open it */
328 ret
= fs_sink_stream_open_packet(stream
, NULL
, NULL
);
330 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
331 "Failed to open packet.");
332 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
338 BT_ASSERT_DBG(stream
->packet_state
.is_open
);
339 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
340 if (G_UNLIKELY(ret
)) {
341 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
342 "Failed to write event.");
343 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
352 bt_component_class_sink_consume_method_status
handle_packet_beginning_msg(
353 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
356 bt_component_class_sink_consume_method_status status
=
357 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
358 const bt_packet
*ir_packet
=
359 bt_message_packet_beginning_borrow_packet_const(msg
);
360 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
361 struct fs_sink_stream
*stream
;
362 const bt_clock_snapshot
*cs
= NULL
;
364 stream
= borrow_stream(fs_sink
, ir_stream
);
365 if (G_UNLIKELY(!stream
)) {
366 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
367 "Failed to borrow stream.");
368 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
372 if (stream
->sc
->packets_have_ts_begin
) {
373 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(
379 * If we previously received a discarded events message with
380 * a time range, make sure that its beginning time matches what's
381 * expected for CTF 1.8, that is:
383 * * Its beginning time is the previous packet's end
384 * time (or the current packet's beginning time if
385 * this is the first packet).
387 * We check this here instead of in handle_packet_end_msg()
388 * because we want to catch any incompatible message as early as
389 * possible to report the error.
391 * Validation of the discarded events message's end time is
392 * performed in handle_packet_end_msg().
394 if (stream
->discarded_events_state
.in_range
) {
395 uint64_t expected_cs
;
398 * `stream->discarded_events_state.in_range` is only set
399 * when the stream class's discarded events have a time
402 * It is required that the packet beginning and end
403 * messages for this stream class have times when
404 * discarded events have a time range.
406 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
407 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
408 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
410 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
411 /* We're opening the first packet */
412 expected_cs
= bt_clock_snapshot_get_value(cs
);
414 expected_cs
= stream
->prev_packet_state
.end_cs
;
417 if (stream
->discarded_events_state
.beginning_cs
!=
419 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
420 "Incompatible discarded events message: "
421 "unexpected beginning time: "
422 "beginning-cs-val=%" PRIu64
", "
423 "expected-beginning-cs-val=%" PRIu64
", "
424 "stream-id=%" PRIu64
", stream-name=\"%s\", "
425 "trace-name=\"%s\", path=\"%s/%s\"",
426 stream
->discarded_events_state
.beginning_cs
,
428 bt_stream_get_id(ir_stream
),
429 bt_stream_get_name(ir_stream
),
431 bt_stream_borrow_trace_const(ir_stream
)),
432 stream
->trace
->path
->str
, stream
->file_name
->str
);
433 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
439 * If we previously received a discarded packets message with a
440 * time range, make sure that its beginning and end times match
441 * what's expected for CTF 1.8, that is:
443 * * Its beginning time is the previous packet's end time.
445 * * Its end time is the current packet's beginning time.
447 if (stream
->discarded_packets_state
.in_range
) {
448 uint64_t expected_end_cs
;
451 * `stream->discarded_packets_state.in_range` is only
452 * set when the stream class's discarded packets have a
455 * It is required that the packet beginning and end
456 * messages for this stream class have times when
457 * discarded packets have a time range.
459 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
460 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
461 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
464 * It is not supported to have a discarded packets
465 * message _before_ the first packet: we cannot validate
466 * that its beginning time is compatible with CTF 1.8 in
469 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
470 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
471 "Incompatible discarded packets message "
472 "occurring before the stream's first packet: "
473 "stream-id=%" PRIu64
", stream-name=\"%s\", "
474 "trace-name=\"%s\", path=\"%s/%s\"",
475 bt_stream_get_id(ir_stream
),
476 bt_stream_get_name(ir_stream
),
478 bt_stream_borrow_trace_const(ir_stream
)),
479 stream
->trace
->path
->str
, stream
->file_name
->str
);
480 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
484 if (stream
->discarded_packets_state
.beginning_cs
!=
485 stream
->prev_packet_state
.end_cs
) {
486 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
487 "Incompatible discarded packets message: "
488 "unexpected beginning time: "
489 "beginning-cs-val=%" PRIu64
", "
490 "expected-beginning-cs-val=%" PRIu64
", "
491 "stream-id=%" PRIu64
", stream-name=\"%s\", "
492 "trace-name=\"%s\", path=\"%s/%s\"",
493 stream
->discarded_packets_state
.beginning_cs
,
494 stream
->prev_packet_state
.end_cs
,
495 bt_stream_get_id(ir_stream
),
496 bt_stream_get_name(ir_stream
),
498 bt_stream_borrow_trace_const(ir_stream
)),
499 stream
->trace
->path
->str
, stream
->file_name
->str
);
500 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
504 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
506 if (stream
->discarded_packets_state
.end_cs
!=
508 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
509 "Incompatible discarded packets message: "
510 "unexpected end time: "
511 "end-cs-val=%" PRIu64
", "
512 "expected-end-cs-val=%" PRIu64
", "
513 "stream-id=%" PRIu64
", stream-name=\"%s\", "
514 "trace-name=\"%s\", path=\"%s/%s\"",
515 stream
->discarded_packets_state
.end_cs
,
517 bt_stream_get_id(ir_stream
),
518 bt_stream_get_name(ir_stream
),
520 bt_stream_borrow_trace_const(ir_stream
)),
521 stream
->trace
->path
->str
, stream
->file_name
->str
);
522 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
528 * We're not in a discarded packets time range anymore since we
529 * require that the discarded packets time ranges go from one
530 * packet's end time to the next packet's beginning time, and
531 * we're handling a packet beginning message here.
533 stream
->discarded_packets_state
.in_range
= false;
535 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
537 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
538 "Failed to open packet.");
539 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
548 bt_component_class_sink_consume_method_status
handle_packet_end_msg(
549 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
552 bt_component_class_sink_consume_method_status status
=
553 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
554 const bt_packet
*ir_packet
=
555 bt_message_packet_end_borrow_packet_const(msg
);
556 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
557 struct fs_sink_stream
*stream
;
558 const bt_clock_snapshot
*cs
= NULL
;
560 stream
= borrow_stream(fs_sink
, ir_stream
);
561 if (G_UNLIKELY(!stream
)) {
562 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
563 "Failed to borrow stream.");
564 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
568 if (stream
->sc
->packets_have_ts_end
) {
569 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(
575 * If we previously received a discarded events message with
576 * a time range, make sure that its end time matches what's
577 * expected for CTF 1.8, that is:
579 * * Its end time is the current packet's end time.
581 * Validation of the discarded events message's beginning time
582 * is performed in handle_packet_beginning_msg().
584 if (stream
->discarded_events_state
.in_range
) {
585 uint64_t expected_cs
;
588 * `stream->discarded_events_state.in_range` is only set
589 * when the stream class's discarded events have a time
592 * It is required that the packet beginning and end
593 * messages for this stream class have times when
594 * discarded events have a time range.
596 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
597 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
598 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
600 expected_cs
= bt_clock_snapshot_get_value(cs
);
602 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
603 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
604 "Incompatible discarded events message: "
605 "unexpected end time: "
606 "end-cs-val=%" PRIu64
", "
607 "expected-end-cs-val=%" PRIu64
", "
608 "stream-id=%" PRIu64
", stream-name=\"%s\", "
609 "trace-name=\"%s\", path=\"%s/%s\"",
610 stream
->discarded_events_state
.end_cs
,
612 bt_stream_get_id(ir_stream
),
613 bt_stream_get_name(ir_stream
),
615 bt_stream_borrow_trace_const(ir_stream
)),
616 stream
->trace
->path
->str
, stream
->file_name
->str
);
617 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
622 ret
= fs_sink_stream_close_packet(stream
, cs
);
624 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
625 "Failed to close packet.");
626 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
631 * We're not in a discarded events time range anymore since we
632 * require that the discarded events time ranges go from one
633 * packet's end time to the next packet's end time, and we're
634 * handling a packet end message here.
636 stream
->discarded_events_state
.in_range
= false;
643 bt_component_class_sink_consume_method_status
handle_stream_beginning_msg(
644 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
646 bt_component_class_sink_consume_method_status status
=
647 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
648 const bt_stream
*ir_stream
=
649 bt_message_stream_beginning_borrow_stream_const(msg
);
650 const bt_stream_class
*ir_sc
=
651 bt_stream_borrow_class_const(ir_stream
);
652 struct fs_sink_stream
*stream
;
653 bool packets_have_beginning_end_cs
=
654 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
655 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
658 * Not supported: discarded events or discarded packets support
659 * without packets support. Packets are the way to know where
660 * discarded events/packets occurred in CTF 1.8.
662 if (!bt_stream_class_supports_packets(ir_sc
)) {
663 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc
));
665 if (!fs_sink
->ignore_discarded_events
&&
666 bt_stream_class_supports_discarded_events(ir_sc
)) {
667 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
668 "Unsupported stream: "
669 "stream does not support packets, "
670 "but supports discarded events: "
672 "stream-id=%" PRIu64
", "
673 "stream-name=\"%s\"",
674 ir_stream
, bt_stream_get_id(ir_stream
),
675 bt_stream_get_name(ir_stream
));
676 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
682 * Not supported: discarded events with default clock snapshots,
683 * but packet beginning/end without default clock snapshot.
685 if (!fs_sink
->ignore_discarded_events
&&
686 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
687 !packets_have_beginning_end_cs
) {
688 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
689 "Unsupported stream: discarded events have "
690 "default clock snapshots, but packets have no "
691 "beginning and/or end default clock snapshots: "
693 "stream-id=%" PRIu64
", "
694 "stream-name=\"%s\"",
695 ir_stream
, bt_stream_get_id(ir_stream
),
696 bt_stream_get_name(ir_stream
));
697 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
702 * Not supported: discarded packets with default clock
703 * snapshots, but packet beginning/end without default clock
706 if (!fs_sink
->ignore_discarded_packets
&&
707 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
708 !packets_have_beginning_end_cs
) {
709 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
710 "Unsupported stream: discarded packets have "
711 "default clock snapshots, but packets have no "
712 "beginning and/or end default clock snapshots: "
714 "stream-id=%" PRIu64
", "
715 "stream-name=\"%s\"",
716 ir_stream
, bt_stream_get_id(ir_stream
),
717 bt_stream_get_name(ir_stream
));
718 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
722 stream
= borrow_stream(fs_sink
, ir_stream
);
724 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
725 "Failed to borrow stream.");
726 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
730 BT_COMP_LOGI("Created new, empty stream file: "
731 "stream-id=%" PRIu64
", stream-name=\"%s\", "
732 "trace-name=\"%s\", path=\"%s/%s\"",
733 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
734 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
735 stream
->trace
->path
->str
, stream
->file_name
->str
);
742 bt_component_class_sink_consume_method_status
handle_stream_end_msg(
743 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
745 bt_component_class_sink_consume_method_status status
=
746 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
747 const bt_stream
*ir_stream
=
748 bt_message_stream_end_borrow_stream_const(msg
);
749 struct fs_sink_stream
*stream
;
751 stream
= borrow_stream(fs_sink
, ir_stream
);
753 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
754 "Failed to borrow stream.");
755 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
759 if (G_UNLIKELY(!stream
->sc
->has_packets
&&
760 stream
->packet_state
.is_open
)) {
761 /* Close stream's current artificial packet */
762 int ret
= fs_sink_stream_close_packet(stream
, NULL
);
765 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
766 "Failed to close packet.");
767 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
772 BT_COMP_LOGI("Closing stream file: "
773 "stream-id=%" PRIu64
", stream-name=\"%s\", "
774 "trace-name=\"%s\", path=\"%s/%s\"",
775 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
776 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
777 stream
->trace
->path
->str
, stream
->file_name
->str
);
780 * This destroys the stream object and frees all its resources,
781 * closing the stream file.
783 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
790 bt_component_class_sink_consume_method_status
handle_discarded_events_msg(
791 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
793 bt_component_class_sink_consume_method_status status
=
794 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
795 const bt_stream
*ir_stream
=
796 bt_message_discarded_events_borrow_stream_const(msg
);
797 struct fs_sink_stream
*stream
;
798 const bt_clock_snapshot
*cs
= NULL
;
799 bt_property_availability avail
;
802 stream
= borrow_stream(fs_sink
, ir_stream
);
804 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
805 "Failed to borrow stream.");
806 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
810 if (fs_sink
->ignore_discarded_events
) {
811 BT_COMP_LOGI("Ignoring discarded events message: "
812 "stream-id=%" PRIu64
", stream-name=\"%s\", "
813 "trace-name=\"%s\", path=\"%s/%s\"",
814 bt_stream_get_id(ir_stream
),
815 bt_stream_get_name(ir_stream
),
817 bt_stream_borrow_trace_const(ir_stream
)),
818 stream
->trace
->path
->str
, stream
->file_name
->str
);
822 if (stream
->discarded_events_state
.in_range
) {
823 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
824 "Unsupported contiguous discarded events message: "
825 "stream-id=%" PRIu64
", stream-name=\"%s\", "
826 "trace-name=\"%s\", path=\"%s/%s\"",
827 bt_stream_get_id(ir_stream
),
828 bt_stream_get_name(ir_stream
),
830 bt_stream_borrow_trace_const(ir_stream
)),
831 stream
->trace
->path
->str
, stream
->file_name
->str
);
832 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
837 * If we're currently in an opened packet (got a packet
838 * beginning message, but no packet end message yet), we do not
839 * support having a discarded events message with a time range
840 * because we require that the discarded events message's time
841 * range go from a packet's end time to the next packet's end
844 if (stream
->packet_state
.is_open
&&
845 stream
->sc
->discarded_events_has_ts
) {
846 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
847 "Unsupported discarded events message with "
848 "default clock snapshots occurring within a packet: "
849 "stream-id=%" PRIu64
", stream-name=\"%s\", "
850 "trace-name=\"%s\", path=\"%s/%s\"",
851 bt_stream_get_id(ir_stream
),
852 bt_stream_get_name(ir_stream
),
854 bt_stream_borrow_trace_const(ir_stream
)),
855 stream
->trace
->path
->str
, stream
->file_name
->str
);
856 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
860 if (stream
->sc
->discarded_events_has_ts
) {
862 * Make the stream's state be in the time range of a
863 * discarded events message since we have the message's
864 * time range (`stream->sc->discarded_events_has_ts`).
866 stream
->discarded_events_state
.in_range
= true;
869 * The clock snapshot values will be validated when
870 * handling the next packet beginning and end messages
871 * (next calls to handle_packet_beginning_msg() and
872 * handle_packet_end_msg()).
874 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
877 stream
->discarded_events_state
.beginning_cs
=
878 bt_clock_snapshot_get_value(cs
);
879 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
882 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
885 avail
= bt_message_discarded_events_get_count(msg
, &count
);
886 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
888 * There's no specific count of discarded events: set it
889 * to 1 so that we know that we at least discarded
895 stream
->packet_state
.discarded_events_counter
+= count
;
902 bt_component_class_sink_consume_method_status
handle_discarded_packets_msg(
903 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
905 bt_component_class_sink_consume_method_status status
=
906 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
907 const bt_stream
*ir_stream
=
908 bt_message_discarded_packets_borrow_stream_const(msg
);
909 struct fs_sink_stream
*stream
;
910 const bt_clock_snapshot
*cs
= NULL
;
911 bt_property_availability avail
;
914 stream
= borrow_stream(fs_sink
, ir_stream
);
916 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
917 "Failed to borrow stream.");
918 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
922 if (fs_sink
->ignore_discarded_packets
) {
923 BT_COMP_LOGI("Ignoring discarded packets message: "
924 "stream-id=%" PRIu64
", stream-name=\"%s\", "
925 "trace-name=\"%s\", path=\"%s/%s\"",
926 bt_stream_get_id(ir_stream
),
927 bt_stream_get_name(ir_stream
),
929 bt_stream_borrow_trace_const(ir_stream
)),
930 stream
->trace
->path
->str
, stream
->file_name
->str
);
934 if (stream
->discarded_packets_state
.in_range
) {
935 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
936 "Unsupported contiguous discarded packets message: "
937 "stream-id=%" PRIu64
", stream-name=\"%s\", "
938 "trace-name=\"%s\", path=\"%s/%s\"",
939 bt_stream_get_id(ir_stream
),
940 bt_stream_get_name(ir_stream
),
942 bt_stream_borrow_trace_const(ir_stream
)),
943 stream
->trace
->path
->str
, stream
->file_name
->str
);
944 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
949 * Discarded packets messages are guaranteed to occur between
952 BT_ASSERT(!stream
->packet_state
.is_open
);
954 if (stream
->sc
->discarded_packets_has_ts
) {
956 * Make the stream's state be in the time range of a
957 * discarded packets message since we have the message's
958 * time range (`stream->sc->discarded_packets_has_ts`).
960 stream
->discarded_packets_state
.in_range
= true;
963 * The clock snapshot values will be validated when
964 * handling the next packet beginning message (next call
965 * to handle_packet_beginning_msg()).
967 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
970 stream
->discarded_packets_state
.beginning_cs
=
971 bt_clock_snapshot_get_value(cs
);
972 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
975 stream
->discarded_packets_state
.end_cs
=
976 bt_clock_snapshot_get_value(cs
);
979 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
980 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
982 * There's no specific count of discarded packets: set
983 * it to 1 so that we know that we at least discarded
989 stream
->packet_state
.seq_num
+= count
;
996 void put_messages(bt_message_array_const msgs
, uint64_t count
)
1000 for (i
= 0; i
< count
; i
++) {
1001 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
1006 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(
1007 bt_self_component_sink
*self_comp
)
1009 bt_component_class_sink_consume_method_status status
=
1010 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
1011 struct fs_sink_comp
*fs_sink
;
1012 bt_message_iterator_next_status next_status
;
1013 uint64_t msg_count
= 0;
1014 bt_message_array_const msgs
;
1016 fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1017 bt_self_component_sink_as_self_component(self_comp
));
1018 BT_ASSERT_DBG(fs_sink
);
1019 BT_ASSERT_DBG(fs_sink
->upstream_iter
);
1021 /* Consume messages */
1022 next_status
= bt_message_iterator_next(
1023 fs_sink
->upstream_iter
, &msgs
, &msg_count
);
1024 if (next_status
< 0) {
1025 status
= (bt_component_class_sink_consume_method_status
) next_status
;
1026 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
1027 "Failed to get next message from upstream iterator.");
1031 switch (next_status
) {
1032 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
1036 for (i
= 0; i
< msg_count
; i
++) {
1037 const bt_message
*msg
= msgs
[i
];
1041 switch (bt_message_get_type(msg
)) {
1042 case BT_MESSAGE_TYPE_EVENT
:
1043 status
= handle_event_msg(fs_sink
, msg
);
1045 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
1046 status
= handle_packet_beginning_msg(
1049 case BT_MESSAGE_TYPE_PACKET_END
:
1050 status
= handle_packet_end_msg(
1053 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
1055 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
1057 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
1058 status
= handle_stream_beginning_msg(
1061 case BT_MESSAGE_TYPE_STREAM_END
:
1062 status
= handle_stream_end_msg(
1065 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
1066 status
= handle_discarded_events_msg(
1069 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
1070 status
= handle_discarded_packets_msg(
1077 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
1079 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
1080 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
1081 "Failed to handle message: "
1082 "generated CTF traces could be incomplete: "
1083 "output-dir-path=\"%s\"",
1084 fs_sink
->output_dir_path
->str
);
1091 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
1092 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
1094 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
1095 /* TODO: Finalize all traces (should already be done?) */
1096 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
1105 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
1106 put_messages(msgs
, msg_count
);
1113 bt_component_class_sink_graph_is_configured_method_status
1114 ctf_fs_sink_graph_is_configured(
1115 bt_self_component_sink
*self_comp
)
1117 bt_component_class_sink_graph_is_configured_method_status status
;
1118 bt_message_iterator_create_from_sink_component_status
1120 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1121 bt_self_component_sink_as_self_component(self_comp
));
1124 bt_message_iterator_create_from_sink_component(
1126 bt_self_component_sink_borrow_input_port_by_name(
1127 self_comp
, in_port_name
), &fs_sink
->upstream_iter
);
1128 if (msg_iter_status
!= BT_MESSAGE_ITERATOR_CREATE_FROM_SINK_COMPONENT_STATUS_OK
) {
1129 status
= (bt_component_class_sink_graph_is_configured_method_status
) msg_iter_status
;
1130 BT_COMP_LOGE_APPEND_CAUSE(fs_sink
->self_comp
,
1131 "Failed to create upstream iterator.");
1135 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1141 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1143 fs_sink_comp
*fs_sink
= (fs_sink_comp
*) bt_self_component_get_data(
1144 bt_self_component_sink_as_self_component(self_comp
));
1146 destroy_fs_sink_comp(fs_sink
);