/*
 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
24 #define BT_COMP_LOG_SELF_COMP (trimmer_comp->self_comp)
25 #define BT_LOG_OUTPUT_LEVEL (trimmer_comp->log_level)
26 #define BT_LOG_TAG "PLUGIN/FLT.UTILS.TRIMMER"
27 #include "plugins/comp-logging.h"
29 #include "compat/utc.h"
30 #include "compat/time.h"
31 #include <babeltrace2/babeltrace.h>
32 #include "common/common.h"
33 #include "common/assert.h"
40 #define NS_PER_S INT64_C(1000000000)
/* Name of the component's input port (used to add it and to look it up) */
static const char *const in_port_name = "in";
/*
 * Time of day without a date, as parsed from a `hh:mm[:ss[.ns]]`
 * bound parameter.
 */
struct trimmer_time {
	unsigned int hour;
	unsigned int minute;
	unsigned int second;
	unsigned int ns;
};
48 struct trimmer_bound
{
50 * Nanoseconds from origin, valid if `is_set` is set and
51 * `is_infinite` is false.
53 int64_t ns_from_origin
;
55 /* True if this bound's full time (`ns_from_origin`) is set */
59 * True if this bound represents the infinity (negative or
60 * positive depending on which bound it is). If this is true,
61 * then we don't care about `ns_from_origin` above.
66 * This bound's time without the date; this time is used to set
67 * `ns_from_origin` once we know the date.
69 struct trimmer_time time
;
73 struct trimmer_bound begin
, end
;
75 bt_logging_level log_level
;
76 bt_self_component
*self_comp
;
/*
 * States of a trimmer message iterator.
 *
 * The first enumerator has the value 0, so it is the state of a
 * zero-initialized iterator.
 */
enum trimmer_iterator_state {
	/*
	 * Find the first message's date and use it to set the bounds'
	 * times.
	 */
	TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN,

	/* Initially seek to the trimming range's beginning time */
	TRIMMER_ITERATOR_STATE_SEEK_INITIALLY,

	/*
	 * Fill the output message queue for as long as received input
	 * messages are within the trimming time range.
	 */
	TRIMMER_ITERATOR_STATE_TRIM,

	/* Flush the remaining messages in the output message queue */
	TRIMMER_ITERATOR_STATE_ENDING,

	/* Trimming operation and message iterator is ended */
	TRIMMER_ITERATOR_STATE_ENDED,
};
104 struct trimmer_iterator
{
106 struct trimmer_comp
*trimmer_comp
;
109 bt_self_message_iterator
*self_msg_iter
;
111 enum trimmer_iterator_state state
;
114 bt_self_component_port_input_message_iterator
*upstream_iter
;
115 struct trimmer_bound begin
, end
;
118 * Queue of `const bt_message *` (owned by the queue).
120 * This is where the trimming operation pushes the messages to
121 * output by this message iterator.
123 GQueue
*output_messages
;
126 * Hash table of `bt_stream *` (weak) to
127 * `struct trimmer_iterator_stream_state *` (owned by the HT).
129 GHashTable
*stream_states
;
132 struct trimmer_iterator_stream_state
{
134 * True if the last pushed message for this stream was a stream
135 * activity end message.
137 bool last_msg_is_stream_activity_end
;
140 * Time to use for a generated stream end activity message when
143 int64_t stream_act_end_ns_from_origin
;
146 const bt_stream
*stream
;
148 /* Owned by this (`NULL` initially and between packets) */
149 const bt_packet
*cur_packet
;
/* Frees a trimmer component's private data; `trimmer_comp` must not be NULL. */
static
void destroy_trimmer_comp(struct trimmer_comp *trimmer_comp)
{
	BT_ASSERT(trimmer_comp);
	g_free(trimmer_comp);
}
160 struct trimmer_comp
*create_trimmer_comp(void)
162 return g_new0(struct trimmer_comp
, 1);
166 void trimmer_finalize(bt_self_component_filter
*self_comp
)
168 struct trimmer_comp
*trimmer_comp
=
169 bt_self_component_get_data(
170 bt_self_component_filter_as_self_component(self_comp
));
173 destroy_trimmer_comp(trimmer_comp
);
178 * Compile regex in `pattern`, and try to match `string`. If there's a match,
179 * return true and set `*match_info` to the list of matches. The list of
180 * matches must be freed by the caller. If there's no match, return false and
181 * set `*match_info` to NULL;
184 bool compile_and_match(const char *pattern
, const char *string
, GMatchInfo
**match_info
) {
185 bool matches
= false;
186 GError
*regex_error
= NULL
;
189 regex
= g_regex_new(pattern
, 0, 0, ®ex_error
);
194 matches
= g_regex_match(regex
, string
, 0, match_info
);
197 * g_regex_match allocates `*match_info` even if it returns
198 * FALSE. If there's no match, we have no use for it, so free
199 * it immediatly and don't return it to the caller.
201 g_match_info_free(*match_info
);
205 g_regex_unref(regex
);
210 g_error_free(regex_error
);
217 * Convert the captured text in match number `match_num` in `match_info`
218 * to an unsigned integer.
221 guint64
match_to_uint(const GMatchInfo
*match_info
, gint match_num
) {
222 gchar
*text
, *endptr
;
225 text
= g_match_info_fetch(match_info
, match_num
);
229 * Because the input is carefully sanitized with regexes by the caller,
230 * we assume that g_ascii_strtoull cannot fail.
233 result
= g_ascii_strtoull(text
, &endptr
, 10);
234 BT_ASSERT(endptr
> text
);
235 BT_ASSERT(errno
== 0);
243 * When parsing the nanoseconds part, .512 means .512000000, not .000000512.
244 * This function is like match_to_uint, but multiplies the parsed number to get
245 * the expected result.
248 guint64
match_to_uint_ns(const GMatchInfo
*match_info
, gint match_num
) {
251 gint start_pos
, end_pos
, power
;
252 static int pow10
[] = {
253 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000,
256 nanoseconds
= match_to_uint(match_info
, match_num
);
258 /* Multiply by 10 as many times as there are omitted digits. */
259 ret
= g_match_info_fetch_pos(match_info
, match_num
, &start_pos
, &end_pos
);
262 power
= 9 - (end_pos
- start_pos
);
263 BT_ASSERT(power
>= 0 && power
<= 8);
265 nanoseconds
*= pow10
[power
];
271 * Sets the time (in ns from origin) of a trimmer bound from date and
274 * Returns a negative value if anything goes wrong.
277 int set_bound_ns_from_origin(struct trimmer_bound
*bound
,
278 unsigned int year
, unsigned int month
, unsigned int day
,
279 unsigned int hour
, unsigned int minute
, unsigned int second
,
280 unsigned int ns
, bool is_gmt
)
290 .tm_year
= year
- 1900,
295 result
= bt_timegm(&tm
);
297 result
= mktime(&tm
);
306 bound
->ns_from_origin
= (int64_t) result
;
307 bound
->ns_from_origin
*= NS_PER_S
;
308 bound
->ns_from_origin
+= ns
;
309 bound
->is_set
= true;
316 * Parses a timestamp, figuring out its format.
318 * Returns a negative value if anything goes wrong.
322 * YYYY-MM-DD hh:mm[:ss[.ns]]
326 * TODO: Check overflows.
329 int set_bound_from_str(struct trimmer_comp
*trimmer_comp
,
330 const char *str
, struct trimmer_bound
*bound
, bool is_gmt
)
332 /* Matches YYYY-MM-DD */
333 #define DATE_RE "([0-9]{4})-([0-9]{2})-([0-9]{2})"
335 /* Matches HH:MM[:SS[.NS]] */
336 #define TIME_RE "([0-9]{2}):([0-9]{2})(?::([0-9]{2})(?:\\.([0-9]{1,9}))?)?"
338 /* Matches [-]SS[.NS] */
339 #define S_NS_RE "^(-?)([0-9]+)(?:\\.([0-9]{1,9}))?$"
341 GMatchInfo
*match_info
;
344 /* Try `YYYY-MM-DD hh:mm[:ss[.ns]]` format */
345 if (compile_and_match("^" DATE_RE
" " TIME_RE
"$", str
, &match_info
)) {
346 unsigned int year
= 0, month
= 0, day
= 0, hours
= 0, minutes
= 0, seconds
= 0, nanoseconds
= 0;
347 gint match_count
= g_match_info_get_match_count(match_info
);
349 BT_ASSERT(match_count
>= 6 && match_count
<= 8);
351 year
= match_to_uint(match_info
, 1);
352 month
= match_to_uint(match_info
, 2);
353 day
= match_to_uint(match_info
, 3);
354 hours
= match_to_uint(match_info
, 4);
355 minutes
= match_to_uint(match_info
, 5);
357 if (match_count
>= 7) {
358 seconds
= match_to_uint(match_info
, 6);
361 if (match_count
>= 8) {
362 nanoseconds
= match_to_uint_ns(match_info
, 7);
365 set_bound_ns_from_origin(bound
, year
, month
, day
, hours
, minutes
, seconds
, nanoseconds
, is_gmt
);
370 if (compile_and_match("^" DATE_RE
"$", str
, &match_info
)) {
371 unsigned int year
= 0, month
= 0, day
= 0;
373 BT_ASSERT(g_match_info_get_match_count(match_info
) == 4);
375 year
= match_to_uint(match_info
, 1);
376 month
= match_to_uint(match_info
, 2);
377 day
= match_to_uint(match_info
, 3);
379 set_bound_ns_from_origin(bound
, year
, month
, day
, 0, 0, 0, 0, is_gmt
);
384 /* Try `hh:mm[:ss[.ns]]` format */
385 if (compile_and_match("^" TIME_RE
"$", str
, &match_info
)) {
386 gint match_count
= g_match_info_get_match_count(match_info
);
387 BT_ASSERT(match_count
>= 3 && match_count
<= 5);
388 bound
->time
.hour
= match_to_uint(match_info
, 1);
389 bound
->time
.minute
= match_to_uint(match_info
, 2);
391 if (match_count
>= 4) {
392 bound
->time
.second
= match_to_uint(match_info
, 3);
395 if (match_count
>= 5) {
396 bound
->time
.ns
= match_to_uint_ns(match_info
, 4);
402 /* Try `[-]s[.ns]` format */
403 if (compile_and_match("^" S_NS_RE
"$", str
, &match_info
)) {
404 gboolean is_neg
, fetch_pos_ret
;
405 gint start_pos
, end_pos
, match_count
;
406 guint64 seconds
, nanoseconds
= 0;
408 match_count
= g_match_info_get_match_count(match_info
);
409 BT_ASSERT(match_count
>= 3 && match_count
<= 4);
411 /* Check for presence of negation sign. */
412 fetch_pos_ret
= g_match_info_fetch_pos(match_info
, 1, &start_pos
, &end_pos
);
413 BT_ASSERT(fetch_pos_ret
);
414 is_neg
= (end_pos
- start_pos
) > 0;
416 seconds
= match_to_uint(match_info
, 2);
418 if (match_count
>= 4) {
419 nanoseconds
= match_to_uint_ns(match_info
, 3);
422 bound
->ns_from_origin
= seconds
* NS_PER_S
+ nanoseconds
;
425 bound
->ns_from_origin
= -bound
->ns_from_origin
;
428 bound
->is_set
= true;
433 BT_COMP_LOGE("Invalid date/time format: param=\"%s\"", str
);
441 * Sets a trimmer bound's properties from a parameter string/integer
444 * Returns a negative value if anything goes wrong.
447 int set_bound_from_param(struct trimmer_comp
*trimmer_comp
,
448 const char *param_name
, const bt_value
*param
,
449 struct trimmer_bound
*bound
, bool is_gmt
)
455 if (bt_value_is_signed_integer(param
)) {
456 int64_t value
= bt_value_signed_integer_get(param
);
459 * Just convert it to a temporary string to handle
460 * everything the same way.
462 sprintf(tmp_arg
, "%" PRId64
, value
);
464 } else if (bt_value_is_string(param
)) {
465 arg
= bt_value_string_get(param
);
467 BT_COMP_LOGE("`%s` parameter must be an integer or a string value.",
473 ret
= set_bound_from_str(trimmer_comp
, arg
, bound
, is_gmt
);
480 int validate_trimmer_bounds(struct trimmer_comp
*trimmer_comp
,
481 struct trimmer_bound
*begin
, struct trimmer_bound
*end
)
485 BT_ASSERT(begin
->is_set
);
486 BT_ASSERT(end
->is_set
);
488 if (!begin
->is_infinite
&& !end
->is_infinite
&&
489 begin
->ns_from_origin
> end
->ns_from_origin
) {
490 BT_COMP_LOGE("Trimming time range's beginning time is greater than end time: "
491 "begin-ns-from-origin=%" PRId64
", "
492 "end-ns-from-origin=%" PRId64
,
493 begin
->ns_from_origin
,
494 end
->ns_from_origin
);
499 if (!begin
->is_infinite
&& begin
->ns_from_origin
== INT64_MIN
) {
500 BT_COMP_LOGE("Invalid trimming time range's beginning time: "
501 "ns-from-origin=%" PRId64
,
502 begin
->ns_from_origin
);
507 if (!end
->is_infinite
&& end
->ns_from_origin
== INT64_MIN
) {
508 BT_COMP_LOGE("Invalid trimming time range's end time: "
509 "ns-from-origin=%" PRId64
,
510 end
->ns_from_origin
);
520 int init_trimmer_comp_from_params(struct trimmer_comp
*trimmer_comp
,
521 const bt_value
*params
)
523 const bt_value
*value
;
527 value
= bt_value_map_borrow_entry_value_const(params
, "gmt");
529 trimmer_comp
->is_gmt
= (bool) bt_value_bool_get(value
);
532 value
= bt_value_map_borrow_entry_value_const(params
, "begin");
534 if (set_bound_from_param(trimmer_comp
, "begin", value
,
535 &trimmer_comp
->begin
, trimmer_comp
->is_gmt
)) {
536 /* set_bound_from_param() logs errors */
541 trimmer_comp
->begin
.is_infinite
= true;
542 trimmer_comp
->begin
.is_set
= true;
545 value
= bt_value_map_borrow_entry_value_const(params
, "end");
547 if (set_bound_from_param(trimmer_comp
, "end", value
,
548 &trimmer_comp
->end
, trimmer_comp
->is_gmt
)) {
549 /* set_bound_from_param() logs errors */
554 trimmer_comp
->end
.is_infinite
= true;
555 trimmer_comp
->end
.is_set
= true;
559 if (trimmer_comp
->begin
.is_set
&& trimmer_comp
->end
.is_set
) {
560 /* validate_trimmer_bounds() logs errors */
561 ret
= validate_trimmer_bounds(trimmer_comp
,
562 &trimmer_comp
->begin
, &trimmer_comp
->end
);
568 bt_component_class_init_method_status
trimmer_init(
569 bt_self_component_filter
*self_comp_flt
,
570 const bt_value
*params
, void *init_data
)
573 bt_component_class_init_method_status status
=
574 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
;
575 bt_self_component_add_port_status add_port_status
;
576 struct trimmer_comp
*trimmer_comp
= create_trimmer_comp();
577 bt_self_component
*self_comp
=
578 bt_self_component_filter_as_self_component(self_comp_flt
);
580 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR
;
584 trimmer_comp
->log_level
= bt_component_get_logging_level(
585 bt_self_component_as_component(self_comp
));
586 trimmer_comp
->self_comp
= self_comp
;
587 add_port_status
= bt_self_component_filter_add_input_port(
588 self_comp_flt
, in_port_name
, NULL
, NULL
);
589 switch (add_port_status
) {
590 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR
:
591 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
593 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR
:
594 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR
;
600 add_port_status
= bt_self_component_filter_add_output_port(
601 self_comp_flt
, "out", NULL
, NULL
);
602 switch (add_port_status
) {
603 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR
:
604 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
606 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR
:
607 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR
;
613 ret
= init_trimmer_comp_from_params(trimmer_comp
, params
);
615 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
619 bt_self_component_set_data(self_comp
, trimmer_comp
);
623 if (status
== BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK
) {
624 status
= BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR
;
628 destroy_trimmer_comp(trimmer_comp
);
636 void destroy_trimmer_iterator(struct trimmer_iterator
*trimmer_it
)
638 BT_ASSERT(trimmer_it
);
639 bt_self_component_port_input_message_iterator_put_ref(
640 trimmer_it
->upstream_iter
);
642 if (trimmer_it
->output_messages
) {
643 g_queue_free(trimmer_it
->output_messages
);
646 if (trimmer_it
->stream_states
) {
647 g_hash_table_destroy(trimmer_it
->stream_states
);
654 void destroy_trimmer_iterator_stream_state(
655 struct trimmer_iterator_stream_state
*sstate
)
658 BT_PACKET_PUT_REF_AND_RESET(sstate
->cur_packet
);
663 bt_component_class_message_iterator_init_method_status
trimmer_msg_iter_init(
664 bt_self_message_iterator
*self_msg_iter
,
665 bt_self_component_filter
*self_comp
,
666 bt_self_component_port_output
*port
)
668 bt_component_class_message_iterator_init_method_status status
=
669 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK
;
670 struct trimmer_iterator
*trimmer_it
;
672 trimmer_it
= g_new0(struct trimmer_iterator
, 1);
674 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR
;
678 trimmer_it
->trimmer_comp
= bt_self_component_get_data(
679 bt_self_component_filter_as_self_component(self_comp
));
680 BT_ASSERT(trimmer_it
->trimmer_comp
);
682 if (trimmer_it
->trimmer_comp
->begin
.is_set
&&
683 trimmer_it
->trimmer_comp
->end
.is_set
) {
685 * Both trimming time range's bounds are set, so skip
687 * `TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN`
690 trimmer_it
->state
= TRIMMER_ITERATOR_STATE_SEEK_INITIALLY
;
693 trimmer_it
->begin
= trimmer_it
->trimmer_comp
->begin
;
694 trimmer_it
->end
= trimmer_it
->trimmer_comp
->end
;
695 trimmer_it
->upstream_iter
=
696 bt_self_component_port_input_message_iterator_create(
697 bt_self_component_filter_borrow_input_port_by_name(
698 self_comp
, in_port_name
));
699 if (!trimmer_it
->upstream_iter
) {
700 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_ERROR
;
704 trimmer_it
->output_messages
= g_queue_new();
705 if (!trimmer_it
->output_messages
) {
706 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR
;
710 trimmer_it
->stream_states
= g_hash_table_new_full(g_direct_hash
,
711 g_direct_equal
, NULL
,
712 (GDestroyNotify
) destroy_trimmer_iterator_stream_state
);
713 if (!trimmer_it
->stream_states
) {
714 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR
;
718 trimmer_it
->self_msg_iter
= self_msg_iter
;
719 bt_self_message_iterator_set_data(self_msg_iter
, trimmer_it
);
722 if (status
!= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK
&& trimmer_it
) {
723 destroy_trimmer_iterator(trimmer_it
);
730 int get_msg_ns_from_origin(const bt_message
*msg
, int64_t *ns_from_origin
,
733 const bt_clock_class
*clock_class
= NULL
;
734 const bt_clock_snapshot
*clock_snapshot
= NULL
;
735 bt_message_stream_activity_clock_snapshot_state sa_cs_state
;
739 BT_ASSERT(ns_from_origin
);
742 switch (bt_message_get_type(msg
)) {
743 case BT_MESSAGE_TYPE_EVENT
:
745 bt_message_event_borrow_stream_class_default_clock_class_const(
747 if (G_UNLIKELY(!clock_class
)) {
751 clock_snapshot
= bt_message_event_borrow_default_clock_snapshot_const(
754 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
756 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
758 if (G_UNLIKELY(!clock_class
)) {
762 clock_snapshot
= bt_message_packet_beginning_borrow_default_clock_snapshot_const(
765 case BT_MESSAGE_TYPE_PACKET_END
:
767 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
769 if (G_UNLIKELY(!clock_class
)) {
773 clock_snapshot
= bt_message_packet_end_borrow_default_clock_snapshot_const(
776 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
778 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
780 if (G_UNLIKELY(!clock_class
)) {
784 clock_snapshot
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
787 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
789 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
791 if (G_UNLIKELY(!clock_class
)) {
795 clock_snapshot
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
798 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING
:
800 bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
802 if (G_UNLIKELY(!clock_class
)) {
806 sa_cs_state
= bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
807 msg
, &clock_snapshot
);
808 if (sa_cs_state
== BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN
||
809 sa_cs_state
== BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE
) {
810 /* Lowest possible time to always include them */
811 *ns_from_origin
= INT64_MIN
;
812 goto no_clock_snapshot
;
816 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END
:
818 bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
820 if (G_UNLIKELY(!clock_class
)) {
824 sa_cs_state
= bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
825 msg
, &clock_snapshot
);
826 if (sa_cs_state
== BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN
) {
827 /* Lowest time to always include it */
828 *ns_from_origin
= INT64_MIN
;
829 goto no_clock_snapshot
;
830 } else if (sa_cs_state
== BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE
) {
831 /* Greatest time to always exclude it */
832 *ns_from_origin
= INT64_MAX
;
833 goto no_clock_snapshot
;
837 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
:
839 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
843 goto no_clock_snapshot
;
846 ret
= bt_clock_snapshot_get_ns_from_origin(clock_snapshot
,
848 if (G_UNLIKELY(ret
)) {
866 void put_messages(bt_message_array_const msgs
, uint64_t count
)
870 for (i
= 0; i
< count
; i
++) {
871 BT_MESSAGE_PUT_REF_AND_RESET(msgs
[i
]);
876 int set_trimmer_iterator_bound(struct trimmer_iterator
*trimmer_it
,
877 struct trimmer_bound
*bound
, int64_t ns_from_origin
,
880 struct trimmer_comp
*trimmer_comp
= trimmer_it
->trimmer_comp
;
882 time_t time_seconds
= (time_t) (ns_from_origin
/ NS_PER_S
);
885 BT_ASSERT(!bound
->is_set
);
888 /* We only need to extract the date from this time */
890 bt_gmtime_r(&time_seconds
, &tm
);
892 bt_localtime_r(&time_seconds
, &tm
);
896 BT_COMP_LOGE_ERRNO("Cannot convert timestamp to date and time",
897 "ts=%" PRId64
, (int64_t) time_seconds
);
902 ret
= set_bound_ns_from_origin(bound
, tm
.tm_year
+ 1900, tm
.tm_mon
+ 1,
903 tm
.tm_mday
, bound
->time
.hour
, bound
->time
.minute
,
904 bound
->time
.second
, bound
->time
.ns
, is_gmt
);
911 bt_component_class_message_iterator_next_method_status
912 state_set_trimmer_iterator_bounds(
913 struct trimmer_iterator
*trimmer_it
)
915 bt_message_iterator_next_status upstream_iter_status
=
916 BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
;
917 struct trimmer_comp
*trimmer_comp
= trimmer_it
->trimmer_comp
;
918 bt_message_array_const msgs
;
920 int64_t ns_from_origin
= INT64_MIN
;
924 BT_ASSERT(!trimmer_it
->begin
.is_set
||
925 !trimmer_it
->end
.is_set
);
928 upstream_iter_status
=
929 bt_self_component_port_input_message_iterator_next(
930 trimmer_it
->upstream_iter
, &msgs
, &count
);
931 if (upstream_iter_status
!= BT_MESSAGE_ITERATOR_NEXT_STATUS_OK
) {
935 for (i
= 0; i
< count
; i
++) {
936 const bt_message
*msg
= msgs
[i
];
940 ret
= get_msg_ns_from_origin(msg
, &ns_from_origin
,
950 BT_ASSERT(ns_from_origin
!= INT64_MIN
&&
951 ns_from_origin
!= INT64_MAX
);
952 put_messages(msgs
, count
);
956 put_messages(msgs
, count
);
960 if (!trimmer_it
->begin
.is_set
) {
961 BT_ASSERT(!trimmer_it
->begin
.is_infinite
);
962 ret
= set_trimmer_iterator_bound(trimmer_it
, &trimmer_it
->begin
,
963 ns_from_origin
, trimmer_comp
->is_gmt
);
969 if (!trimmer_it
->end
.is_set
) {
970 BT_ASSERT(!trimmer_it
->end
.is_infinite
);
971 ret
= set_trimmer_iterator_bound(trimmer_it
, &trimmer_it
->end
,
972 ns_from_origin
, trimmer_comp
->is_gmt
);
978 ret
= validate_trimmer_bounds(trimmer_it
->trimmer_comp
,
979 &trimmer_it
->begin
, &trimmer_it
->end
);
987 put_messages(msgs
, count
);
988 upstream_iter_status
= BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR
;
991 return (int) upstream_iter_status
;
995 bt_component_class_message_iterator_next_method_status
state_seek_initially(
996 struct trimmer_iterator
*trimmer_it
)
998 struct trimmer_comp
*trimmer_comp
= trimmer_it
->trimmer_comp
;
999 bt_component_class_message_iterator_next_method_status status
=
1000 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
;
1002 BT_ASSERT(trimmer_it
->begin
.is_set
);
1004 if (trimmer_it
->begin
.is_infinite
) {
1005 if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
1006 trimmer_it
->upstream_iter
)) {
1007 BT_COMP_LOGE_STR("Cannot make upstream message iterator initially seek its beginning.");
1008 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR
;
1012 status
= (int) bt_self_component_port_input_message_iterator_seek_beginning(
1013 trimmer_it
->upstream_iter
);
1015 if (!bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
1016 trimmer_it
->upstream_iter
,
1017 trimmer_it
->begin
.ns_from_origin
)) {
1018 BT_COMP_LOGE("Cannot make upstream message iterator initially seek: "
1019 "seek-ns-from-origin=%" PRId64
,
1020 trimmer_it
->begin
.ns_from_origin
);
1021 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR
;
1025 status
= (int) bt_self_component_port_input_message_iterator_seek_ns_from_origin(
1026 trimmer_it
->upstream_iter
, trimmer_it
->begin
.ns_from_origin
);
1029 if (status
== BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
) {
1030 trimmer_it
->state
= TRIMMER_ITERATOR_STATE_TRIM
;
1038 void push_message(struct trimmer_iterator
*trimmer_it
, const bt_message
*msg
)
1040 g_queue_push_head(trimmer_it
->output_messages
, (void *) msg
);
1044 const bt_message
*pop_message(struct trimmer_iterator
*trimmer_it
)
1046 return g_queue_pop_tail(trimmer_it
->output_messages
);
1050 int clock_raw_value_from_ns_from_origin(const bt_clock_class
*clock_class
,
1051 int64_t ns_from_origin
, uint64_t *raw_value
)
1054 int64_t cc_offset_s
;
1055 uint64_t cc_offset_cycles
;
1058 bt_clock_class_get_offset(clock_class
, &cc_offset_s
, &cc_offset_cycles
);
1059 cc_freq
= bt_clock_class_get_frequency(clock_class
);
1060 return bt_common_clock_value_from_ns_from_origin(cc_offset_s
,
1061 cc_offset_cycles
, cc_freq
, ns_from_origin
, raw_value
);
1065 bt_component_class_message_iterator_next_method_status
1066 end_stream(struct trimmer_iterator
*trimmer_it
,
1067 struct trimmer_iterator_stream_state
*sstate
)
1069 bt_component_class_message_iterator_next_method_status status
=
1070 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
;
1072 const bt_clock_class
*clock_class
;
1074 bt_message
*msg
= NULL
;
1076 BT_ASSERT(!trimmer_it
->end
.is_infinite
);
1078 if (!sstate
->stream
) {
1082 if (sstate
->cur_packet
) {
1084 * The last message could not have been a stream
1085 * activity end message if we have a current packet.
1087 BT_ASSERT(!sstate
->last_msg_is_stream_activity_end
);
1090 * Create and push a packet end message, making its time
1091 * the trimming range's end time.
1093 clock_class
= bt_stream_class_borrow_default_clock_class_const(
1094 bt_stream_borrow_class_const(sstate
->stream
));
1095 BT_ASSERT(clock_class
);
1096 ret
= clock_raw_value_from_ns_from_origin(clock_class
,
1097 trimmer_it
->end
.ns_from_origin
, &raw_value
);
1099 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR
;
1103 msg
= bt_message_packet_end_create_with_default_clock_snapshot(
1104 trimmer_it
->self_msg_iter
, sstate
->cur_packet
,
1107 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR
;
1111 push_message(trimmer_it
, msg
);
1113 BT_PACKET_PUT_REF_AND_RESET(sstate
->cur_packet
);
1116 * Because we generated a packet end message, set the
1117 * stream activity end message's time to use to the
1118 * trimming range's end time (this packet end message's
1121 sstate
->stream_act_end_ns_from_origin
=
1122 trimmer_it
->end
.ns_from_origin
;
1125 if (!sstate
->last_msg_is_stream_activity_end
) {
1126 /* Create and push a stream activity end message */
1127 msg
= bt_message_stream_activity_end_create(
1128 trimmer_it
->self_msg_iter
, sstate
->stream
);
1130 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR
;
1134 clock_class
= bt_stream_class_borrow_default_clock_class_const(
1135 bt_stream_borrow_class_const(sstate
->stream
));
1136 BT_ASSERT(clock_class
);
1138 if (sstate
->stream_act_end_ns_from_origin
== INT64_MIN
) {
1140 * We received at least what is necessary to
1141 * have a stream state (stream beginning and
1142 * stream activity beginning messages), but
1143 * nothing else: use the trimmer range's end
1146 sstate
->stream_act_end_ns_from_origin
=
1147 trimmer_it
->end
.ns_from_origin
;
1150 ret
= clock_raw_value_from_ns_from_origin(clock_class
,
1151 sstate
->stream_act_end_ns_from_origin
, &raw_value
);
1153 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR
;
1157 bt_message_stream_activity_end_set_default_clock_snapshot(
1159 push_message(trimmer_it
, msg
);
1163 /* Create and push a stream end message */
1164 msg
= bt_message_stream_end_create(trimmer_it
->self_msg_iter
,
1167 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR
;
1171 push_message(trimmer_it
, msg
);
1175 * Just to make sure that we don't use this stream state again
1176 * in the future without an obvious error.
1178 sstate
->stream
= NULL
;
1181 bt_message_put_ref(msg
);
1186 bt_component_class_message_iterator_next_method_status
end_iterator_streams(
1187 struct trimmer_iterator
*trimmer_it
)
1189 bt_component_class_message_iterator_next_method_status status
=
1190 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
;
1191 GHashTableIter iter
;
1192 gpointer key
, sstate
;
1194 if (trimmer_it
->end
.is_infinite
) {
1196 * An infinite trimming range's end time guarantees that
1197 * we received (and pushed) all the appropriate end
1204 * End each stream and then remove them from the hash table of
1205 * stream states to release unneeded references.
1207 g_hash_table_iter_init(&iter
, trimmer_it
->stream_states
);
1209 while (g_hash_table_iter_next(&iter
, &key
, &sstate
)) {
1210 status
= end_stream(trimmer_it
, sstate
);
1217 g_hash_table_remove_all(trimmer_it
->stream_states
);
1224 * Handles a message which is associated to a given stream state. This
1225 * _could_ make the iterator's output message queue grow; this could
1226 * also consume the message without pushing anything to this queue, only
1227 * modifying the stream state.
1229 * This function consumes the `msg` reference, _whatever the outcome_.
1231 * `ns_from_origin` is the message's time, as given by
1232 * get_msg_ns_from_origin().
1234 * This function sets `reached_end` if handling this message made the
1235 * iterator reach the end of the trimming range. Note that the output
1236 * message queue could contain messages even if this function sets
1240 bt_component_class_message_iterator_next_method_status
1241 handle_message_with_stream_state(
1242 struct trimmer_iterator
*trimmer_it
, const bt_message
*msg
,
1243 struct trimmer_iterator_stream_state
*sstate
,
1244 int64_t ns_from_origin
, bool *reached_end
)
1246 bt_component_class_message_iterator_next_method_status status
=
1247 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
;
1248 bt_message_type msg_type
= bt_message_get_type(msg
);
1252 case BT_MESSAGE_TYPE_EVENT
:
1253 if (G_UNLIKELY(!trimmer_it
->end
.is_infinite
&&
1254 ns_from_origin
> trimmer_it
->end
.ns_from_origin
)) {
1255 status
= end_iterator_streams(trimmer_it
);
1256 *reached_end
= true;
1260 BT_ASSERT(sstate
->cur_packet
);
1261 push_message(trimmer_it
, msg
);
1264 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
1265 if (G_UNLIKELY(!trimmer_it
->end
.is_infinite
&&
1266 ns_from_origin
> trimmer_it
->end
.ns_from_origin
)) {
1267 status
= end_iterator_streams(trimmer_it
);
1268 *reached_end
= true;
1272 BT_ASSERT(!sstate
->cur_packet
);
1273 sstate
->cur_packet
=
1274 bt_message_packet_beginning_borrow_packet_const(msg
);
1275 bt_packet_get_ref(sstate
->cur_packet
);
1276 push_message(trimmer_it
, msg
);
1279 case BT_MESSAGE_TYPE_PACKET_END
:
1280 sstate
->stream_act_end_ns_from_origin
= ns_from_origin
;
1282 if (G_UNLIKELY(!trimmer_it
->end
.is_infinite
&&
1283 ns_from_origin
> trimmer_it
->end
.ns_from_origin
)) {
1284 status
= end_iterator_streams(trimmer_it
);
1285 *reached_end
= true;
1289 BT_ASSERT(sstate
->cur_packet
);
1290 BT_PACKET_PUT_REF_AND_RESET(sstate
->cur_packet
);
1291 push_message(trimmer_it
, msg
);
1294 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
1295 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
1298 * `ns_from_origin` is the message's time range's
1299 * beginning time here.
1301 int64_t end_ns_from_origin
;
1302 const bt_clock_snapshot
*end_cs
;
1304 if (bt_message_get_type(msg
) ==
1305 BT_MESSAGE_TYPE_DISCARDED_EVENTS
) {
1307 * Safe to ignore the return value because we
1308 * know there's a default clock and it's always
1311 end_cs
= bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
1315 * Safe to ignore the return value because we
1316 * know there's a default clock and it's always
1319 end_cs
= bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
1323 if (bt_clock_snapshot_get_ns_from_origin(end_cs
,
1324 &end_ns_from_origin
)) {
1325 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR
;
1329 sstate
->stream_act_end_ns_from_origin
= end_ns_from_origin
;
1331 if (!trimmer_it
->end
.is_infinite
&&
1332 ns_from_origin
> trimmer_it
->end
.ns_from_origin
) {
1333 status
= end_iterator_streams(trimmer_it
);
1334 *reached_end
= true;
1338 if (!trimmer_it
->end
.is_infinite
&&
1339 end_ns_from_origin
> trimmer_it
->end
.ns_from_origin
) {
1341 * This message's end time is outside the
1342 * trimming time range: replace it with a new
1343 * message having an end time equal to the
1344 * trimming time range's end and without a
1347 const bt_clock_class
*clock_class
=
1348 bt_clock_snapshot_borrow_clock_class_const(
1350 const bt_clock_snapshot
*begin_cs
;
1351 bt_message
*new_msg
;
1352 uint64_t end_raw_value
;
1354 ret
= clock_raw_value_from_ns_from_origin(clock_class
,
1355 trimmer_it
->end
.ns_from_origin
, &end_raw_value
);
1357 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR
;
1361 if (msg_type
== BT_MESSAGE_TYPE_DISCARDED_EVENTS
) {
1362 begin_cs
= bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
1364 new_msg
= bt_message_discarded_events_create_with_default_clock_snapshots(
1365 trimmer_it
->self_msg_iter
,
1367 bt_clock_snapshot_get_value(begin_cs
),
1370 begin_cs
= bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
1372 new_msg
= bt_message_discarded_packets_create_with_default_clock_snapshots(
1373 trimmer_it
->self_msg_iter
,
1375 bt_clock_snapshot_get_value(begin_cs
),
1380 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR
;
1384 /* Replace the original message */
1385 BT_MESSAGE_MOVE_REF(msg
, new_msg
);
1388 push_message(trimmer_it
, msg
);
1392 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING
:
1393 if (!trimmer_it
->end
.is_infinite
&&
1394 ns_from_origin
> trimmer_it
->end
.ns_from_origin
) {
1396 * This only happens when the message's time is
1397 * known and is greater than the trimming
1398 * range's end time. Unknown and -inf times are
1400 * `trimmer_it->end.ns_from_origin`.
1402 status
= end_iterator_streams(trimmer_it
);
1403 *reached_end
= true;
1407 push_message(trimmer_it
, msg
);
1410 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END
:
1411 if (trimmer_it
->end
.is_infinite
) {
1412 push_message(trimmer_it
, msg
);
1417 if (ns_from_origin
== INT64_MIN
) {
1418 /* Unknown: consider it to be in the trimmer window. */
1419 push_message(trimmer_it
, msg
);
1421 sstate
->last_msg_is_stream_activity_end
= true;
1422 } else if (ns_from_origin
== INT64_MAX
) {
1423 /* Infinite: use trimming range's end time */
1424 sstate
->stream_act_end_ns_from_origin
=
1425 trimmer_it
->end
.ns_from_origin
;
1427 /* Known: check if outside of trimming range */
1428 if (ns_from_origin
> trimmer_it
->end
.ns_from_origin
) {
1429 sstate
->stream_act_end_ns_from_origin
=
1430 trimmer_it
->end
.ns_from_origin
;
1431 status
= end_iterator_streams(trimmer_it
);
1432 *reached_end
= true;
1436 push_message(trimmer_it
, msg
);
1438 sstate
->last_msg_is_stream_activity_end
= true;
1439 sstate
->stream_act_end_ns_from_origin
= ns_from_origin
;
1443 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
1444 push_message(trimmer_it
, msg
);
1447 case BT_MESSAGE_TYPE_STREAM_END
:
1449 * This is the end of a stream: end this
1450 * stream if its stream activity end message
1451 * time is not the trimming range's end time
1452 * (which means the final stream activity end
1453 * message had an infinite time). end_stream()
1454 * will generate its own stream end message.
1456 if (trimmer_it
->end
.is_infinite
) {
1457 push_message(trimmer_it
, msg
);
1460 /* We won't need this stream state again */
1461 g_hash_table_remove(trimmer_it
->stream_states
, sstate
->stream
);
1462 } else if (sstate
->stream_act_end_ns_from_origin
<
1463 trimmer_it
->end
.ns_from_origin
) {
1464 status
= end_stream(trimmer_it
, sstate
);
1465 if (status
!= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
) {
1469 /* We won't need this stream state again */
1470 g_hash_table_remove(trimmer_it
->stream_states
, sstate
->stream
);
1478 /* We release the message's reference whatever the outcome */
1479 bt_message_put_ref(msg
);
1480 return BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
;
/*
 * Handles an input message. This _could_ make the iterator's output
 * message queue grow; this could also consume the message without
 * pushing anything to this queue, only modifying the stream state.
 *
 * This function consumes the `msg` reference, _whatever the outcome_.
 *
 * This function sets `reached_end` if handling this message made the
 * iterator reach the end of the trimming range. Note that the output
 * message queue could contain messages even if this function sets
 * `reached_end`.
 */
/*
 * handle_message(): processes one upstream message for the trimmer
 * iterator `trimmer_it`.
 *
 * Steps visible in this block:
 *
 * 1. Borrow the stream associated with `msg`, depending on the
 *    message type; `stream` stays NULL for types not listed in the
 *    switch.
 * 2. If a stream was found, look up its state in
 *    `trimmer_it->stream_states`. When there is none yet, check that
 *    the stream class has a default clock class and that packet,
 *    discarded events and discarded packets messages have default
 *    clock snapshots (logging an error and setting an error status
 *    otherwise), then allocate a zeroed state with g_new0() and insert
 *    it in the hash table, keyed by the stream.
 * 3. Get the time of the message with get_msg_ns_from_origin().
 * 4. With a stream state, delegate to
 *    handle_message_with_stream_state(). Without one, compare the
 *    message time with `trimmer_it->end.ns_from_origin`: past the end,
 *    drop the message reference, call end_iterator_streams() and set
 *    `*reached_end`; otherwise push_message() and set the status to OK.
 *
 * bt_message_put_ref(msg) is called at the end of the block.
 *
 * NOTE(review): this listing looks lossy: the embedded original line
 * numbers skip values, and neither the declarations of `ret`, `skip`
 * and `reached_end` nor the statements leaving each error branch are
 * visible. Confirm the control flow against the pristine file before
 * changing code here.
 */
1496 bt_component_class_message_iterator_next_method_status
handle_message(
1497 struct trimmer_iterator
*trimmer_it
, const bt_message
*msg
,
1500 bt_component_class_message_iterator_next_method_status status
;
1501 const bt_stream
*stream
= NULL
;
1502 int64_t ns_from_origin
= INT64_MIN
;
1505 struct trimmer_iterator_stream_state
*sstate
= NULL
;
/*
 * `trimmer_comp` is the name used by the BT_COMP_LOG_SELF_COMP and
 * BT_LOG_OUTPUT_LEVEL macros defined at the top of this file, so the
 * BT_COMP_LOGE() calls below rely on this local variable.
 */
1506 struct trimmer_comp
*trimmer_comp
= trimmer_it
->trimmer_comp
;
1508 /* Find message's associated stream */
1509 switch (bt_message_get_type(msg
)) {
1510 case BT_MESSAGE_TYPE_EVENT
:
1511 stream
= bt_event_borrow_stream_const(
1512 bt_message_event_borrow_event_const(msg
));
1514 case BT_MESSAGE_TYPE_PACKET_BEGINNING
:
1515 stream
= bt_packet_borrow_stream_const(
1516 bt_message_packet_beginning_borrow_packet_const(msg
));
1518 case BT_MESSAGE_TYPE_PACKET_END
:
1519 stream
= bt_packet_borrow_stream_const(
1520 bt_message_packet_end_borrow_packet_const(msg
));
1522 case BT_MESSAGE_TYPE_DISCARDED_EVENTS
:
1523 stream
= bt_message_discarded_events_borrow_stream_const(msg
);
1525 case BT_MESSAGE_TYPE_DISCARDED_PACKETS
:
1526 stream
= bt_message_discarded_packets_borrow_stream_const(msg
);
1528 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING
:
1529 stream
= bt_message_stream_activity_beginning_borrow_stream_const(msg
);
1531 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END
:
1532 stream
= bt_message_stream_activity_end_borrow_stream_const(msg
);
1534 case BT_MESSAGE_TYPE_STREAM_BEGINNING
:
1535 stream
= bt_message_stream_beginning_borrow_stream_const(msg
);
1537 case BT_MESSAGE_TYPE_STREAM_END
:
1538 stream
= bt_message_stream_end_borrow_stream_const(msg
);
/*
 * A stream state is only looked up (and created when missing) if the
 * switch above found a stream; otherwise `sstate` stays NULL and the
 * stream-less branch further down handles the message.
 */
1544 if (G_LIKELY(stream
)) {
1545 /* Find stream state */
1546 sstate
= g_hash_table_lookup(trimmer_it
->stream_states
,
1548 if (G_UNLIKELY(!sstate
)) {
1549 /* No stream state yet: create one now */
1550 const bt_stream_class
*sc
;
1553 * Validate right now that the stream's class
1554 * has a registered default clock class so that
1555 * an existing stream state guarantees existing
1556 * default clock snapshots for its associated
1559 * Also check that clock snapshots are always
1562 sc
= bt_stream_borrow_class_const(stream
);
1563 if (!bt_stream_class_borrow_default_clock_class_const(sc
)) {
1564 BT_COMP_LOGE("Unsupported stream: stream class does "
1565 "not have a default clock class: "
1567 "stream-id=%" PRIu64
", "
1568 "stream-name=\"%s\"",
1569 stream
, bt_stream_get_id(stream
),
1570 bt_stream_get_name(stream
));
1571 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR
;
1576 * Temporary: make sure packet beginning, packet
1577 * end, discarded events, and discarded packets
1578 * messages have default clock snapshots until
1579 * the support for not having them is
1582 if (!bt_stream_class_packets_have_beginning_default_clock_snapshot(
1584 BT_COMP_LOGE("Unsupported stream: packets have "
1585 "no beginning clock snapshot: "
1587 "stream-id=%" PRIu64
", "
1588 "stream-name=\"%s\"",
1589 stream
, bt_stream_get_id(stream
),
1590 bt_stream_get_name(stream
));
1591 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR
;
1595 if (!bt_stream_class_packets_have_end_default_clock_snapshot(
1597 BT_COMP_LOGE("Unsupported stream: packets have "
1598 "no end clock snapshot: "
1600 "stream-id=%" PRIu64
", "
1601 "stream-name=\"%s\"",
1602 stream
, bt_stream_get_id(stream
),
1603 bt_stream_get_name(stream
));
1604 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR
;
1608 if (bt_stream_class_supports_discarded_events(sc
) &&
1609 !bt_stream_class_discarded_events_have_default_clock_snapshots(sc
)) {
1610 BT_COMP_LOGE("Unsupported stream: discarded events "
1611 "have no clock snapshots: "
1613 "stream-id=%" PRIu64
", "
1614 "stream-name=\"%s\"",
1615 stream
, bt_stream_get_id(stream
),
1616 bt_stream_get_name(stream
));
1617 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR
;
1621 if (bt_stream_class_supports_discarded_packets(sc
) &&
1622 !bt_stream_class_discarded_packets_have_default_clock_snapshots(sc
)) {
1623 BT_COMP_LOGE("Unsupported stream: discarded packets "
1624 "have no clock snapshots: "
1626 "stream-id=%" PRIu64
", "
1627 "stream-name=\"%s\"",
1628 stream
, bt_stream_get_id(stream
),
1629 bt_stream_get_name(stream
));
1630 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR
;
1634 sstate
= g_new0(struct trimmer_iterator_stream_state
,
1637 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR
;
1641 sstate
->stream
= stream
;
1642 sstate
->stream_act_end_ns_from_origin
= INT64_MIN
;
1643 g_hash_table_insert(trimmer_it
->stream_states
,
1644 (void *) stream
, sstate
);
1648 /* Retrieve the message's time */
1649 ret
= get_msg_ns_from_origin(msg
, &ns_from_origin
, &skip
);
1650 if (G_UNLIKELY(ret
)) {
1651 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR
;
1655 if (G_LIKELY(sstate
)) {
1656 /* Message associated to a stream */
1657 status
= handle_message_with_stream_state(trimmer_it
, msg
,
1658 sstate
, ns_from_origin
, reached_end
);
1661 * handle_message_with_stream_state() unconditionally
1667 * Message not associated to a stream (message iterator
/*
 * NOTE(review): unlike the end-of-range checks in
 * handle_message_with_stream_state(), this comparison does not test
 * `trimmer_it->end.is_infinite` first, although the documentation of
 * `struct trimmer_bound` says `ns_from_origin` is only valid when the
 * bound is not infinite. Confirm whether an infinite end bound can
 * reach this branch.
 */
1670 if (G_UNLIKELY(ns_from_origin
> trimmer_it
->end
.ns_from_origin
)) {
1671 BT_MESSAGE_PUT_REF_AND_RESET(msg
);
1672 status
= end_iterator_streams(trimmer_it
);
1673 *reached_end
= true;
1675 push_message(trimmer_it
, msg
);
1676 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
;
1682 /* We release the message's reference whatever the outcome */
1683 bt_message_put_ref(msg
);
/*
 * Moves messages from the output queue of `trimmer_it`
 * (`trimmer_it->output_messages`) into the array `msgs`.
 *
 * While `capacity` is positive and the queue is not empty, the result
 * of pop_message() is stored at index `*count`. After the loop the
 * function asserts that `*count` is positive, so it is presumably
 * meant to be called only when at least one message is queued.
 *
 * NOTE(review): the initialisation of `*count` and the updates of
 * `capacity` and `*count` inside the loop are not visible in this
 * listing (the embedded original line numbers skip 1691-1694 and
 * 1700-1703); presumably a reset, a decrement and an increment.
 * Confirm against the pristine file.
 */
1688 void fill_message_array_from_output_messages(
1689 struct trimmer_iterator
*trimmer_it
,
1690 bt_message_array_const msgs
, uint64_t capacity
, uint64_t *count
)
1695 * Move auto-seek messages to the output array (which is this
1696 * iterator's base message array).
1698 while (capacity
> 0 && !g_queue_is_empty(trimmer_it
->output_messages
)) {
1699 msgs
[*count
] = pop_message(trimmer_it
);
1704 BT_ASSERT(*count
> 0);
/*
 * Handler for TRIMMER_ITERATOR_STATE_ENDING (trimmer_msg_iter_next()
 * calls it for that state, and state_trim() calls it directly).
 *
 * If the output message queue is empty, the iterator state becomes
 * TRIMMER_ITERATOR_STATE_ENDED and the status is set to
 * ..._NEXT_METHOD_STATUS_END. Otherwise the queued messages are moved
 * to `msgs` with fill_message_array_from_output_messages().
 *
 * NOTE(review): the last parameter, the statement leaving the empty
 * queue branch and the final return are not visible in this listing;
 * presumably the status initialised to ..._STATUS_OK is returned
 * after the fill. Confirm against the pristine file.
 */
1708 bt_component_class_message_iterator_next_method_status
state_ending(
1709 struct trimmer_iterator
*trimmer_it
,
1710 bt_message_array_const msgs
, uint64_t capacity
,
1713 bt_component_class_message_iterator_next_method_status status
=
1714 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
;
/* Nothing left to deliver: mark the iterator as ended */
1716 if (g_queue_is_empty(trimmer_it
->output_messages
)) {
1717 trimmer_it
->state
= TRIMMER_ITERATOR_STATE_ENDED
;
1718 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END
;
/* Queue not empty: hand the queued messages over to the caller */
1722 fill_message_array_from_output_messages(trimmer_it
, msgs
,
/*
 * Handler for TRIMMER_ITERATOR_STATE_TRIM: pulls messages from the
 * upstream iterator until the output message queue holds at least one
 * message, then moves the queued messages to `msgs`.
 *
 * Each loop iteration calls
 * bt_self_component_port_input_message_iterator_next() on
 * `trimmer_it->upstream_iter`:
 *
 * - Upstream reports END: end_iterator_streams() is called, then
 *   state_ending() is applied (the ENDING state constant is visible
 *   just before that call; its assignment target is not, presumably
 *   `trimmer_it->state`).
 * - Upstream reports OK: every received message is passed to
 *   handle_message(). On a non-OK status the batch is given to
 *   put_messages(). When `reached_end` is set, the batch is given to
 *   put_messages() as well and state_ending() is applied.
 *
 * NOTE(review): the declarations of `my_count` and `i`, the last
 * parameter and the statements leaving each branch are not visible in
 * this listing. Confirm the control flow against the pristine file.
 */
1730 bt_component_class_message_iterator_next_method_status
1731 state_trim(struct trimmer_iterator
*trimmer_it
,
1732 bt_message_array_const msgs
, uint64_t capacity
,
1735 bt_component_class_message_iterator_next_method_status status
=
1736 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
;
1737 bt_message_array_const my_msgs
;
1740 bool reached_end
= false;
/* Keep pulling from upstream until there is something to output */
1742 while (g_queue_is_empty(trimmer_it
->output_messages
)) {
/*
 * The upstream status is converted through `int` and then compared
 * with constants of this iterator status type; this presumably relies
 * on both enumerations sharing numeric values -- TODO confirm.
 */
1743 status
= (int) bt_self_component_port_input_message_iterator_next(
1744 trimmer_it
->upstream_iter
, &my_msgs
, &my_count
);
1745 if (G_UNLIKELY(status
!= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
)) {
1746 if (status
== BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END
) {
1747 status
= end_iterator_streams(trimmer_it
);
1748 if (status
!= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
) {
1753 TRIMMER_ITERATOR_STATE_ENDING
;
1754 status
= state_ending(trimmer_it
, msgs
,
/* Reached when upstream returned OK: the batch must not be empty */
1761 BT_ASSERT(my_count
> 0);
1763 for (i
= 0; i
< my_count
; i
++) {
1764 status
= handle_message(trimmer_it
, my_msgs
[i
],
1768 * handle_message() unconditionally consumes the
1769 * message reference.
1773 if (G_UNLIKELY(status
!=
1774 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
)) {
1775 put_messages(my_msgs
, my_count
);
1779 if (G_UNLIKELY(reached_end
)) {
1781 * This message's time was passed the
1782 * trimming time range's end time: we
1783 * are done. Their might still be
1784 * messages in the output message queue,
1785 * so move to the "ending" state and
1786 * apply it immediately since
1787 * state_trim() is called within the
1790 put_messages(my_msgs
, my_count
);
1792 TRIMMER_ITERATOR_STATE_ENDING
;
1793 status
= state_ending(trimmer_it
, msgs
,
1801 * There's at least one message in the output message queue:
1802 * move the messages to the output message array.
1804 BT_ASSERT(!g_queue_is_empty(trimmer_it
->output_messages
));
1805 fill_message_array_from_output_messages(trimmer_it
, msgs
,
/*
 * Next method of the trimmer message iterator: runs the handler that
 * matches the current state stored in `trimmer_it->state`.
 *
 * The trimmer iterator is obtained from `self_msg_iter` with
 * bt_self_message_iterator_get_data(). The TRIM state is tested first
 * (marked G_LIKELY) and handled by state_trim(). The other states go
 * through the switch:
 *
 * - SET_BOUNDS_NS_FROM_ORIGIN: state_set_trimmer_iterator_bounds(),
 *   then state_seek_initially(), then state_trim().
 * - SEEK_INITIALLY: state_seek_initially(), then state_trim().
 * - ENDING: state_ending().
 * - ENDED: the status is set to ..._NEXT_METHOD_STATUS_END.
 *
 * After each handler call the status is compared with
 * ..._NEXT_METHOD_STATUS_OK.
 *
 * NOTE(review): the `count` parameter, the statements executed when a
 * status is not OK, and the final return are not visible in this
 * listing; presumably every failure leaves the function with that
 * status. Confirm against the pristine file.
 */
1813 bt_component_class_message_iterator_next_method_status
trimmer_msg_iter_next(
1814 bt_self_message_iterator
*self_msg_iter
,
1815 bt_message_array_const msgs
, uint64_t capacity
,
1818 struct trimmer_iterator
*trimmer_it
=
1819 bt_self_message_iterator_get_data(self_msg_iter
);
1820 bt_component_class_message_iterator_next_method_status status
=
1821 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
;
1823 BT_ASSERT(trimmer_it
);
/* Most frequent case first: the iterator is already trimming */
1825 if (G_LIKELY(trimmer_it
->state
== TRIMMER_ITERATOR_STATE_TRIM
)) {
1826 status
= state_trim(trimmer_it
, msgs
, capacity
, count
);
1827 if (status
!= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
) {
/* Less frequent states */
1831 switch (trimmer_it
->state
) {
/* Initial chain: set the bounds, seek, then start trimming */
1832 case TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN
:
1833 status
= state_set_trimmer_iterator_bounds(trimmer_it
);
1834 if (status
!= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
) {
1838 status
= state_seek_initially(trimmer_it
);
1839 if (status
!= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
) {
1843 status
= state_trim(trimmer_it
, msgs
, capacity
, count
);
1844 if (status
!= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
) {
/* Same chain without the bound setting step */
1849 case TRIMMER_ITERATOR_STATE_SEEK_INITIALLY
:
1850 status
= state_seek_initially(trimmer_it
);
1851 if (status
!= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
) {
1855 status
= state_trim(trimmer_it
, msgs
, capacity
, count
);
1856 if (status
!= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
) {
/* Only the messages still queued remain to be delivered */
1861 case TRIMMER_ITERATOR_STATE_ENDING
:
1862 status
= state_ending(trimmer_it
, msgs
, capacity
,
1864 if (status
!= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK
) {
/* Already ended: report the end status without calling a handler */
1869 case TRIMMER_ITERATOR_STATE_ENDED
:
1870 status
= BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END
;
/*
 * Finalization method of the trimmer message iterator.
 *
 * Fetches the `struct trimmer_iterator` returned by
 * bt_self_message_iterator_get_data() for `self_msg_iter`, asserts
 * that it is not NULL, and passes it to destroy_trimmer_iterator().
 *
 * NOTE(review): the braces of this function are not visible in this
 * listing (the embedded original line numbers skip 1883 and 1886).
 */
1882 void trimmer_msg_iter_finalize(bt_self_message_iterator
*self_msg_iter
)
1884 struct trimmer_iterator
*trimmer_it
=
1885 bt_self_message_iterator_get_data(self_msg_iter
);
1887 BT_ASSERT(trimmer_it
);
1888 destroy_trimmer_iterator(trimmer_it
);