/*
 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
#define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
#define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
#include "plugins/comp-logging.h"

#include <babeltrace2/babeltrace.h>
#include <stdlib.h>

#include "common/assert.h"
#include "ctfser/ctfser.h"

#include "fs-sink-trace.h"
#include "fs-sink-stream.h"
#include "fs-sink-ctf-meta.h"
#include "translate-trace-ir-to-ctf-ir.h"
#include "translate-ctf-ir-to-tsdl.h"
/* Name of the component's single input port. */
static const char * const in_port_name = "in";
46 bt_component_class_init_method_status
ensure_output_dir_exists(
47 struct fs_sink_comp
*fs_sink
)
49 bt_component_class_init_method_status status
=
50 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
;
53 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
56 "Cannot create directories for output directory",
57 ": output-dir-path=\"%s\"",
58 fs_sink
->output_dir_path
->str
);
59 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
68 bt_component_class_init_method_status
69 configure_component(struct fs_sink_comp
*fs_sink
,
70 const bt_value
*params
)
72 bt_component_class_init_method_status status
=
73 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
;
74 const bt_value
*value
;
76 value
= bt_value_map_borrow_entry_value_const(params
, "path");
78 BT_COMP_LOGE_STR("Missing mandatory `path` parameter.");
79 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
83 if (!bt_value_is_string(value
)) {
84 BT_COMP_LOGE_STR("`path` parameter: expecting a string.");
85 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
89 g_string_assign(fs_sink
->output_dir_path
,
90 bt_value_string_get(value
));
91 value
= bt_value_map_borrow_entry_value_const(params
,
92 "assume-single-trace");
94 if (!bt_value_is_bool(value
)) {
95 BT_COMP_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
96 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
100 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
103 value
= bt_value_map_borrow_entry_value_const(params
,
104 "ignore-discarded-events");
106 if (!bt_value_is_bool(value
)) {
107 BT_COMP_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
108 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
112 fs_sink
->ignore_discarded_events
=
113 (bool) bt_value_bool_get(value
);
116 value
= bt_value_map_borrow_entry_value_const(params
,
117 "ignore-discarded-packets");
119 if (!bt_value_is_bool(value
)) {
120 BT_COMP_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
121 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
125 fs_sink
->ignore_discarded_packets
=
126 (bool) bt_value_bool_get(value
);
129 value
= bt_value_map_borrow_entry_value_const(params
,
132 if (!bt_value_is_bool(value
)) {
133 BT_COMP_LOGE_STR("`quiet` parameter: expecting a boolean.");
134 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
138 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
146 void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
152 if (fs_sink
->output_dir_path
) {
153 g_string_free(fs_sink
->output_dir_path
, TRUE
);
154 fs_sink
->output_dir_path
= NULL
;
157 if (fs_sink
->traces
) {
158 g_hash_table_destroy(fs_sink
->traces
);
159 fs_sink
->traces
= NULL
;
162 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
163 fs_sink
->upstream_iter
);
171 bt_component_class_init_method_status
ctf_fs_sink_init(
172 bt_self_component_sink
*self_comp_sink
, const bt_value
*params
,
173 void *init_method_data
)
175 bt_component_class_init_method_status status
=
176 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
;
177 bt_self_component_add_port_status add_port_status
;
178 struct fs_sink_comp
*fs_sink
= NULL
;
179 bt_self_component
*self_comp
=
180 bt_self_component_sink_as_self_component(self_comp_sink
);
181 bt_logging_level log_level
= bt_component_get_logging_level(
182 bt_self_component_as_component(self_comp
));
184 fs_sink
= g_new0(struct fs_sink_comp
, 1);
186 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR
, log_level
, self_comp
,
187 "Failed to allocate one CTF FS sink structure.");
188 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR
;
192 fs_sink
->log_level
= log_level
;
193 fs_sink
->self_comp
= self_comp
;
194 fs_sink
->output_dir_path
= g_string_new(NULL
);
195 status
= configure_component(fs_sink
, params
);
196 if (status
!= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
) {
197 /* configure_component() logs errors */
201 if (fs_sink
->assume_single_trace
&&
202 g_file_test(fs_sink
->output_dir_path
->str
,
203 G_FILE_TEST_EXISTS
)) {
204 BT_COMP_LOGE("Single trace mode, but output path exists: "
205 "output-path=\"%s\"", fs_sink
->output_dir_path
->str
);
206 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
210 status
= ensure_output_dir_exists(fs_sink
);
211 if (status
!= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
) {
212 /* ensure_output_dir_exists() logs errors */
216 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
,
217 NULL
, (GDestroyNotify
) fs_sink_trace_destroy
);
218 if (!fs_sink
->traces
) {
219 BT_COMP_LOGE_STR("Failed to allocate one GHashTable.");
220 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR
;
224 add_port_status
= bt_self_component_sink_add_input_port(
225 self_comp_sink
, in_port_name
, NULL
, NULL
);
226 switch (add_port_status
) {
227 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR
:
228 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
230 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR
:
231 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR
;
237 bt_self_component_set_data(self_comp
, fs_sink
);
240 if (status
!= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
) {
241 destroy_fs_sink_comp(fs_sink
);
248 struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
249 const bt_stream
*ir_stream
)
251 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
252 struct fs_sink_trace
*trace
;
253 struct fs_sink_stream
*stream
= NULL
;
255 trace
= g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
256 if (G_UNLIKELY(!trace
)) {
257 if (fs_sink
->assume_single_trace
&&
258 g_hash_table_size(fs_sink
->traces
) > 0) {
259 BT_COMP_LOGE("Single trace mode, but getting more than one trace: "
260 "stream-name=\"%s\"",
261 bt_stream_get_name(ir_stream
));
265 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
271 stream
= g_hash_table_lookup(trace
->streams
, ir_stream
);
272 if (G_UNLIKELY(!stream
)) {
273 stream
= fs_sink_stream_create(trace
, ir_stream
);
284 bt_component_class_sink_consume_method_status
handle_event_msg(
285 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
288 bt_component_class_sink_consume_method_status status
=
289 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
290 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
291 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
292 struct fs_sink_stream
*stream
;
293 struct fs_sink_ctf_event_class
*ec
= NULL
;
294 const bt_clock_snapshot
*cs
= NULL
;
296 stream
= borrow_stream(fs_sink
, ir_stream
);
297 if (G_UNLIKELY(!stream
)) {
298 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
302 ret
= try_translate_event_class_trace_ir_to_ctf_ir(fs_sink
,
303 stream
->sc
, bt_event_borrow_class_const(ir_event
), &ec
);
305 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
311 if (stream
->sc
->default_clock_class
) {
312 cs
= bt_message_event_borrow_default_clock_snapshot_const(
316 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
317 if (G_UNLIKELY(ret
)) {
318 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
327 bt_component_class_sink_consume_method_status
handle_packet_beginning_msg(
328 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
331 bt_component_class_sink_consume_method_status status
=
332 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
333 const bt_packet
*ir_packet
=
334 bt_message_packet_beginning_borrow_packet_const(msg
);
335 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
336 struct fs_sink_stream
*stream
;
337 const bt_clock_snapshot
*cs
= NULL
;
339 stream
= borrow_stream(fs_sink
, ir_stream
);
340 if (G_UNLIKELY(!stream
)) {
341 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
345 if (stream
->sc
->packets_have_ts_begin
) {
346 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(
352 * If we previously received a discarded events message with
353 * a time range, make sure that its beginning time matches what's
354 * expected for CTF 1.8, that is:
356 * * Its beginning time is the previous packet's end
357 * time (or the current packet's beginning time if
358 * this is the first packet).
360 * We check this here instead of in handle_packet_end_msg()
361 * because we want to catch any incompatible message as early as
362 * possible to report the error.
364 * Validation of the discarded events message's end time is
365 * performed in handle_packet_end_msg().
367 if (stream
->discarded_events_state
.in_range
) {
368 uint64_t expected_cs
;
371 * `stream->discarded_events_state.in_range` is only set
372 * when the stream class's discarded events have a time
375 * It is required that the packet beginning and end
376 * messages for this stream class have times when
377 * discarded events have a time range.
379 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
380 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
381 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
383 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
384 /* We're opening the first packet */
385 expected_cs
= bt_clock_snapshot_get_value(cs
);
387 expected_cs
= stream
->prev_packet_state
.end_cs
;
390 if (stream
->discarded_events_state
.beginning_cs
!=
392 BT_COMP_LOGE("Incompatible discarded events message: "
393 "unexpected beginning time: "
394 "beginning-cs-val=%" PRIu64
", "
395 "expected-beginning-cs-val=%" PRIu64
", "
396 "stream-id=%" PRIu64
", stream-name=\"%s\", "
397 "trace-name=\"%s\", path=\"%s/%s\"",
398 stream
->discarded_events_state
.beginning_cs
,
400 bt_stream_get_id(ir_stream
),
401 bt_stream_get_name(ir_stream
),
403 bt_stream_borrow_trace_const(ir_stream
)),
404 stream
->trace
->path
->str
, stream
->file_name
->str
);
405 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
411 * If we previously received a discarded packets message with a
412 * time range, make sure that its beginning and end times match
413 * what's expected for CTF 1.8, that is:
415 * * Its beginning time is the previous packet's end time.
417 * * Its end time is the current packet's beginning time.
419 if (stream
->discarded_packets_state
.in_range
) {
420 uint64_t expected_end_cs
;
423 * `stream->discarded_packets_state.in_range` is only
424 * set when the stream class's discarded packets have a
427 * It is required that the packet beginning and end
428 * messages for this stream class have times when
429 * discarded packets have a time range.
431 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
432 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
433 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
436 * It is not supported to have a discarded packets
437 * message _before_ the first packet: we cannot validate
438 * that its beginning time is compatible with CTF 1.8 in
441 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
442 BT_COMP_LOGE("Incompatible discarded packets message "
443 "occuring before the stream's first packet: "
444 "stream-id=%" PRIu64
", stream-name=\"%s\", "
445 "trace-name=\"%s\", path=\"%s/%s\"",
446 bt_stream_get_id(ir_stream
),
447 bt_stream_get_name(ir_stream
),
449 bt_stream_borrow_trace_const(ir_stream
)),
450 stream
->trace
->path
->str
, stream
->file_name
->str
);
451 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
455 if (stream
->discarded_packets_state
.beginning_cs
!=
456 stream
->prev_packet_state
.end_cs
) {
457 BT_COMP_LOGE("Incompatible discarded packets message: "
458 "unexpected beginning time: "
459 "beginning-cs-val=%" PRIu64
", "
460 "expected-beginning-cs-val=%" PRIu64
", "
461 "stream-id=%" PRIu64
", stream-name=\"%s\", "
462 "trace-name=\"%s\", path=\"%s/%s\"",
463 stream
->discarded_packets_state
.beginning_cs
,
464 stream
->prev_packet_state
.end_cs
,
465 bt_stream_get_id(ir_stream
),
466 bt_stream_get_name(ir_stream
),
468 bt_stream_borrow_trace_const(ir_stream
)),
469 stream
->trace
->path
->str
, stream
->file_name
->str
);
470 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
474 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
476 if (stream
->discarded_packets_state
.end_cs
!=
478 BT_COMP_LOGE("Incompatible discarded packets message: "
479 "unexpected end time: "
480 "end-cs-val=%" PRIu64
", "
481 "expected-end-cs-val=%" PRIu64
", "
482 "stream-id=%" PRIu64
", stream-name=\"%s\", "
483 "trace-name=\"%s\", path=\"%s/%s\"",
484 stream
->discarded_packets_state
.end_cs
,
486 bt_stream_get_id(ir_stream
),
487 bt_stream_get_name(ir_stream
),
489 bt_stream_borrow_trace_const(ir_stream
)),
490 stream
->trace
->path
->str
, stream
->file_name
->str
);
491 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
497 * We're not in a discarded packets time range anymore since we
498 * require that the discarded packets time ranges go from one
499 * packet's end time to the next packet's beginning time, and
500 * we're handling a packet beginning message here.
502 stream
->discarded_packets_state
.in_range
= false;
504 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
506 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
515 bt_component_class_sink_consume_method_status
handle_packet_end_msg(
516 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
519 bt_component_class_sink_consume_method_status status
=
520 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
521 const bt_packet
*ir_packet
=
522 bt_message_packet_end_borrow_packet_const(msg
);
523 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
524 struct fs_sink_stream
*stream
;
525 const bt_clock_snapshot
*cs
= NULL
;
527 stream
= borrow_stream(fs_sink
, ir_stream
);
528 if (G_UNLIKELY(!stream
)) {
529 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
533 if (stream
->sc
->packets_have_ts_end
) {
534 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(
540 * If we previously received a discarded events message with
541 * a time range, make sure that its end time matches what's
542 * expected for CTF 1.8, that is:
544 * * Its end time is the current packet's end time.
546 * Validation of the discarded events message's beginning time
547 * is performed in handle_packet_beginning_msg().
549 if (stream
->discarded_events_state
.in_range
) {
550 uint64_t expected_cs
;
553 * `stream->discarded_events_state.in_range` is only set
554 * when the stream class's discarded events have a time
557 * It is required that the packet beginning and end
558 * messages for this stream class have times when
559 * discarded events have a time range.
561 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
562 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
563 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
565 expected_cs
= bt_clock_snapshot_get_value(cs
);
567 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
568 BT_COMP_LOGE("Incompatible discarded events message: "
569 "unexpected end time: "
570 "end-cs-val=%" PRIu64
", "
571 "expected-end-cs-val=%" PRIu64
", "
572 "stream-id=%" PRIu64
", stream-name=\"%s\", "
573 "trace-name=\"%s\", path=\"%s/%s\"",
574 stream
->discarded_events_state
.end_cs
,
576 bt_stream_get_id(ir_stream
),
577 bt_stream_get_name(ir_stream
),
579 bt_stream_borrow_trace_const(ir_stream
)),
580 stream
->trace
->path
->str
, stream
->file_name
->str
);
581 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
586 ret
= fs_sink_stream_close_packet(stream
, cs
);
588 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
593 * We're not in a discarded events time range anymore since we
594 * require that the discarded events time ranges go from one
595 * packet's end time to the next packet's end time, and we're
596 * handling a packet end message here.
598 stream
->discarded_events_state
.in_range
= false;
605 bt_component_class_sink_consume_method_status
handle_stream_beginning_msg(
606 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
608 bt_component_class_sink_consume_method_status status
=
609 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
610 const bt_stream
*ir_stream
=
611 bt_message_stream_beginning_borrow_stream_const(msg
);
612 const bt_stream_class
*ir_sc
=
613 bt_stream_borrow_class_const(ir_stream
);
614 struct fs_sink_stream
*stream
;
615 bool packets_have_beginning_end_cs
=
616 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
617 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
620 * Not supported: discarded events with default clock snapshots,
621 * but packet beginning/end without default clock snapshot.
623 if (!fs_sink
->ignore_discarded_events
&&
624 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
625 !packets_have_beginning_end_cs
) {
626 BT_COMP_LOGE("Unsupported stream: discarded events have "
627 "default clock snapshots, but packets have no "
628 "beginning and/or end default clock snapshots: "
630 "stream-id=%" PRIu64
", "
631 "stream-name=\"%s\"",
632 ir_stream
, bt_stream_get_id(ir_stream
),
633 bt_stream_get_name(ir_stream
));
634 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
639 * Not supported: discarded packets with default clock
640 * snapshots, but packet beginning/end without default clock
643 if (!fs_sink
->ignore_discarded_packets
&&
644 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
645 !packets_have_beginning_end_cs
) {
646 BT_COMP_LOGE("Unsupported stream: discarded packets have "
647 "default clock snapshots, but packets have no "
648 "beginning and/or end default clock snapshots: "
650 "stream-id=%" PRIu64
", "
651 "stream-name=\"%s\"",
652 ir_stream
, bt_stream_get_id(ir_stream
),
653 bt_stream_get_name(ir_stream
));
654 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
658 stream
= borrow_stream(fs_sink
, ir_stream
);
660 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
664 BT_COMP_LOGI("Created new, empty stream file: "
665 "stream-id=%" PRIu64
", stream-name=\"%s\", "
666 "trace-name=\"%s\", path=\"%s/%s\"",
667 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
668 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
669 stream
->trace
->path
->str
, stream
->file_name
->str
);
676 bt_component_class_sink_consume_method_status
handle_stream_end_msg(
677 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
679 bt_component_class_sink_consume_method_status status
=
680 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
681 const bt_stream
*ir_stream
=
682 bt_message_stream_end_borrow_stream_const(msg
);
683 struct fs_sink_stream
*stream
;
685 stream
= borrow_stream(fs_sink
, ir_stream
);
687 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
691 BT_COMP_LOGI("Closing stream file: "
692 "stream-id=%" PRIu64
", stream-name=\"%s\", "
693 "trace-name=\"%s\", path=\"%s/%s\"",
694 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
695 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
696 stream
->trace
->path
->str
, stream
->file_name
->str
);
699 * This destroys the stream object and frees all its resources,
700 * closing the stream file.
702 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
709 bt_component_class_sink_consume_method_status
handle_discarded_events_msg(
710 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
712 bt_component_class_sink_consume_method_status status
=
713 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
714 const bt_stream
*ir_stream
=
715 bt_message_discarded_events_borrow_stream_const(msg
);
716 struct fs_sink_stream
*stream
;
717 const bt_clock_snapshot
*cs
= NULL
;
718 bt_property_availability avail
;
721 stream
= borrow_stream(fs_sink
, ir_stream
);
723 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
727 if (fs_sink
->ignore_discarded_events
) {
728 BT_COMP_LOGI("Ignoring discarded events message: "
729 "stream-id=%" PRIu64
", stream-name=\"%s\", "
730 "trace-name=\"%s\", path=\"%s/%s\"",
731 bt_stream_get_id(ir_stream
),
732 bt_stream_get_name(ir_stream
),
734 bt_stream_borrow_trace_const(ir_stream
)),
735 stream
->trace
->path
->str
, stream
->file_name
->str
);
739 if (stream
->discarded_events_state
.in_range
) {
740 BT_COMP_LOGE("Unsupported contiguous discarded events message: "
741 "stream-id=%" PRIu64
", stream-name=\"%s\", "
742 "trace-name=\"%s\", path=\"%s/%s\"",
743 bt_stream_get_id(ir_stream
),
744 bt_stream_get_name(ir_stream
),
746 bt_stream_borrow_trace_const(ir_stream
)),
747 stream
->trace
->path
->str
, stream
->file_name
->str
);
748 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
753 * If we're currently in an opened packet (got a packet
754 * beginning message, but no packet end message yet), we do not
755 * support having a discarded events message with a time range
756 * because we require that the discarded events message's time
757 * range go from a packet's end time to the next packet's end
760 if (stream
->packet_state
.is_open
&&
761 stream
->sc
->discarded_events_has_ts
) {
762 BT_COMP_LOGE("Unsupported discarded events message with "
763 "default clock snapshots occuring within a packet: "
764 "stream-id=%" PRIu64
", stream-name=\"%s\", "
765 "trace-name=\"%s\", path=\"%s/%s\"",
766 bt_stream_get_id(ir_stream
),
767 bt_stream_get_name(ir_stream
),
769 bt_stream_borrow_trace_const(ir_stream
)),
770 stream
->trace
->path
->str
, stream
->file_name
->str
);
771 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
775 if (stream
->sc
->discarded_events_has_ts
) {
777 * Make the stream's state be in the time range of a
778 * discarded events message since we have the message's
779 * time range (`stream->sc->discarded_events_has_ts`).
781 stream
->discarded_events_state
.in_range
= true;
784 * The clock snapshot values will be validated when
785 * handling the next packet beginning and end messages
786 * (next calls to handle_packet_beginning_msg() and
787 * handle_packet_end_msg()).
789 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
792 stream
->discarded_events_state
.beginning_cs
=
793 bt_clock_snapshot_get_value(cs
);
794 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
797 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
800 avail
= bt_message_discarded_events_get_count(msg
, &count
);
801 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
803 * There's no specific count of discarded events: set it
804 * to 1 so that we know that we at least discarded
810 stream
->packet_state
.discarded_events_counter
+= count
;
817 bt_component_class_sink_consume_method_status
handle_discarded_packets_msg(
818 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
820 bt_component_class_sink_consume_method_status status
=
821 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
822 const bt_stream
*ir_stream
=
823 bt_message_discarded_packets_borrow_stream_const(msg
);
824 struct fs_sink_stream
*stream
;
825 const bt_clock_snapshot
*cs
= NULL
;
826 bt_property_availability avail
;
829 stream
= borrow_stream(fs_sink
, ir_stream
);
831 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
835 if (fs_sink
->ignore_discarded_packets
) {
836 BT_COMP_LOGI("Ignoring discarded packets message: "
837 "stream-id=%" PRIu64
", stream-name=\"%s\", "
838 "trace-name=\"%s\", path=\"%s/%s\"",
839 bt_stream_get_id(ir_stream
),
840 bt_stream_get_name(ir_stream
),
842 bt_stream_borrow_trace_const(ir_stream
)),
843 stream
->trace
->path
->str
, stream
->file_name
->str
);
847 if (stream
->discarded_packets_state
.in_range
) {
848 BT_COMP_LOGE("Unsupported contiguous discarded packets message: "
849 "stream-id=%" PRIu64
", stream-name=\"%s\", "
850 "trace-name=\"%s\", path=\"%s/%s\"",
851 bt_stream_get_id(ir_stream
),
852 bt_stream_get_name(ir_stream
),
854 bt_stream_borrow_trace_const(ir_stream
)),
855 stream
->trace
->path
->str
, stream
->file_name
->str
);
856 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
861 * Discarded packets messages are guaranteed to occur between
864 BT_ASSERT(!stream
->packet_state
.is_open
);
866 if (stream
->sc
->discarded_packets_has_ts
) {
868 * Make the stream's state be in the time range of a
869 * discarded packets message since we have the message's
870 * time range (`stream->sc->discarded_packets_has_ts`).
872 stream
->discarded_packets_state
.in_range
= true;
875 * The clock snapshot values will be validated when
876 * handling the next packet beginning message (next call
877 * to handle_packet_beginning_msg()).
879 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
882 stream
->discarded_packets_state
.beginning_cs
=
883 bt_clock_snapshot_get_value(cs
);
884 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
887 stream
->discarded_packets_state
.end_cs
=
888 bt_clock_snapshot_get_value(cs
);
891 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
892 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
894 * There's no specific count of discarded packets: set
895 * it to 1 so that we know that we at least discarded
901 stream
->packet_state
.seq_num
+= count
;
908 void put_messages(bt_message_array_const msgs
, uint64_t count
)
912 for (i
= 0; i
< count
; i
++) {
913 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
918 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(
919 bt_self_component_sink
*self_comp
)
921 bt_component_class_sink_consume_method_status status
=
922 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
923 struct fs_sink_comp
*fs_sink
;
924 bt_message_iterator_next_status next_status
;
925 uint64_t msg_count
= 0;
926 bt_message_array_const msgs
;
928 fs_sink
= bt_self_component_get_data(
929 bt_self_component_sink_as_self_component(self_comp
));
931 BT_ASSERT(fs_sink
->upstream_iter
);
933 /* Consume messages */
934 next_status
= bt_self_component_port_input_message_iterator_next(
935 fs_sink
->upstream_iter
, &msgs
, &msg_count
);
936 if (next_status
< 0) {
937 status
= (int) next_status
;
941 switch (next_status
) {
942 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
946 for (i
= 0; i
< msg_count
; i
++) {
947 const bt_message
*msg
= msgs
[i
];
951 switch (bt_message_get_type(msg
)) {
952 case BT_MESSAGE_TYPE_EVENT
:
953 status
= handle_event_msg(fs_sink
, msg
);
955 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
956 status
= handle_packet_beginning_msg(
959 case BT_MESSAGE_TYPE_PACKET_END
:
960 status
= handle_packet_end_msg(
963 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
965 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
967 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
968 status
= handle_stream_beginning_msg(
971 case BT_MESSAGE_TYPE_STREAM_END
:
972 status
= handle_stream_end_msg(
975 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING
:
976 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END
:
977 /* Not supported by CTF 1.8 */
978 BT_COMP_LOGD_STR("Ignoring stream activity message.");
980 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
981 status
= handle_discarded_events_msg(
984 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
985 status
= handle_discarded_packets_msg(
992 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
994 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
995 BT_COMP_LOGE("Failed to handle message: "
996 "generated CTF traces could be incomplete: "
997 "output-dir-path=\"%s\"",
998 fs_sink
->output_dir_path
->str
);
1005 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
1006 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
1008 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
1009 /* TODO: Finalize all traces (should already be done?) */
1010 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
1012 case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR
:
1013 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR
;
1015 case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR
:
1016 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR
;
1025 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
1026 put_messages(msgs
, msg_count
);
1033 bt_component_class_sink_graph_is_configured_method_status
ctf_fs_sink_graph_is_configured(
1034 bt_self_component_sink
*self_comp
)
1036 bt_component_class_sink_graph_is_configured_method_status status
=
1037 BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1038 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
1039 bt_self_component_sink_as_self_component(self_comp
));
1041 fs_sink
->upstream_iter
=
1042 bt_self_component_port_input_message_iterator_create(
1043 bt_self_component_sink_borrow_input_port_by_name(
1044 self_comp
, in_port_name
));
1045 if (!fs_sink
->upstream_iter
) {
1046 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_ERROR
;
1055 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1057 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
1058 bt_self_component_sink_as_self_component(self_comp
));
1060 destroy_fs_sink_comp(fs_sink
);