/*
 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
23 #define BT_COMP_LOG_SELF_COMP (fs_sink->self_comp)
24 #define BT_LOG_OUTPUT_LEVEL (fs_sink->log_level)
25 #define BT_LOG_TAG "PLUGIN/SINK.CTF.FS"
26 #include "plugins/comp-logging.h"
28 #include <babeltrace2/babeltrace.h>
32 #include "common/assert.h"
33 #include "ctfser/ctfser.h"
36 #include "fs-sink-trace.h"
37 #include "fs-sink-stream.h"
38 #include "fs-sink-ctf-meta.h"
39 #include "translate-trace-ir-to-ctf-ir.h"
40 #include "translate-ctf-ir-to-tsdl.h"
/* Name of the input port which this sink component adds and reads from. */
const char * const in_port_name = "in";
46 bt_component_class_init_method_status
ensure_output_dir_exists(
47 struct fs_sink_comp
*fs_sink
)
49 bt_component_class_init_method_status status
=
50 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
;
53 ret
= g_mkdir_with_parents(fs_sink
->output_dir_path
->str
, 0755);
56 "Cannot create directories for output directory",
57 ": output-dir-path=\"%s\"",
58 fs_sink
->output_dir_path
->str
);
59 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
68 bt_component_class_init_method_status
69 configure_component(struct fs_sink_comp
*fs_sink
,
70 const bt_value
*params
)
72 bt_component_class_init_method_status status
=
73 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
;
74 const bt_value
*value
;
76 value
= bt_value_map_borrow_entry_value_const(params
, "path");
78 BT_COMP_LOGE_STR("Missing mandatory `path` parameter.");
79 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
83 if (!bt_value_is_string(value
)) {
84 BT_COMP_LOGE_STR("`path` parameter: expecting a string.");
85 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
89 g_string_assign(fs_sink
->output_dir_path
,
90 bt_value_string_get(value
));
91 value
= bt_value_map_borrow_entry_value_const(params
,
92 "assume-single-trace");
94 if (!bt_value_is_bool(value
)) {
95 BT_COMP_LOGE_STR("`assume-single-trace` parameter: expecting a boolean.");
96 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
100 fs_sink
->assume_single_trace
= (bool) bt_value_bool_get(value
);
103 value
= bt_value_map_borrow_entry_value_const(params
,
104 "ignore-discarded-events");
106 if (!bt_value_is_bool(value
)) {
107 BT_COMP_LOGE_STR("`ignore-discarded-events` parameter: expecting a boolean.");
108 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
112 fs_sink
->ignore_discarded_events
=
113 (bool) bt_value_bool_get(value
);
116 value
= bt_value_map_borrow_entry_value_const(params
,
117 "ignore-discarded-packets");
119 if (!bt_value_is_bool(value
)) {
120 BT_COMP_LOGE_STR("`ignore-discarded-packets` parameter: expecting a boolean.");
121 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
125 fs_sink
->ignore_discarded_packets
=
126 (bool) bt_value_bool_get(value
);
129 value
= bt_value_map_borrow_entry_value_const(params
,
132 if (!bt_value_is_bool(value
)) {
133 BT_COMP_LOGE_STR("`quiet` parameter: expecting a boolean.");
134 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
138 fs_sink
->quiet
= (bool) bt_value_bool_get(value
);
146 void destroy_fs_sink_comp(struct fs_sink_comp
*fs_sink
)
152 if (fs_sink
->output_dir_path
) {
153 g_string_free(fs_sink
->output_dir_path
, TRUE
);
154 fs_sink
->output_dir_path
= NULL
;
157 if (fs_sink
->traces
) {
158 g_hash_table_destroy(fs_sink
->traces
);
159 fs_sink
->traces
= NULL
;
162 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(
163 fs_sink
->upstream_iter
);
171 bt_component_class_init_method_status
ctf_fs_sink_init(
172 bt_self_component_sink
*self_comp_sink
, const bt_value
*params
,
173 void *init_method_data
)
175 bt_component_class_init_method_status status
=
176 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
;
177 bt_self_component_add_port_status add_port_status
;
178 struct fs_sink_comp
*fs_sink
= NULL
;
179 bt_self_component
*self_comp
=
180 bt_self_component_sink_as_self_component(self_comp_sink
);
181 bt_logging_level log_level
= bt_component_get_logging_level(
182 bt_self_component_as_component(self_comp
));
184 fs_sink
= g_new0(struct fs_sink_comp
, 1);
186 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR
, log_level
, self_comp
,
187 "Failed to allocate one CTF FS sink structure.");
188 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR
;
192 fs_sink
->log_level
= log_level
;
193 fs_sink
->self_comp
= self_comp
;
194 fs_sink
->output_dir_path
= g_string_new(NULL
);
195 status
= configure_component(fs_sink
, params
);
196 if (status
!= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
) {
197 /* configure_component() logs errors */
201 if (fs_sink
->assume_single_trace
&&
202 g_file_test(fs_sink
->output_dir_path
->str
,
203 G_FILE_TEST_EXISTS
)) {
204 BT_COMP_LOGE("Single trace mode, but output path exists: "
205 "output-path=\"%s\"", fs_sink
->output_dir_path
->str
);
206 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
210 status
= ensure_output_dir_exists(fs_sink
);
211 if (status
!= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
) {
212 /* ensure_output_dir_exists() logs errors */
216 fs_sink
->traces
= g_hash_table_new_full(g_direct_hash
, g_direct_equal
,
217 NULL
, (GDestroyNotify
) fs_sink_trace_destroy
);
218 if (!fs_sink
->traces
) {
219 BT_COMP_LOGE_STR("Failed to allocate one GHashTable.");
220 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR
;
224 add_port_status
= bt_self_component_sink_add_input_port(
225 self_comp_sink
, in_port_name
, NULL
, NULL
);
226 switch (add_port_status
) {
227 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR
:
228 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
230 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR
:
231 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR
;
237 bt_self_component_set_data(self_comp
, fs_sink
);
240 if (status
!= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
) {
241 destroy_fs_sink_comp(fs_sink
);
248 struct fs_sink_stream
*borrow_stream(struct fs_sink_comp
*fs_sink
,
249 const bt_stream
*ir_stream
)
251 const bt_trace
*ir_trace
= bt_stream_borrow_trace_const(ir_stream
);
252 struct fs_sink_trace
*trace
;
253 struct fs_sink_stream
*stream
= NULL
;
255 trace
= g_hash_table_lookup(fs_sink
->traces
, ir_trace
);
256 if (G_UNLIKELY(!trace
)) {
257 if (fs_sink
->assume_single_trace
&&
258 g_hash_table_size(fs_sink
->traces
) > 0) {
259 BT_COMP_LOGE("Single trace mode, but getting more than one trace: "
260 "stream-name=\"%s\"",
261 bt_stream_get_name(ir_stream
));
265 trace
= fs_sink_trace_create(fs_sink
, ir_trace
);
271 stream
= g_hash_table_lookup(trace
->streams
, ir_stream
);
272 if (G_UNLIKELY(!stream
)) {
273 stream
= fs_sink_stream_create(trace
, ir_stream
);
284 bt_component_class_sink_consume_method_status
handle_event_msg(
285 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
288 bt_component_class_sink_consume_method_status status
=
289 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
290 const bt_event
*ir_event
= bt_message_event_borrow_event_const(msg
);
291 const bt_stream
*ir_stream
= bt_event_borrow_stream_const(ir_event
);
292 struct fs_sink_stream
*stream
;
293 struct fs_sink_ctf_event_class
*ec
= NULL
;
294 const bt_clock_snapshot
*cs
= NULL
;
296 stream
= borrow_stream(fs_sink
, ir_stream
);
297 if (G_UNLIKELY(!stream
)) {
298 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
302 ret
= try_translate_event_class_trace_ir_to_ctf_ir(fs_sink
,
303 stream
->sc
, bt_event_borrow_class_const(ir_event
), &ec
);
305 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
311 if (stream
->sc
->default_clock_class
) {
312 cs
= bt_message_event_borrow_default_clock_snapshot_const(
317 * If this event's stream does not support packets, then we
318 * lazily create artificial packets.
320 * The size of an artificial packet is arbitrarily at least
321 * 4 MiB (it usually is greater because we close it when
322 * comes the time to write a new event and the packet's content
323 * size is >= 4 MiB), except the last one which can be smaller.
325 if (G_UNLIKELY(!stream
->sc
->has_packets
)) {
326 if (stream
->packet_state
.is_open
&&
327 bt_ctfser_get_offset_in_current_packet_bits(&stream
->ctfser
) / 8 >=
330 * Stream's current packet is larger than 4 MiB:
331 * close it. A new packet will be opened just
334 ret
= fs_sink_stream_close_packet(stream
, NULL
);
336 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
341 if (!stream
->packet_state
.is_open
) {
342 /* Stream's packet is not currently opened: open it */
343 ret
= fs_sink_stream_open_packet(stream
, NULL
, NULL
);
345 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
351 BT_ASSERT(stream
->packet_state
.is_open
);
352 ret
= fs_sink_stream_write_event(stream
, cs
, ir_event
, ec
);
353 if (G_UNLIKELY(ret
)) {
354 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
363 bt_component_class_sink_consume_method_status
handle_packet_beginning_msg(
364 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
367 bt_component_class_sink_consume_method_status status
=
368 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
369 const bt_packet
*ir_packet
=
370 bt_message_packet_beginning_borrow_packet_const(msg
);
371 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
372 struct fs_sink_stream
*stream
;
373 const bt_clock_snapshot
*cs
= NULL
;
375 stream
= borrow_stream(fs_sink
, ir_stream
);
376 if (G_UNLIKELY(!stream
)) {
377 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
381 if (stream
->sc
->packets_have_ts_begin
) {
382 cs
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(
388 * If we previously received a discarded events message with
389 * a time range, make sure that its beginning time matches what's
390 * expected for CTF 1.8, that is:
392 * * Its beginning time is the previous packet's end
393 * time (or the current packet's beginning time if
394 * this is the first packet).
396 * We check this here instead of in handle_packet_end_msg()
397 * because we want to catch any incompatible message as early as
398 * possible to report the error.
400 * Validation of the discarded events message's end time is
401 * performed in handle_packet_end_msg().
403 if (stream
->discarded_events_state
.in_range
) {
404 uint64_t expected_cs
;
407 * `stream->discarded_events_state.in_range` is only set
408 * when the stream class's discarded events have a time
411 * It is required that the packet beginning and end
412 * messages for this stream class have times when
413 * discarded events have a time range.
415 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
416 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
417 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
419 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
420 /* We're opening the first packet */
421 expected_cs
= bt_clock_snapshot_get_value(cs
);
423 expected_cs
= stream
->prev_packet_state
.end_cs
;
426 if (stream
->discarded_events_state
.beginning_cs
!=
428 BT_COMP_LOGE("Incompatible discarded events message: "
429 "unexpected beginning time: "
430 "beginning-cs-val=%" PRIu64
", "
431 "expected-beginning-cs-val=%" PRIu64
", "
432 "stream-id=%" PRIu64
", stream-name=\"%s\", "
433 "trace-name=\"%s\", path=\"%s/%s\"",
434 stream
->discarded_events_state
.beginning_cs
,
436 bt_stream_get_id(ir_stream
),
437 bt_stream_get_name(ir_stream
),
439 bt_stream_borrow_trace_const(ir_stream
)),
440 stream
->trace
->path
->str
, stream
->file_name
->str
);
441 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
447 * If we previously received a discarded packets message with a
448 * time range, make sure that its beginning and end times match
449 * what's expected for CTF 1.8, that is:
451 * * Its beginning time is the previous packet's end time.
453 * * Its end time is the current packet's beginning time.
455 if (stream
->discarded_packets_state
.in_range
) {
456 uint64_t expected_end_cs
;
459 * `stream->discarded_packets_state.in_range` is only
460 * set when the stream class's discarded packets have a
463 * It is required that the packet beginning and end
464 * messages for this stream class have times when
465 * discarded packets have a time range.
467 BT_ASSERT(stream
->sc
->discarded_packets_has_ts
);
468 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
469 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
472 * It is not supported to have a discarded packets
473 * message _before_ the first packet: we cannot validate
474 * that its beginning time is compatible with CTF 1.8 in
477 if (stream
->prev_packet_state
.end_cs
== UINT64_C(-1)) {
478 BT_COMP_LOGE("Incompatible discarded packets message "
479 "occuring before the stream's first packet: "
480 "stream-id=%" PRIu64
", stream-name=\"%s\", "
481 "trace-name=\"%s\", path=\"%s/%s\"",
482 bt_stream_get_id(ir_stream
),
483 bt_stream_get_name(ir_stream
),
485 bt_stream_borrow_trace_const(ir_stream
)),
486 stream
->trace
->path
->str
, stream
->file_name
->str
);
487 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
491 if (stream
->discarded_packets_state
.beginning_cs
!=
492 stream
->prev_packet_state
.end_cs
) {
493 BT_COMP_LOGE("Incompatible discarded packets message: "
494 "unexpected beginning time: "
495 "beginning-cs-val=%" PRIu64
", "
496 "expected-beginning-cs-val=%" PRIu64
", "
497 "stream-id=%" PRIu64
", stream-name=\"%s\", "
498 "trace-name=\"%s\", path=\"%s/%s\"",
499 stream
->discarded_packets_state
.beginning_cs
,
500 stream
->prev_packet_state
.end_cs
,
501 bt_stream_get_id(ir_stream
),
502 bt_stream_get_name(ir_stream
),
504 bt_stream_borrow_trace_const(ir_stream
)),
505 stream
->trace
->path
->str
, stream
->file_name
->str
);
506 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
510 expected_end_cs
= bt_clock_snapshot_get_value(cs
);
512 if (stream
->discarded_packets_state
.end_cs
!=
514 BT_COMP_LOGE("Incompatible discarded packets message: "
515 "unexpected end time: "
516 "end-cs-val=%" PRIu64
", "
517 "expected-end-cs-val=%" PRIu64
", "
518 "stream-id=%" PRIu64
", stream-name=\"%s\", "
519 "trace-name=\"%s\", path=\"%s/%s\"",
520 stream
->discarded_packets_state
.end_cs
,
522 bt_stream_get_id(ir_stream
),
523 bt_stream_get_name(ir_stream
),
525 bt_stream_borrow_trace_const(ir_stream
)),
526 stream
->trace
->path
->str
, stream
->file_name
->str
);
527 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
533 * We're not in a discarded packets time range anymore since we
534 * require that the discarded packets time ranges go from one
535 * packet's end time to the next packet's beginning time, and
536 * we're handling a packet beginning message here.
538 stream
->discarded_packets_state
.in_range
= false;
540 ret
= fs_sink_stream_open_packet(stream
, cs
, ir_packet
);
542 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
551 bt_component_class_sink_consume_method_status
handle_packet_end_msg(
552 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
555 bt_component_class_sink_consume_method_status status
=
556 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
557 const bt_packet
*ir_packet
=
558 bt_message_packet_end_borrow_packet_const(msg
);
559 const bt_stream
*ir_stream
= bt_packet_borrow_stream_const(ir_packet
);
560 struct fs_sink_stream
*stream
;
561 const bt_clock_snapshot
*cs
= NULL
;
563 stream
= borrow_stream(fs_sink
, ir_stream
);
564 if (G_UNLIKELY(!stream
)) {
565 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
569 if (stream
->sc
->packets_have_ts_end
) {
570 cs
= bt_message_packet_end_borrow_default_clock_snapshot_const(
576 * If we previously received a discarded events message with
577 * a time range, make sure that its end time matches what's
578 * expected for CTF 1.8, that is:
580 * * Its end time is the current packet's end time.
582 * Validation of the discarded events message's beginning time
583 * is performed in handle_packet_beginning_msg().
585 if (stream
->discarded_events_state
.in_range
) {
586 uint64_t expected_cs
;
589 * `stream->discarded_events_state.in_range` is only set
590 * when the stream class's discarded events have a time
593 * It is required that the packet beginning and end
594 * messages for this stream class have times when
595 * discarded events have a time range.
597 BT_ASSERT(stream
->sc
->discarded_events_has_ts
);
598 BT_ASSERT(stream
->sc
->packets_have_ts_begin
);
599 BT_ASSERT(stream
->sc
->packets_have_ts_end
);
601 expected_cs
= bt_clock_snapshot_get_value(cs
);
603 if (stream
->discarded_events_state
.end_cs
!= expected_cs
) {
604 BT_COMP_LOGE("Incompatible discarded events message: "
605 "unexpected end time: "
606 "end-cs-val=%" PRIu64
", "
607 "expected-end-cs-val=%" PRIu64
", "
608 "stream-id=%" PRIu64
", stream-name=\"%s\", "
609 "trace-name=\"%s\", path=\"%s/%s\"",
610 stream
->discarded_events_state
.end_cs
,
612 bt_stream_get_id(ir_stream
),
613 bt_stream_get_name(ir_stream
),
615 bt_stream_borrow_trace_const(ir_stream
)),
616 stream
->trace
->path
->str
, stream
->file_name
->str
);
617 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
622 ret
= fs_sink_stream_close_packet(stream
, cs
);
624 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
629 * We're not in a discarded events time range anymore since we
630 * require that the discarded events time ranges go from one
631 * packet's end time to the next packet's end time, and we're
632 * handling a packet end message here.
634 stream
->discarded_events_state
.in_range
= false;
641 bt_component_class_sink_consume_method_status
handle_stream_beginning_msg(
642 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
644 bt_component_class_sink_consume_method_status status
=
645 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
646 const bt_stream
*ir_stream
=
647 bt_message_stream_beginning_borrow_stream_const(msg
);
648 const bt_stream_class
*ir_sc
=
649 bt_stream_borrow_class_const(ir_stream
);
650 struct fs_sink_stream
*stream
;
651 bool packets_have_beginning_end_cs
=
652 bt_stream_class_packets_have_beginning_default_clock_snapshot(ir_sc
) &&
653 bt_stream_class_packets_have_end_default_clock_snapshot(ir_sc
);
656 * Not supported: discarded events or discarded packets support
657 * without packets support. Packets are the way to know where
658 * discarded events/packets occured in CTF 1.8.
660 if (!bt_stream_class_supports_packets(ir_sc
)) {
661 BT_ASSERT(!bt_stream_class_supports_discarded_packets(ir_sc
));
663 if (!fs_sink
->ignore_discarded_events
&&
664 bt_stream_class_supports_discarded_events(ir_sc
)) {
665 BT_COMP_LOGE("Unsupported stream: "
666 "stream does not support packets, "
667 "but supports discarded events: "
669 "stream-id=%" PRIu64
", "
670 "stream-name=\"%s\"",
671 ir_stream
, bt_stream_get_id(ir_stream
),
672 bt_stream_get_name(ir_stream
));
673 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
679 * Not supported: discarded events with default clock snapshots,
680 * but packet beginning/end without default clock snapshot.
682 if (!fs_sink
->ignore_discarded_events
&&
683 bt_stream_class_discarded_events_have_default_clock_snapshots(ir_sc
) &&
684 !packets_have_beginning_end_cs
) {
685 BT_COMP_LOGE("Unsupported stream: discarded events have "
686 "default clock snapshots, but packets have no "
687 "beginning and/or end default clock snapshots: "
689 "stream-id=%" PRIu64
", "
690 "stream-name=\"%s\"",
691 ir_stream
, bt_stream_get_id(ir_stream
),
692 bt_stream_get_name(ir_stream
));
693 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
698 * Not supported: discarded packets with default clock
699 * snapshots, but packet beginning/end without default clock
702 if (!fs_sink
->ignore_discarded_packets
&&
703 bt_stream_class_discarded_packets_have_default_clock_snapshots(ir_sc
) &&
704 !packets_have_beginning_end_cs
) {
705 BT_COMP_LOGE("Unsupported stream: discarded packets have "
706 "default clock snapshots, but packets have no "
707 "beginning and/or end default clock snapshots: "
709 "stream-id=%" PRIu64
", "
710 "stream-name=\"%s\"",
711 ir_stream
, bt_stream_get_id(ir_stream
),
712 bt_stream_get_name(ir_stream
));
713 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
717 stream
= borrow_stream(fs_sink
, ir_stream
);
719 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
723 BT_COMP_LOGI("Created new, empty stream file: "
724 "stream-id=%" PRIu64
", stream-name=\"%s\", "
725 "trace-name=\"%s\", path=\"%s/%s\"",
726 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
727 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
728 stream
->trace
->path
->str
, stream
->file_name
->str
);
735 bt_component_class_sink_consume_method_status
handle_stream_end_msg(
736 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
738 bt_component_class_sink_consume_method_status status
=
739 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
740 const bt_stream
*ir_stream
=
741 bt_message_stream_end_borrow_stream_const(msg
);
742 struct fs_sink_stream
*stream
;
744 stream
= borrow_stream(fs_sink
, ir_stream
);
746 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
750 if (G_UNLIKELY(!stream
->sc
->has_packets
&&
751 stream
->packet_state
.is_open
)) {
752 /* Close stream's current artificial packet */
753 int ret
= fs_sink_stream_close_packet(stream
, NULL
);
756 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
761 BT_COMP_LOGI("Closing stream file: "
762 "stream-id=%" PRIu64
", stream-name=\"%s\", "
763 "trace-name=\"%s\", path=\"%s/%s\"",
764 bt_stream_get_id(ir_stream
), bt_stream_get_name(ir_stream
),
765 bt_trace_get_name(bt_stream_borrow_trace_const(ir_stream
)),
766 stream
->trace
->path
->str
, stream
->file_name
->str
);
769 * This destroys the stream object and frees all its resources,
770 * closing the stream file.
772 g_hash_table_remove(stream
->trace
->streams
, ir_stream
);
779 bt_component_class_sink_consume_method_status
handle_discarded_events_msg(
780 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
782 bt_component_class_sink_consume_method_status status
=
783 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
784 const bt_stream
*ir_stream
=
785 bt_message_discarded_events_borrow_stream_const(msg
);
786 struct fs_sink_stream
*stream
;
787 const bt_clock_snapshot
*cs
= NULL
;
788 bt_property_availability avail
;
791 stream
= borrow_stream(fs_sink
, ir_stream
);
793 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
797 if (fs_sink
->ignore_discarded_events
) {
798 BT_COMP_LOGI("Ignoring discarded events message: "
799 "stream-id=%" PRIu64
", stream-name=\"%s\", "
800 "trace-name=\"%s\", path=\"%s/%s\"",
801 bt_stream_get_id(ir_stream
),
802 bt_stream_get_name(ir_stream
),
804 bt_stream_borrow_trace_const(ir_stream
)),
805 stream
->trace
->path
->str
, stream
->file_name
->str
);
809 if (stream
->discarded_events_state
.in_range
) {
810 BT_COMP_LOGE("Unsupported contiguous discarded events message: "
811 "stream-id=%" PRIu64
", stream-name=\"%s\", "
812 "trace-name=\"%s\", path=\"%s/%s\"",
813 bt_stream_get_id(ir_stream
),
814 bt_stream_get_name(ir_stream
),
816 bt_stream_borrow_trace_const(ir_stream
)),
817 stream
->trace
->path
->str
, stream
->file_name
->str
);
818 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
823 * If we're currently in an opened packet (got a packet
824 * beginning message, but no packet end message yet), we do not
825 * support having a discarded events message with a time range
826 * because we require that the discarded events message's time
827 * range go from a packet's end time to the next packet's end
830 if (stream
->packet_state
.is_open
&&
831 stream
->sc
->discarded_events_has_ts
) {
832 BT_COMP_LOGE("Unsupported discarded events message with "
833 "default clock snapshots occuring within a packet: "
834 "stream-id=%" PRIu64
", stream-name=\"%s\", "
835 "trace-name=\"%s\", path=\"%s/%s\"",
836 bt_stream_get_id(ir_stream
),
837 bt_stream_get_name(ir_stream
),
839 bt_stream_borrow_trace_const(ir_stream
)),
840 stream
->trace
->path
->str
, stream
->file_name
->str
);
841 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
845 if (stream
->sc
->discarded_events_has_ts
) {
847 * Make the stream's state be in the time range of a
848 * discarded events message since we have the message's
849 * time range (`stream->sc->discarded_events_has_ts`).
851 stream
->discarded_events_state
.in_range
= true;
854 * The clock snapshot values will be validated when
855 * handling the next packet beginning and end messages
856 * (next calls to handle_packet_beginning_msg() and
857 * handle_packet_end_msg()).
859 cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
862 stream
->discarded_events_state
.beginning_cs
=
863 bt_clock_snapshot_get_value(cs
);
864 cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
867 stream
->discarded_events_state
.end_cs
= bt_clock_snapshot_get_value(cs
);
870 avail
= bt_message_discarded_events_get_count(msg
, &count
);
871 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
873 * There's no specific count of discarded events: set it
874 * to 1 so that we know that we at least discarded
880 stream
->packet_state
.discarded_events_counter
+= count
;
887 bt_component_class_sink_consume_method_status
handle_discarded_packets_msg(
888 struct fs_sink_comp
*fs_sink
, const bt_message
*msg
)
890 bt_component_class_sink_consume_method_status status
=
891 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
892 const bt_stream
*ir_stream
=
893 bt_message_discarded_packets_borrow_stream_const(msg
);
894 struct fs_sink_stream
*stream
;
895 const bt_clock_snapshot
*cs
= NULL
;
896 bt_property_availability avail
;
899 stream
= borrow_stream(fs_sink
, ir_stream
);
901 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
905 if (fs_sink
->ignore_discarded_packets
) {
906 BT_COMP_LOGI("Ignoring discarded packets message: "
907 "stream-id=%" PRIu64
", stream-name=\"%s\", "
908 "trace-name=\"%s\", path=\"%s/%s\"",
909 bt_stream_get_id(ir_stream
),
910 bt_stream_get_name(ir_stream
),
912 bt_stream_borrow_trace_const(ir_stream
)),
913 stream
->trace
->path
->str
, stream
->file_name
->str
);
917 if (stream
->discarded_packets_state
.in_range
) {
918 BT_COMP_LOGE("Unsupported contiguous discarded packets message: "
919 "stream-id=%" PRIu64
", stream-name=\"%s\", "
920 "trace-name=\"%s\", path=\"%s/%s\"",
921 bt_stream_get_id(ir_stream
),
922 bt_stream_get_name(ir_stream
),
924 bt_stream_borrow_trace_const(ir_stream
)),
925 stream
->trace
->path
->str
, stream
->file_name
->str
);
926 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_ERROR
;
931 * Discarded packets messages are guaranteed to occur between
934 BT_ASSERT(!stream
->packet_state
.is_open
);
936 if (stream
->sc
->discarded_packets_has_ts
) {
938 * Make the stream's state be in the time range of a
939 * discarded packets message since we have the message's
940 * time range (`stream->sc->discarded_packets_has_ts`).
942 stream
->discarded_packets_state
.in_range
= true;
945 * The clock snapshot values will be validated when
946 * handling the next packet beginning message (next call
947 * to handle_packet_beginning_msg()).
949 cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
952 stream
->discarded_packets_state
.beginning_cs
=
953 bt_clock_snapshot_get_value(cs
);
954 cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
957 stream
->discarded_packets_state
.end_cs
=
958 bt_clock_snapshot_get_value(cs
);
961 avail
= bt_message_discarded_packets_get_count(msg
, &count
);
962 if (avail
!= BT_PROPERTY_AVAILABILITY_AVAILABLE
) {
964 * There's no specific count of discarded packets: set
965 * it to 1 so that we know that we at least discarded
971 stream
->packet_state
.seq_num
+= count
;
978 void put_messages(bt_message_array_const msgs
, uint64_t count
)
982 for (i
= 0; i
< count
; i
++) {
983 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
988 bt_component_class_sink_consume_method_status
ctf_fs_sink_consume(
989 bt_self_component_sink
*self_comp
)
991 bt_component_class_sink_consume_method_status status
=
992 BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
;
993 struct fs_sink_comp
*fs_sink
;
994 bt_message_iterator_next_status next_status
;
995 uint64_t msg_count
= 0;
996 bt_message_array_const msgs
;
998 fs_sink
= bt_self_component_get_data(
999 bt_self_component_sink_as_self_component(self_comp
));
1001 BT_ASSERT(fs_sink
->upstream_iter
);
1003 /* Consume messages */
1004 next_status
= bt_self_component_port_input_message_iterator_next(
1005 fs_sink
->upstream_iter
, &msgs
, &msg_count
);
1006 if (next_status
< 0) {
1007 status
= (int) next_status
;
1011 switch (next_status
) {
1012 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
:
1016 for (i
= 0; i
< msg_count
; i
++) {
1017 const bt_message
*msg
= msgs
[i
];
1021 switch (bt_message_get_type(msg
)) {
1022 case BT_MESSAGE_TYPE_EVENT
:
1023 status
= handle_event_msg(fs_sink
, msg
);
1025 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
1026 status
= handle_packet_beginning_msg(
1029 case BT_MESSAGE_TYPE_PACKET_END
:
1030 status
= handle_packet_end_msg(
1033 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
1035 BT_COMP_LOGD_STR("Ignoring message iterator inactivity message.");
1037 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
1038 status
= handle_stream_beginning_msg(
1041 case BT_MESSAGE_TYPE_STREAM_END
:
1042 status
= handle_stream_end_msg(
1045 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
1046 status
= handle_discarded_events_msg(
1049 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
1050 status
= handle_discarded_packets_msg(
1057 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
1059 if (status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
) {
1060 BT_COMP_LOGE("Failed to handle message: "
1061 "generated CTF traces could be incomplete: "
1062 "output-dir-path=\"%s\"",
1063 fs_sink
->output_dir_path
->str
);
1070 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN
:
1071 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_AGAIN
;
1073 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END
:
1074 /* TODO: Finalize all traces (should already be done?) */
1075 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_END
;
1077 case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR
:
1078 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR
;
1080 case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR
:
1081 status
= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_MEMORY_ERROR
;
1090 BT_ASSERT(status
!= BT_COMPONENT_CLASS_SINK_CONSUME_METHOD_STATUS_OK
);
1091 put_messages(msgs
, msg_count
);
1098 bt_component_class_sink_graph_is_configured_method_status
ctf_fs_sink_graph_is_configured(
1099 bt_self_component_sink
*self_comp
)
1101 bt_component_class_sink_graph_is_configured_method_status status
=
1102 BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_OK
;
1103 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
1104 bt_self_component_sink_as_self_component(self_comp
));
1106 fs_sink
->upstream_iter
=
1107 bt_self_component_port_input_message_iterator_create_from_sink_component(
1109 bt_self_component_sink_borrow_input_port_by_name(
1110 self_comp
, in_port_name
));
1111 if (!fs_sink
->upstream_iter
) {
1112 status
= BT_COMPONENT_CLASS_SINK_GRAPH_IS_CONFIGURED_METHOD_STATUS_ERROR
;
1121 void ctf_fs_sink_finalize(bt_self_component_sink
*self_comp
)
1123 struct fs_sink_comp
*fs_sink
= bt_self_component_get_data(
1124 bt_self_component_sink_as_self_component(self_comp
));
1126 destroy_fs_sink_comp(fs_sink
);