X-Git-Url: http://drtracing.org/?a=blobdiff_plain;f=src%2Flib%2Fgraph%2Fiterator.c;h=b5033083fdaeb25d44121cfbbd8852003fc86c33;hb=9b4f9b425f2efce9a6ccc25f7ae062ebc1116a7d;hp=2e2ab405667e684768928df02cffff6b3d3ec67a;hpb=870631a2db01676b476dbee615aade0a22926bcd;p=babeltrace.git diff --git a/src/lib/graph/iterator.c b/src/lib/graph/iterator.c index 2e2ab405..b5033083 100644 --- a/src/lib/graph/iterator.c +++ b/src/lib/graph/iterator.c @@ -49,9 +49,6 @@ #include #include #include -#include -#include -#include #include #include #include @@ -80,7 +77,6 @@ #include "message/message-iterator-inactivity.h" #include "message/stream.h" #include "message/packet.h" -#include "message/stream-activity.h" #include "lib/func-status.h" /* @@ -166,6 +162,16 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj iterator->auto_seek.msgs = NULL; } + if (iterator->upstream_msg_iters) { + /* + * At this point the message iterator is finalized, so + * it's detached from any upstream message iterator. + */ + BT_ASSERT(iterator->upstream_msg_iters->len == 0); + g_ptr_array_free(iterator->upstream_msg_iters, TRUE); + iterator->upstream_msg_iters = NULL; + } + destroy_base_message_iterator(obj); } @@ -173,6 +179,7 @@ BT_HIDDEN void bt_self_component_port_input_message_iterator_try_finalize( struct bt_self_component_port_input_message_iterator *iterator) { + uint64_t i; typedef void (*method_t)(void *); struct bt_component_class *comp_class = NULL; @@ -235,6 +242,27 @@ void bt_self_component_port_input_message_iterator_try_finalize( method(iterator); } + /* Detach upstream message iterators */ + for (i = 0; i < iterator->upstream_msg_iters->len; i++) { + struct bt_self_component_port_input_message_iterator *upstream_msg_iter = + iterator->upstream_msg_iters->pdata[i]; + + upstream_msg_iter->downstream_msg_iter = NULL; + } + + g_ptr_array_set_size(iterator->upstream_msg_iters, 0); + + /* Detach downstream message iterator */ + if (iterator->downstream_msg_iter) { + gboolean existed; + + BT_ASSERT(iterator->downstream_msg_iter->upstream_msg_iters); + existed = g_ptr_array_remove_fast( + iterator->downstream_msg_iter->upstream_msg_iters, + iterator); + BT_ASSERT(existed); + } + iterator->upstream_component = NULL; iterator->upstream_port = NULL; set_self_comp_port_input_msg_iterator_state(iterator, @@ -295,22 +323,50 @@ bt_bool can_seek_beginning_true( static struct bt_self_component_port_input_message_iterator * -bt_self_component_port_input_message_iterator_create_initial( - struct bt_component *upstream_comp, - struct bt_port *upstream_port) +create_self_component_input_port_message_iterator( + struct bt_self_message_iterator *self_downstream_msg_iter, + struct bt_self_component_port_input *self_port) { + typedef enum bt_component_class_message_iterator_init_method_status (*init_method_t)( + void *, void *, void *); + int ret; - struct bt_self_component_port_input_message_iterator *iterator = NULL; + init_method_t init_method = NULL; + struct bt_self_component_port_input_message_iterator *iterator = + NULL; + struct bt_self_component_port_input_message_iterator *downstream_msg_iter = + (void *) self_downstream_msg_iter; + struct bt_port *port = (void *) self_port; + struct bt_port *upstream_port; + struct bt_component *comp; + struct bt_component *upstream_comp; + struct bt_component_class *upstream_comp_cls; - BT_ASSERT(upstream_comp); + BT_ASSERT_PRE_NON_NULL(port, "Input port"); + comp = bt_port_borrow_component_inline(port); + BT_ASSERT_PRE(bt_port_is_connected(port), + "Input port is not connected: %![port-]+p", port); + BT_ASSERT_PRE(comp, "Input port is not part of a component: %![port-]+p", + port); + BT_ASSERT(port->connection); + upstream_port = port->connection->upstream_port; BT_ASSERT(upstream_port); - BT_ASSERT(bt_port_is_connected(upstream_port)); - BT_LIB_LOGI("Creating initial message iterator on self component input port: " - "%![up-comp-]+c, %![up-port-]+p", upstream_comp, upstream_port); - BT_ASSERT(bt_component_get_class_type(upstream_comp) == + upstream_comp = bt_port_borrow_component_inline(upstream_port); + BT_ASSERT(upstream_comp); + BT_ASSERT_PRE( + bt_component_borrow_graph(upstream_comp)->config_state == + BT_GRAPH_CONFIGURATION_STATE_PARTIALLY_CONFIGURED || + bt_component_borrow_graph(upstream_comp)->config_state == + BT_GRAPH_CONFIGURATION_STATE_CONFIGURED, + "Graph is not configured: %!+g", + bt_component_borrow_graph(upstream_comp)); + upstream_comp_cls = upstream_comp->class; + BT_ASSERT(upstream_comp->class->type == BT_COMPONENT_CLASS_TYPE_SOURCE || - bt_component_get_class_type(upstream_comp) == + upstream_comp->class->type == BT_COMPONENT_CLASS_TYPE_FILTER); + BT_LIB_LOGI("Creating message iterator on self component input port: " + "%![up-comp-]+c, %![up-port-]+p", upstream_comp, upstream_port); iterator = g_new0( struct bt_self_component_port_input_message_iterator, 1); if (!iterator) { @@ -329,12 +385,16 @@ bt_self_component_port_input_message_iterator_create_initial( } iterator->last_ns_from_origin = INT64_MIN; - iterator->auto_seek.msgs = g_queue_new(); if (!iterator->auto_seek.msgs) { BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GQueue."); - ret = -1; - goto end; + goto error; + } + + iterator->upstream_msg_iters = g_ptr_array_new(); + if (!iterator->upstream_msg_iters) { + BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray."); + goto error; } iterator->upstream_component = upstream_comp; @@ -407,66 +467,6 @@ bt_self_component_port_input_message_iterator_create_initial( can_seek_beginning_true; } - BT_LIB_LOGI("Created initial message iterator on self component input port: " - "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i", - upstream_port, upstream_comp, iterator); - goto end; - -error: - BT_OBJECT_PUT_REF_AND_RESET(iterator); - -end: - return iterator; -} - -struct bt_self_component_port_input_message_iterator * -bt_self_component_port_input_message_iterator_create( - struct bt_self_component_port_input *self_port) -{ - typedef enum bt_component_class_message_iterator_init_method_status (*init_method_t)( - void *, void *, void *); - - init_method_t init_method = NULL; - struct bt_self_component_port_input_message_iterator *iterator = - NULL; - struct bt_port *port = (void *) self_port; - struct bt_port *upstream_port; - struct bt_component *comp; - struct bt_component *upstream_comp; - struct bt_component_class *upstream_comp_cls; - - BT_ASSERT_PRE_NON_NULL(port, "Port"); - comp = bt_port_borrow_component_inline(port); - BT_ASSERT_PRE(bt_port_is_connected(port), - "Port is not connected: %![port-]+p", port); - BT_ASSERT_PRE(comp, "Port is not part of a component: %![port-]+p", - port); - BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp), - "Port's component's graph is canceled: " - "%![port-]+p, %![comp-]+c", port, comp); - BT_ASSERT(port->connection); - upstream_port = port->connection->upstream_port; - BT_ASSERT(upstream_port); - upstream_comp = bt_port_borrow_component_inline(upstream_port); - BT_ASSERT(upstream_comp); - BT_ASSERT_PRE( - bt_component_borrow_graph(upstream_comp)->config_state != - BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, - "Graph is not configured: %!+g", - bt_component_borrow_graph(upstream_comp)); - upstream_comp_cls = upstream_comp->class; - BT_ASSERT(upstream_comp->class->type == - BT_COMPONENT_CLASS_TYPE_SOURCE || - upstream_comp->class->type == - BT_COMPONENT_CLASS_TYPE_FILTER); - iterator = bt_self_component_port_input_message_iterator_create_initial( - upstream_comp, upstream_port); - if (!iterator) { - BT_LIB_LOGE_APPEND_CAUSE( - "Cannot create self component input port message iterator."); - goto error; - } - switch (upstream_comp_cls->type) { case BT_COMPONENT_CLASS_TYPE_SOURCE: { @@ -509,6 +509,18 @@ bt_self_component_port_input_message_iterator_create( } } + if (downstream_msg_iter) { + /* Set this message iterator's downstream message iterator */ + iterator->downstream_msg_iter = downstream_msg_iter; + + /* + * Add this message iterator to the downstream message + * iterator's array of upstream message iterators. + */ + g_ptr_array_add(downstream_msg_iter->upstream_msg_iters, + iterator); + } + set_self_comp_port_input_msg_iterator_state(iterator, BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE); g_ptr_array_add(port->connection->iterators, iterator); @@ -524,13 +536,33 @@ end: return iterator; } +struct bt_self_component_port_input_message_iterator * +bt_self_component_port_input_message_iterator_create_from_message_iterator( + struct bt_self_message_iterator *self_msg_iter, + struct bt_self_component_port_input *input_port) +{ + BT_ASSERT_PRE_NON_NULL(self_msg_iter, "Message iterator"); + return create_self_component_input_port_message_iterator(self_msg_iter, + input_port); +} + +struct bt_self_component_port_input_message_iterator * +bt_self_component_port_input_message_iterator_create_from_sink_component( + struct bt_self_component_sink *self_comp, + struct bt_self_component_port_input *input_port) +{ + BT_ASSERT_PRE_NON_NULL(self_comp, "Sink component"); + return create_self_component_input_port_message_iterator(NULL, + input_port); +} + void *bt_self_message_iterator_get_data( const struct bt_self_message_iterator *self_iterator) { struct bt_self_component_port_input_message_iterator *iterator = (void *) self_iterator; - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); return iterator->user_data; } @@ -540,7 +572,7 @@ void bt_self_message_iterator_set_data( struct bt_self_component_port_input_message_iterator *iterator = (void *) self_iterator; - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); iterator->user_data = data; BT_LIB_LOGD("Set message iterator's user data: " "%!+i, user-data-addr=%p", iterator, data); @@ -551,7 +583,7 @@ void bt_self_message_iterator_set_data( * time. */ -BT_ASSERT_POST_FUNC +BT_ASSERT_POST_DEV_FUNC static bool clock_snapshots_are_monotonic_one( struct bt_self_component_port_input_message_iterator *iterator, @@ -589,21 +621,17 @@ bool clock_snapshots_are_monotonic_one( clock_snapshot = packet_msg->default_cs; break; } - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: + case BT_MESSAGE_TYPE_STREAM_BEGINNING: + case BT_MESSAGE_TYPE_STREAM_END: { - struct bt_message_stream_activity *str_act_msg = - (struct bt_message_stream_activity *) msg; - - if (str_act_msg->default_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { - clock_snapshot = str_act_msg->default_cs; + struct bt_message_stream *stream_msg = (struct bt_message_stream *) msg; + if (stream_msg->default_cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) { + goto end; } + + clock_snapshot = stream_msg->default_cs; break; } - case BT_MESSAGE_TYPE_STREAM_BEGINNING: - case BT_MESSAGE_TYPE_STREAM_END: - /* These messages don't have clock snapshots. */ - goto end; case BT_MESSAGE_TYPE_DISCARDED_EVENTS: case BT_MESSAGE_TYPE_DISCARDED_PACKETS: { @@ -630,7 +658,7 @@ end: return result; } -BT_ASSERT_POST_FUNC +BT_ASSERT_POST_DEV_FUNC static bool clock_snapshots_are_monotonic( struct bt_self_component_port_input_message_iterator *iterator, @@ -657,7 +685,7 @@ end: * stream is compatible with what we've seen before. */ -BT_ASSERT_POST_FUNC +BT_ASSERT_POST_DEV_FUNC static bool clock_classes_are_compatible_one(struct bt_self_component_port_input_message_iterator *iterator, const struct bt_message *msg) @@ -689,7 +717,7 @@ bool clock_classes_are_compatible_one(struct bt_self_component_port_input_messag iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_UNIX; } else if (clock_class_uuid) { iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_OTHER_UUID; - memcpy(iterator->clock_expectation.uuid, clock_class_uuid, BABELTRACE_UUID_LEN); + bt_uuid_copy(iterator->clock_expectation.uuid, clock_class_uuid); } else { iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_OTHER_NO_UUID; } @@ -697,7 +725,8 @@ bool clock_classes_are_compatible_one(struct bt_self_component_port_input_messag case CLOCK_EXPECTATION_NONE: if (clock_class) { - BT_ASSERT_POST_MSG("Expecting no clock class, got one: %![cc-]+K", + BT_ASSERT_POST_DEV_MSG( + "Expecting no clock class, got one: %![cc-]+K", clock_class); result = false; goto end; @@ -707,13 +736,15 @@ bool clock_classes_are_compatible_one(struct bt_self_component_port_input_messag case CLOCK_EXPECTATION_ORIGIN_UNIX: if (!clock_class) { - BT_ASSERT_POST_MSG("Expecting a clock class, got none."); + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class, got none."); result = false; goto end; } if (!bt_clock_class_origin_is_unix_epoch(clock_class)) { - BT_ASSERT_POST_MSG("Expecting a clock class with Unix epoch origin: %![cc-]+K", + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class with Unix epoch origin: %![cc-]+K", clock_class); result = false; goto end; @@ -722,27 +753,31 @@ bool clock_classes_are_compatible_one(struct bt_self_component_port_input_messag case CLOCK_EXPECTATION_ORIGIN_OTHER_UUID: if (!clock_class) { - BT_ASSERT_POST_MSG("Expecting a clock class, got none."); + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class, got none."); result = false; goto end; } if (bt_clock_class_origin_is_unix_epoch(clock_class)) { - BT_ASSERT_POST_MSG("Expecting a clock class without Unix epoch origin: %![cc-]+K", + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class without Unix epoch origin: %![cc-]+K", clock_class); result = false; goto end; } if (!clock_class_uuid) { - BT_ASSERT_POST_MSG("Expecting a clock class with UUID: %![cc-]+K", + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class with UUID: %![cc-]+K", clock_class); result = false; goto end; } if (bt_uuid_compare(iterator->clock_expectation.uuid, clock_class_uuid)) { - BT_ASSERT_POST_MSG("Expecting a clock class with UUID, got one " + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class with UUID, got one " "with a different UUID: %![cc-]+K, expected-uuid=%!u", clock_class, iterator->clock_expectation.uuid); result = false; @@ -752,20 +787,23 @@ bool clock_classes_are_compatible_one(struct bt_self_component_port_input_messag case CLOCK_EXPECTATION_ORIGIN_OTHER_NO_UUID: if (!clock_class) { - BT_ASSERT_POST_MSG("Expecting a clock class, got none."); + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class, got none."); result = false; goto end; } if (bt_clock_class_origin_is_unix_epoch(clock_class)) { - BT_ASSERT_POST_MSG("Expecting a clock class without Unix epoch origin: %![cc-]+K", + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class without Unix epoch origin: %![cc-]+K", clock_class); result = false; goto end; } if (clock_class_uuid) { - BT_ASSERT_POST_MSG("Expecting a clock class without UUID: %![cc-]+K", + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class without UUID: %![cc-]+K", clock_class); result = false; goto end; @@ -780,7 +818,7 @@ end: return result; } -BT_ASSERT_POST_FUNC +BT_ASSERT_POST_DEV_FUNC static bool clock_classes_are_compatible( struct bt_self_component_port_input_message_iterator *iterator, @@ -822,9 +860,9 @@ call_iterator_next_method( bt_common_func_status_string(status), *user_count); if (status == BT_FUNC_STATUS_OK) { - BT_ASSERT_POST(clock_classes_are_compatible(iterator, msgs, *user_count), + BT_ASSERT_POST_DEV(clock_classes_are_compatible(iterator, msgs, *user_count), "Clocks are not compatible"); - BT_ASSERT_POST(clock_snapshots_are_monotonic(iterator, msgs, *user_count), + BT_ASSERT_POST_DEV(clock_snapshots_are_monotonic(iterator, msgs, *user_count), "Clock snapshots are not monotonic"); } @@ -838,16 +876,16 @@ bt_self_component_port_input_message_iterator_next( { enum bt_message_iterator_next_status status = BT_FUNC_STATUS_OK; - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); - BT_ASSERT_PRE_NON_NULL(msgs, "Message array (output)"); - BT_ASSERT_PRE_NON_NULL(user_count, "Message count (output)"); - BT_ASSERT_PRE(iterator->state == + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(msgs, "Message array (output)"); + BT_ASSERT_PRE_DEV_NON_NULL(user_count, "Message count (output)"); + BT_ASSERT_PRE_DEV(iterator->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE, "Message iterator's \"next\" called, but " "message iterator is in the wrong state: %!+i", iterator); BT_ASSERT(iterator->upstream_component); BT_ASSERT(iterator->upstream_component->class); - BT_ASSERT_PRE( + BT_ASSERT_PRE_DEV( bt_component_borrow_graph(iterator->upstream_component)->config_state != BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, "Graph is not configured: %!+g", @@ -888,7 +926,7 @@ bt_self_component_port_input_message_iterator_next( switch (status) { case BT_FUNC_STATUS_OK: - BT_ASSERT_POST(*user_count <= MSG_BATCH_SIZE, + BT_ASSERT_POST_DEV(*user_count <= MSG_BATCH_SIZE, "Invalid returned message count: greater than " "batch size: count=%" PRIu64 ", batch-size=%u", *user_count, MSG_BATCH_SIZE); @@ -917,15 +955,14 @@ enum bt_message_iterator_next_status bt_port_output_message_iterator_next( enum bt_message_iterator_next_status status; int graph_status; - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); - BT_ASSERT_PRE_NON_NULL(msgs_to_user, "Message array (output)"); - BT_ASSERT_PRE_NON_NULL(count_to_user, "Message count (output)"); + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(msgs_to_user, "Message array (output)"); + BT_ASSERT_PRE_DEV_NON_NULL(count_to_user, "Message count (output)"); BT_LIB_LOGD("Getting next output port message iterator's messages: " "%!+i", iterator); graph_status = bt_graph_consume_sink_no_check(iterator->graph, iterator->colander); switch (graph_status) { - case BT_FUNC_STATUS_CANCELED: case BT_FUNC_STATUS_AGAIN: case BT_FUNC_STATUS_END: case BT_FUNC_STATUS_MEMORY_ERROR: @@ -954,7 +991,7 @@ struct bt_component * bt_self_component_port_input_message_iterator_borrow_component( struct bt_self_component_port_input_message_iterator *iterator) { - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); return iterator->upstream_component; } @@ -962,7 +999,7 @@ const struct bt_component * bt_self_component_port_input_message_iterator_borrow_component_const( const struct bt_self_component_port_input_message_iterator *iterator) { - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); return iterator->upstream_component; } @@ -972,7 +1009,7 @@ struct bt_self_component *bt_self_message_iterator_borrow_component( struct bt_self_component_port_input_message_iterator *iterator = (void *) self_iterator; - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); return (void *) iterator->upstream_component; } @@ -982,7 +1019,7 @@ struct bt_self_port_output *bt_self_message_iterator_borrow_port( struct bt_self_component_port_input_message_iterator *iterator = (void *) self_iterator; - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); return (void *) iterator->upstream_port; } @@ -1297,20 +1334,18 @@ struct auto_seek_stream_state { * Value representing which step of this timeline we are at. * * time ---> - * [SB] 1 [SAB] 2 [PB] 3 [PE] 2 [SAE] 1 [SE] + * [SB] 1 [PB] 2 [PE] 1 [SE] * * At each point in the timeline, the messages we need to replicate are: * * 1: Stream beginning - * 2: Stream beginning, stream activity beginning - * 3: Stream beginning, stream activity beginning, packet beginning + * 2: Stream beginning, packet beginning * * Before "Stream beginning" and after "Stream end", we don't need to * replicate anything as the stream doesn't exist. */ enum { AUTO_SEEK_STREAM_STATE_STREAM_BEGAN, - AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN, AUTO_SEEK_STREAM_STATE_PACKET_BEGAN, } state; @@ -1320,6 +1355,9 @@ struct auto_seek_stream_state { * alive by the time we use it. */ struct bt_packet *packet; + + /* Have we see a message with a clock snapshot yet? */ + bool seen_clock_snapshot; }; static @@ -1380,7 +1418,7 @@ int auto_seek_handle_message( (const void *) msg; clk_snapshot = event_msg->default_cs; - BT_ASSERT_POST(clk_snapshot, + BT_ASSERT_POST_DEV(clk_snapshot, "Event message has no default clock snapshot: %!+n", event_msg); break; @@ -1401,7 +1439,7 @@ int auto_seek_handle_message( (const void *) msg; clk_snapshot = packet_msg->default_cs; - BT_ASSERT_POST(clk_snapshot, + BT_ASSERT_POST_DEV(clk_snapshot, "Packet message has no default clock snapshot: %!+n", packet_msg); break; @@ -1412,7 +1450,7 @@ int auto_seek_handle_message( struct bt_message_discarded_items *msg_disc_items = (void *) msg; - BT_ASSERT_POST(msg_disc_items->default_begin_cs && + BT_ASSERT_POST_DEV(msg_disc_items->default_begin_cs && msg_disc_items->default_end_cs, "Discarded events/packets message has no default clock snapshots: %!+n", msg_disc_items); @@ -1447,7 +1485,7 @@ int auto_seek_handle_message( * as we don't know if items were really * discarded within the new time range. */ - uint64_t new_begin_raw_value; + uint64_t new_begin_raw_value = 0; ret = bt_clock_class_clock_value_from_ns_from_origin( msg_disc_items->default_end_cs->clock_class, @@ -1472,63 +1510,20 @@ int auto_seek_handle_message( goto skip_msg; } } - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: - { - const struct bt_message_stream_activity *stream_act_msg = - (const void *) msg; - - switch (stream_act_msg->default_cs_state) { - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN: - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE: - /* - * -inf is always less than any requested time, - * and we can't assume any specific time for an - * unknown clock snapshot, so skip this. - */ - goto skip_msg; - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN: - clk_snapshot = stream_act_msg->default_cs; - BT_ASSERT(clk_snapshot); - break; - default: - abort(); - } - - break; - } - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: + case BT_MESSAGE_TYPE_STREAM_BEGINNING: + case BT_MESSAGE_TYPE_STREAM_END: { - const struct bt_message_stream_activity *stream_act_msg = - (const void *) msg; + struct bt_message_stream *stream_msg = + (struct bt_message_stream *) msg; - switch (stream_act_msg->default_cs_state) { - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN: - /* - * We can't assume any specific time for an - * unknown clock snapshot, so skip this. - */ + if (stream_msg->default_cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) { + /* Ignore */ goto skip_msg; - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE: - /* - * +inf is always greater than any requested - * time. - */ - *got_first = true; - goto push_msg; - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN: - clk_snapshot = stream_act_msg->default_cs; - BT_ASSERT(clk_snapshot); - break; - default: - abort(); } + clk_snapshot = stream_msg->default_cs; break; } - case BT_MESSAGE_TYPE_STREAM_BEGINNING: - case BT_MESSAGE_TYPE_STREAM_END: - /* Ignore */ - goto skip_msg; default: abort(); } @@ -1563,39 +1558,47 @@ skip_msg: stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN; + if (stream_msg->default_cs_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) { + stream_state->seen_clock_snapshot = true; + } + BT_ASSERT(!bt_g_hash_table_contains(stream_states, stream_msg->stream)); g_hash_table_insert(stream_states, stream_msg->stream, stream_state); break; } - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: + case BT_MESSAGE_TYPE_PACKET_BEGINNING: { - const struct bt_message_stream_activity *stream_act_msg = + const struct bt_message_packet *packet_msg = (const void *) msg; struct auto_seek_stream_state *stream_state; - /* Update stream's state: stream activity began. */ - stream_state = g_hash_table_lookup(stream_states, stream_act_msg->stream); + /* Update stream's state: packet began. */ + stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream); BT_ASSERT(stream_state); BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN); - stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN; + stream_state->state = AUTO_SEEK_STREAM_STATE_PACKET_BEGAN; BT_ASSERT(!stream_state->packet); + stream_state->packet = packet_msg->packet; + + if (packet_msg->packet->stream->class->packets_have_beginning_default_clock_snapshot) { + stream_state->seen_clock_snapshot = true; + } + break; } - case BT_MESSAGE_TYPE_PACKET_BEGINNING: + case BT_MESSAGE_TYPE_EVENT: { - const struct bt_message_packet *packet_msg = - (const void *) msg; + const struct bt_message_event *event_msg = (const void *) msg; struct auto_seek_stream_state *stream_state; - /* Update stream's state: packet began. */ - stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream); + stream_state = g_hash_table_lookup(stream_states, + event_msg->event->packet->stream); BT_ASSERT(stream_state); - BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN); - stream_state->state = AUTO_SEEK_STREAM_STATE_PACKET_BEGAN; - BT_ASSERT(!stream_state->packet); - stream_state->packet = packet_msg->packet; + // HELPME: are we sure that event messages have clock snapshots at this point? + stream_state->seen_clock_snapshot = true; + break; } case BT_MESSAGE_TYPE_PACKET_END: @@ -1609,24 +1612,14 @@ skip_msg: BT_ASSERT(stream_state); BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_PACKET_BEGAN); - stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN; + stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN; BT_ASSERT(stream_state->packet); stream_state->packet = NULL; - break; - } - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: - { - const struct bt_message_stream_activity *stream_act_msg = - (const void *) msg; - struct auto_seek_stream_state *stream_state; - /* Update stream's state: stream activity ended. */ - stream_state = g_hash_table_lookup(stream_states, stream_act_msg->stream); - BT_ASSERT(stream_state); + if (packet_msg->packet->stream->class->packets_have_end_default_clock_snapshot) { + stream_state->seen_clock_snapshot = true; + } - BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN); - stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN; - BT_ASSERT(!stream_state->packet); break; } case BT_MESSAGE_TYPE_STREAM_END: @@ -1643,6 +1636,23 @@ skip_msg: g_hash_table_remove(stream_states, stream_msg->stream); break; } + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + { + const struct bt_message_discarded_items *discarded_msg = + (const void *) msg; + struct auto_seek_stream_state *stream_state; + + stream_state = g_hash_table_lookup(stream_states, discarded_msg->stream); + BT_ASSERT(stream_state); + + if ((msg->type == BT_MESSAGE_TYPE_DISCARDED_EVENTS && discarded_msg->stream->class->discarded_events_have_default_clock_snapshots) || + (msg->type == BT_MESSAGE_TYPE_DISCARDED_PACKETS && discarded_msg->stream->class->discarded_packets_have_default_clock_snapshots)) { + stream_state->seen_clock_snapshot = true; + } + + break; + } default: break; } @@ -1710,7 +1720,7 @@ int find_message_ge_ns_from_origin( switch (status) { case BT_FUNC_STATUS_OK: - BT_ASSERT_POST(user_count <= MSG_BATCH_SIZE, + BT_ASSERT_POST_DEV(user_count <= MSG_BATCH_SIZE, "Invalid returned message count: greater than " "batch size: count=%" PRIu64 ", batch-size=%u", user_count, MSG_BATCH_SIZE); @@ -1934,44 +1944,57 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( bt_message *msg; const bt_clock_class *clock_class = bt_stream_class_borrow_default_clock_class_const( bt_stream_borrow_class_const(stream)); - uint64_t raw_value; - - if (clock_raw_value_from_ns_from_origin(clock_class, ns_from_origin, &raw_value) != 0) { - BT_LIB_LOGW_APPEND_CAUSE( - "Could not convert nanoseconds from origin to clock value: " - "ns-from-origin=%" PRId64 ", %![cc-]+K", - ns_from_origin, clock_class); - status = BT_FUNC_STATUS_ERROR; - goto end; + /* Initialize to silence maybe-uninitialized warning. */ + uint64_t raw_value = 0; + + /* + * If we haven't seen a message with a clock snapshot, we don't know if our seek time is within + * the clock's range, so it wouldn't be safe to try to convert ns_from_origin to a clock value. + * + * Also, it would be a bit of a lie to generate a stream begin message with the seek time as its + * clock snapshot, because we don't really know if the stream existed at that time. If we have + * seen a message with a clock snapshot in our seeking, then we are sure that the + * seek time is not below the clock range, and we know the stream was active at that + * time (and that we cut it short). + */ + if (stream_state->seen_clock_snapshot) { + if (clock_raw_value_from_ns_from_origin(clock_class, ns_from_origin, &raw_value) != 0) { + BT_LIB_LOGW("Could not convert nanoseconds from origin to clock value: ns-from-origin=%" PRId64 ", %![cc-]+K", + ns_from_origin, clock_class); + status = BT_FUNC_STATUS_ERROR; + goto end; + } } switch (stream_state->state) { case AUTO_SEEK_STREAM_STATE_PACKET_BEGAN: BT_ASSERT(stream_state->packet); BT_LIB_LOGD("Creating packet message: %![packet-]+a", stream_state->packet); - msg = bt_message_packet_beginning_create_with_default_clock_snapshot( - (bt_self_message_iterator *) iterator, stream_state->packet, raw_value); - if (!msg) { - status = BT_FUNC_STATUS_MEMORY_ERROR; - goto end; + + if (stream->class->packets_have_beginning_default_clock_snapshot) { + /* + * If we are in the PACKET_BEGAN state, it means we have seen a "packet beginning" + * message. If "packet beginning" packets have clock snapshots, then we must have + * seen a clock snapshot. + */ + BT_ASSERT(stream_state->seen_clock_snapshot); + + msg = bt_message_packet_beginning_create_with_default_clock_snapshot( + (bt_self_message_iterator *) iterator, stream_state->packet, raw_value); + } else { + msg = bt_message_packet_beginning_create((bt_self_message_iterator *) iterator, + stream_state->packet); } - g_queue_push_head(iterator->auto_seek.msgs, msg); - msg = NULL; - /* fall-thru */ - case AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN: - msg = bt_message_stream_activity_beginning_create( - (bt_self_message_iterator *) iterator, stream); if (!msg) { status = BT_FUNC_STATUS_MEMORY_ERROR; goto end; } - bt_message_stream_activity_beginning_set_default_clock_snapshot(msg, raw_value); - g_queue_push_head(iterator->auto_seek.msgs, msg); msg = NULL; /* fall-thru */ + case AUTO_SEEK_STREAM_STATE_STREAM_BEGAN: msg = bt_message_stream_beginning_create( (bt_self_message_iterator *) iterator, stream); @@ -1980,6 +2003,10 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( goto end; } + if (stream_state->seen_clock_snapshot) { + bt_message_stream_beginning_set_default_clock_snapshot(msg, raw_value); + } + g_queue_push_head(iterator->auto_seek.msgs, msg); msg = NULL; break; @@ -2090,6 +2117,16 @@ bt_port_output_message_iterator_seek_beginning( iterator)); } +bt_bool bt_self_message_iterator_is_interrupted( + const struct bt_self_message_iterator *self_msg_iter) +{ + const struct bt_self_component_port_input_message_iterator *iterator = + (const void *) self_msg_iter; + + BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + return (bt_bool) bt_graph_is_interrupted(iterator->graph); +} + void bt_port_output_message_iterator_get_ref( const struct bt_port_output_message_iterator *iterator) {