BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED,
/*
- * Iterator is finalized, but not at the end yet. This means
- * that the "next" method can still return queued messages
- * before returning the BT_MESSAGE_ITERATOR_STATUS_CANCELED
- * status.
+ * Iterator is currently being finalized.
*/
- BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED,
+ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING,
/*
- * Iterator is finalized and ended: the "next" method always
- * returns BT_MESSAGE_ITERATOR_STATUS_CANCELED.
+ * Iterator is finalized.
*/
- BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED,
+ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED,
};
struct bt_message_iterator {
struct bt_graph *graph; /* Weak */
/*
- * This hash table keeps the state of a stream as viewed by
- * this message iterator. This is used to, in developer
- * mode:
- *
- * * Automatically enqueue "stream begin", "packet begin",
- * "packet end", and "stream end" messages depending
- * on the stream's state and on the next message returned
- * by the upstream component.
- *
- * * Make sure that, once the message iterator has seen a
- * "stream end" message for a given stream, no other
- * messages which refer to this stream can be delivered
- * by this iterator.
+ * This hash table keeps the state of a stream as viewed by this
+ * message iterator. This is used, in developer mode, to make
+ * sure that, once the message iterator has seen a "stream end"
+ * message for a given stream, no other messages which refer to
+ * this stream can be delivered by this iterator. It is also
+ * used to check for a valid sequence of messages.
*
- * The key (struct bt_stream *) is not owned by this. The
- * value is an allocated state structure.
+ * The key (struct bt_stream *) is not owned by this. The value
+ * is an allocated state structure.
*/
GHashTable *stream_states;
};
BT_HIDDEN
-void bt_self_component_port_input_message_iterator_finalize(
+void bt_self_component_port_input_message_iterator_try_finalize(
struct bt_self_component_port_input_message_iterator *iterator);
BT_HIDDEN
enum bt_message_iterator_status status)
{
switch (status) {
- case BT_MESSAGE_ITERATOR_STATUS_CANCELED:
- return "BT_MESSAGE_ITERATOR_STATUS_CANCELED";
case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
return "BT_MESSAGE_ITERATOR_STATUS_AGAIN";
case BT_MESSAGE_ITERATOR_STATUS_END:
return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE";
case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED:
return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED";
+ case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING:
+ return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING";
case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED:
return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED";
- case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED:
- return "BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED";
default:
return "(unknown)";
}
BT_MESSAGE_ITERATOR_STATUS_OK = 0,
BT_MESSAGE_ITERATOR_STATUS_END = 1,
BT_MESSAGE_ITERATOR_STATUS_AGAIN = 11,
- BT_MESSAGE_ITERATOR_STATUS_CANCELED = 125,
BT_MESSAGE_ITERATOR_STATUS_ERROR = -1,
BT_MESSAGE_ITERATOR_STATUS_NOMEM = -12,
} bt_message_iterator_status;
colander_data->msg_iter, &msgs,
colander_data->count_addr);
switch (msg_iter_status) {
- case BT_MESSAGE_ITERATOR_STATUS_CANCELED:
- status = BT_SELF_COMPONENT_STATUS_OK;
- goto end;
case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
status = BT_SELF_COMPONENT_STATUS_AGAIN;
goto end;
BT_ASSERT(user_component);
BT_ASSERT(component_class);
BT_ASSERT(name);
-
type = bt_component_class_get_type(component_class);
BT_LIB_LOGD("Creating empty component from component class: %![cc-]+C, "
"comp-name=\"%s\"", component_class, name);
goto end;
}
- bt_object_init_shared_with_parent(&component->base,
- destroy_component);
+ bt_object_init_shared_with_parent(&component->base, destroy_component);
component->class = component_class;
bt_object_get_no_null_check(component->class);
component->destroy = component_destroy_funcs[type];
struct bt_port *cur_port = g_ptr_array_index(ports, i);
if (cur_port == port) {
- remove_port_by_index(component,
- ports, i);
+ remove_port_by_index(component, ports, i);
goto end;
}
}
BT_LIB_LOGD("Destroying connection: %!+x", connection);
/*
- * Make sure that each message iterator which was created
- * for this connection is finalized before we destroy it. Once a
- * message iterator is finalized, all its method return
- * NULL or the BT_MESSAGE_ITERATOR_STATUS_CANCELED status.
+ * Make sure that each message iterator which was created for
+ * this connection is finalized before we destroy it. Once a
+ * message iterator is finalized, all its method return NULL or
+ * the BT_MESSAGE_ITERATOR_STATUS_CANCELED status.
*
* Because connections are destroyed before components within a
* graph, this ensures that message iterators are always
bt_object_put_ref(upstream_port);
/*
- * Because this connection is ended, finalize (cancel) each
- * message iterator created from it.
+ * Because this connection is ended, finalize each message
+ * iterator created from it.
+ *
+ * In practice, this only happens when the connection is
+ * destroyed and not all its message iterators were finalized,
+ * which is on graph destruction.
*/
for (i = 0; i < conn->iterators->len; i++) {
struct bt_self_component_port_input_message_iterator *iterator =
BT_LIB_LOGD("Finalizing message iterator created by "
"this ended connection: %![iter-]+i", iterator);
- bt_self_component_port_input_message_iterator_finalize(
+ bt_self_component_port_input_message_iterator_try_finalize(
iterator);
/*
* in this situation:
*
* 1. We put and destroy a connection.
- * 2. This connection's destructor finalizes its active
- * message iterators.
- * 3. A message iterator's finalization function gets a
- * new reference on its component (reference count goes from
- * 0 to 1).
+ * 2. This connection's destructor finalizes its active message
+ * iterators.
+ * 3. A message iterator's finalization function gets a new
+ * reference on its component (reference count goes from 0 to
+ * 1).
* 4. Since this component's reference count goes to 1, it takes
* a reference on its parent (this graph). This graph's
* reference count goes from 0 to 1.
* 6. Since this component's reference count goes from 1 to 0,
* it puts its parent (this graph). This graph's reference
* count goes from 1 to 0.
- * 7. Since this graph's reference count goes from 1 to 0,
- * its destructor is called (this function).
+ * 7. Since this graph's reference count goes from 1 to 0, its
+ * destructor is called (this function).
*
* With the incrementation below, the graph's reference count at
* step 4 goes from 1 to 2, and from 2 to 1 at step 6. This
return stream_state;
}
+static inline
+void _set_self_comp_port_input_msg_iterator_state(
+ struct bt_self_component_port_input_message_iterator *iterator,
+ enum bt_self_component_port_input_message_iterator_state state)
+{
+ BT_ASSERT(iterator);
+ BT_LIB_LOGD("Updating message iterator's state: "
+ "new-state=%s",
+ bt_self_component_port_input_message_iterator_state_string(state));
+ iterator->state = state;
+}
+
+#ifdef BT_DEV_MODE
+# define set_self_comp_port_input_msg_iterator_state _set_self_comp_port_input_msg_iterator_state
+#else
+# define set_self_comp_port_input_msg_iterator_state(_a, _b)
+#endif
+
static
void destroy_base_message_iterator(struct bt_object *obj)
{
iterator = (void *) obj;
BT_LIB_LOGD("Destroying self component input port message iterator object: "
"%!+i", iterator);
- bt_self_component_port_input_message_iterator_finalize(iterator);
+ bt_self_component_port_input_message_iterator_try_finalize(iterator);
if (iterator->stream_states) {
/*
}
BT_HIDDEN
-void bt_self_component_port_input_message_iterator_finalize(
+void bt_self_component_port_input_message_iterator_try_finalize(
struct bt_self_component_port_input_message_iterator *iterator)
{
typedef void (*method_t)(void *);
/* Skip user finalization if user initialization failed */
BT_LIB_LOGD("Not finalizing non-initialized message iterator: "
"%!+i", iterator);
- return;
+ goto end;
case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED:
- case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED:
/* Already finalized */
BT_LIB_LOGD("Not finalizing message iterator: already finalized: "
"%!+i", iterator);
- return;
+ goto end;
+ case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING:
+ /* Already finalized */
+ BT_LIB_LOGF("Message iterator is already being finalized: "
+ "%!+i", iterator);
+ abort();
default:
break;
}
BT_LIB_LOGD("Finalizing message iterator: %!+i", iterator);
-
- if (iterator->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED) {
- BT_LIB_LOGD("Updating message iterator's state: "
- "new-state=BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED");
- iterator->state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED;
- } else {
- BT_LIB_LOGD("Updating message iterator's state: "
- "new-state=BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED");
- iterator->state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED;
- }
-
+ set_self_comp_port_input_msg_iterator_state(iterator,
+ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING);
BT_ASSERT(iterator->upstream_component);
comp_class = iterator->upstream_component->class;
iterator->upstream_component = NULL;
iterator->upstream_port = NULL;
+ set_self_comp_port_input_msg_iterator_state(iterator,
+ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED);
BT_LIB_LOGD("Finalized message iterator: %!+i", iterator);
+
+end:
+ return;
}
BT_HIDDEN
iterator->upstream_port = upstream_port;
iterator->connection = iterator->upstream_port->connection;
iterator->graph = bt_component_borrow_graph(upstream_comp);
- iterator->state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED;
+ set_self_comp_port_input_msg_iterator_state(iterator,
+ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED);
BT_LIB_LOGD("Created initial message iterator on self component input port: "
"%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
upstream_port, upstream_comp, iterator);
bt_message_iterator_status_string(iter_status));
if (iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) {
BT_LOGW_STR("Initialization method failed.");
+ BT_OBJECT_PUT_REF_AND_RESET(iterator);
goto end;
}
}
- iterator->state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE;
+ set_self_comp_port_input_msg_iterator_state(iterator,
+ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
g_ptr_array_add(port->connection->iterators, iterator);
BT_LIB_LOGD("Created message iterator on self component input port: "
"%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
*/
BT_ASSERT(method);
BT_LOGD_STR("Calling user's \"next\" method.");
- status = method(iterator,
- (void *) iterator->base.msgs->pdata,
+ status = method(iterator, (void *) iterator->base.msgs->pdata,
MSG_BATCH_SIZE, user_count);
BT_LOGD("User method returned: status=%s",
bt_message_iterator_status_string(status));
goto end;
}
- if (iterator->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED ||
- iterator->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED_AND_ENDED) {
- /*
- * The user's "next" method, somehow, cancelled its own
- * message iterator. This can happen, for example,
- * when the user's method removes the port on which
- * there's the connection from which the iterator was
- * created. In this case, said connection is ended, and
- * all its message iterators are finalized.
- *
- * Only bt_object_put_ref() the returned message if
- * the status is BT_MESSAGE_ITERATOR_STATUS_OK
- * because otherwise this field could be garbage.
- */
- if (status == BT_MESSAGE_ITERATOR_STATUS_OK) {
- uint64_t i;
- bt_message_array_const msgs =
- (void *) iterator->base.msgs->pdata;
-
- for (i = 0; i < *user_count; i++) {
- bt_object_put_ref(msgs[i]);
- }
- }
-
- status = BT_MESSAGE_ITERATOR_STATUS_CANCELED;
- goto end;
- }
+#ifdef BT_DEV_MODE
+ /*
+ * There is no way that this iterator could have been finalized
+ * during its "next" method, as the only way to do this is to
+ * put the last iterator's reference, and this can only be done
+ * by its downstream owner.
+ */
+ BT_ASSERT(iterator->state ==
+ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
+#endif
switch (status) {
case BT_MESSAGE_ITERATOR_STATUS_OK:
BT_ASSERT_PRE(self_comp_port_input_msg_iter_can_end(iterator),
"Message iterator cannot end at this point: "
"%!+i", iterator);
- BT_ASSERT(iterator->state ==
- BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
- iterator->state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED;
- BT_LOGD("Set new status: status=%s",
- bt_message_iterator_status_string(status));
+ set_self_comp_port_input_msg_iterator_state(iterator,
+ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED);
goto end;
default:
/* Unknown non-error status */
return status;
}
-enum bt_message_iterator_status
-bt_port_output_message_iterator_next(
+enum bt_message_iterator_status bt_port_output_message_iterator_next(
struct bt_port_output_message_iterator *iterator,
bt_message_array_const *msgs_to_user,
uint64_t *count_to_user)
connection);
}
+static inline
+bool port_connection_iterators_are_finalized(struct bt_port *port)
+{
+ bool ret = true;
+ struct bt_connection *conn = port->connection;
+ uint64_t i;
+
+ if (!conn) {
+ goto end;
+ }
+
+ for (i = 0; i < conn->iterators->len; i++) {
+ struct bt_self_component_port_input_message_iterator *iterator =
+ conn->iterators->pdata[i];
+
+ BT_ASSERT(iterator);
+
+ if (iterator->state != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING &&
+ iterator->state != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED) {
+ BT_ASSERT_PRE_MSG("Message iterator is not being finalized or finalized: "
+ "%!+i", iterator);
+ ret = false;
+ goto end;
+ }
+ }
+
+end:
+ return ret;
+}
+
enum bt_self_component_port_status bt_self_component_port_remove_from_component(
struct bt_self_component_port *self_port)
{
struct bt_component *comp = NULL;
BT_ASSERT_PRE_NON_NULL(port, "Port");
-
+ BT_ASSERT_PRE(port_connection_iterators_are_finalized(port),
+ "At least one message iterator using this port has the wrong state.");
comp = (void *) bt_object_borrow_parent(&port->base);
if (!comp) {
BT_LIB_LOGV("Port already removed from its component: %!+p",
*/
break;
case BT_MESSAGE_ITERATOR_STATUS_END: /* Fall-through. */
- case BT_MESSAGE_ITERATOR_STATUS_CANCELED:
/*
* Message iterator reached the end: release it. It
* won't be considered again to find the youngest
status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp,
muxer_msg_iter, &muxer_upstream_msg_iter,
&next_return_ts);
- if (status < 0 || status == BT_MESSAGE_ITERATOR_STATUS_END ||
- status == BT_MESSAGE_ITERATOR_STATUS_CANCELED) {
+ if (status < 0 || status == BT_MESSAGE_ITERATOR_STATUS_END) {
if (status < 0) {
BT_LOGE("Cannot find the youngest upstream message iterator wrapper: "
"status=%s",