* another data structure is faster than this for our typical
* use cases.
*/
- GPtrArray *muxer_upstream_msg_iters;
+ GPtrArray *active_muxer_upstream_msg_iters;
+
+ /*
+ * Array of struct muxer_upstream_msg_iter * (owned by this).
+ *
+ * We move ended message iterators from
+ * `active_muxer_upstream_msg_iters` to this array so as to be
+ * able to restore them when seeking.
+ */
+ GPtrArray *ended_muxer_upstream_msg_iters;
/* Last time returned in a message */
int64_t last_returned_ts_ns;
unsigned char expected_clock_class_uuid[BABELTRACE_UUID_LEN];
};
+static
+void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
+{
+ const bt_message *msg;
+
+ while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) {
+ bt_message_put_ref(msg);
+ }
+}
+
static
void destroy_muxer_upstream_msg_iter(
struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
muxer_upstream_msg_iter,
muxer_upstream_msg_iter->msg_iter,
muxer_upstream_msg_iter->msgs->length);
- bt_self_component_port_input_message_iterator_put_ref(muxer_upstream_msg_iter->msg_iter);
+ bt_self_component_port_input_message_iterator_put_ref(
+ muxer_upstream_msg_iter->msg_iter);
if (muxer_upstream_msg_iter->msgs) {
- const bt_message *msg;
-
- while ((msg = g_queue_pop_head(
- muxer_upstream_msg_iter->msgs))) {
- bt_message_put_ref(msg);
- }
-
+ empty_message_queue(muxer_upstream_msg_iter);
g_queue_free(muxer_upstream_msg_iter->msgs);
}
goto end;
}
- g_ptr_array_add(muxer_msg_iter->muxer_upstream_msg_iters,
+ g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
muxer_upstream_msg_iter);
BT_LOGD("Added muxer's upstream message iterator wrapper: "
"addr=%p, muxer-msg-iter-addr=%p, msg-iter-addr=%p",
static
bt_self_message_iterator_status muxer_upstream_msg_iter_next(
- struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
+ struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
+ bool *is_ended)
{
bt_self_message_iterator_status status;
bt_message_iterator_status input_port_iter_status;
* won't be considered again to find the youngest
* message.
*/
- BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(muxer_upstream_msg_iter->msg_iter);
+ *is_ended = true;
status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
break;
default:
BT_ASSERT(muxer_upstream_msg_iter);
*muxer_upstream_msg_iter = NULL;
- for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+ for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+ i++) {
const bt_message *msg;
struct muxer_upstream_msg_iter *cur_muxer_upstream_msg_iter =
- g_ptr_array_index(muxer_msg_iter->muxer_upstream_msg_iters, i);
+ g_ptr_array_index(
+ muxer_msg_iter->active_muxer_upstream_msg_iters,
+ i);
int64_t msg_ts_ns;
if (!cur_muxer_upstream_msg_iter->msg_iter) {
static
bt_self_message_iterator_status validate_muxer_upstream_msg_iter(
- struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
+ struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
+ bool *is_ended)
{
bt_self_message_iterator_status status =
BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
}
/* muxer_upstream_msg_iter_next() logs details/errors */
- status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter);
+ status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter,
+ is_ended);
end:
return status;
bt_self_message_iterator_status status =
BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
size_t i;
+ bool is_ended = false;
BT_LOGV("Validating muxer's upstream message iterator wrappers: "
"muxer-msg-iter-addr=%p", muxer_msg_iter);
- for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+ for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+ i++) {
struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
g_ptr_array_index(
- muxer_msg_iter->muxer_upstream_msg_iters,
+ muxer_msg_iter->active_muxer_upstream_msg_iters,
i);
status = validate_muxer_upstream_msg_iter(
- muxer_upstream_msg_iter);
+ muxer_upstream_msg_iter, &is_ended);
if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
if (status < 0) {
BT_LOGE("Cannot validate muxer's upstream message iterator wrapper: "
}
/*
- * Remove this muxer upstream message iterator
- * if it's ended or canceled.
+ * Move this muxer upstream message iterator to the
+ * array of ended iterators if it's ended.
*/
- if (!muxer_upstream_msg_iter->msg_iter) {
+ if (unlikely(is_ended)) {
+ BT_LOGV("Muxer's upstream message iterator wrapper: ended or canceled: "
+ "muxer-msg-iter-addr=%p, "
+ "muxer-upstream-msg-iter-wrap-addr=%p",
+ muxer_msg_iter, muxer_upstream_msg_iter);
+ g_ptr_array_add(
+ muxer_msg_iter->ended_muxer_upstream_msg_iters,
+ muxer_upstream_msg_iter);
+ muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i] = NULL;
+
/*
* Use g_ptr_array_remove_fast() because the
* order of those elements is not important.
*/
- BT_LOGV("Removing muxer's upstream message iterator wrapper: ended or canceled: "
- "muxer-msg-iter-addr=%p, "
- "muxer-upstream-msg-iter-wrap-addr=%p",
- muxer_msg_iter, muxer_upstream_msg_iter);
g_ptr_array_remove_index_fast(
- muxer_msg_iter->muxer_upstream_msg_iters,
+ muxer_msg_iter->active_muxer_upstream_msg_iters,
i);
i--;
}
BT_LOGD("Destroying muxer component's message iterator: "
"muxer-msg-iter-addr=%p", muxer_msg_iter);
- if (muxer_msg_iter->muxer_upstream_msg_iters) {
- BT_LOGD_STR("Destroying muxer's upstream message iterator wrappers.");
+ if (muxer_msg_iter->active_muxer_upstream_msg_iters) {
+ BT_LOGD_STR("Destroying muxer's active upstream message iterator wrappers.");
+ g_ptr_array_free(
+ muxer_msg_iter->active_muxer_upstream_msg_iters, TRUE);
+ }
+
+ if (muxer_msg_iter->ended_muxer_upstream_msg_iters) {
+ BT_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers.");
g_ptr_array_free(
- muxer_msg_iter->muxer_upstream_msg_iters, TRUE);
+ muxer_msg_iter->ended_muxer_upstream_msg_iters, TRUE);
}
g_free(muxer_msg_iter);
}
muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
- muxer_msg_iter->muxer_upstream_msg_iters =
+ muxer_msg_iter->active_muxer_upstream_msg_iters =
g_ptr_array_new_with_free_func(
(GDestroyNotify) destroy_muxer_upstream_msg_iter);
- if (!muxer_msg_iter->muxer_upstream_msg_iters) {
+ if (!muxer_msg_iter->active_muxer_upstream_msg_iters) {
+ BT_LOGE_STR("Failed to allocate a GPtrArray.");
+ goto error;
+ }
+
+ muxer_msg_iter->ended_muxer_upstream_msg_iters =
+ g_ptr_array_new_with_free_func(
+ (GDestroyNotify) destroy_muxer_upstream_msg_iter);
+ if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) {
BT_LOGE_STR("Failed to allocate a GPtrArray.");
goto error;
}
}
BT_HIDDEN
-void muxer_msg_iter_finalize(
- bt_self_message_iterator *self_msg_iter)
+void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
{
struct muxer_msg_iter *muxer_msg_iter =
bt_self_message_iterator_get_data(self_msg_iter);
return status;
}
-BT_HIDDEN
-bt_bool muxer_msg_iter_can_seek_beginning(
- bt_self_message_iterator *self_msg_iter)
+static inline
+bt_bool muxer_upstream_msg_iters_can_all_seek_beginning(
+ GPtrArray *muxer_upstream_msg_iters)
{
- struct muxer_msg_iter *muxer_msg_iter =
- bt_self_message_iterator_get_data(self_msg_iter);
uint64_t i;
bt_bool ret = BT_TRUE;
- for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+ for (i = 0; i < muxer_upstream_msg_iters->len; i++) {
struct muxer_upstream_msg_iter *upstream_msg_iter =
- muxer_msg_iter->muxer_upstream_msg_iters->pdata[i];
+ muxer_upstream_msg_iters->pdata[i];
if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
upstream_msg_iter->msg_iter)) {
return ret;
}
+BT_HIDDEN
+bt_bool muxer_msg_iter_can_seek_beginning(
+ bt_self_message_iterator *self_msg_iter)
+{
+ struct muxer_msg_iter *muxer_msg_iter =
+ bt_self_message_iterator_get_data(self_msg_iter);
+ bt_bool ret = BT_TRUE;
+
+ if (!muxer_upstream_msg_iters_can_all_seek_beginning(
+ muxer_msg_iter->active_muxer_upstream_msg_iters)) {
+ ret = BT_FALSE;
+ goto end;
+ }
+
+ if (!muxer_upstream_msg_iters_can_all_seek_beginning(
+ muxer_msg_iter->ended_muxer_upstream_msg_iters)) {
+ ret = BT_FALSE;
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
BT_HIDDEN
bt_self_message_iterator_status muxer_msg_iter_seek_beginning(
bt_self_message_iterator *self_msg_iter)
int status;
uint64_t i;
- for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+ /* Seek all ended upstream iterators first */
+ for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
+ i++) {
struct muxer_upstream_msg_iter *upstream_msg_iter =
- muxer_msg_iter->muxer_upstream_msg_iters->pdata[i];
+ muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
status = bt_self_component_port_input_message_iterator_seek_beginning(
upstream_msg_iter->msg_iter);
if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
goto end;
}
+
+ empty_message_queue(upstream_msg_iter);
+ }
+
+ /* Seek all previously active upstream iterators */
+ for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+ i++) {
+ struct muxer_upstream_msg_iter *upstream_msg_iter =
+ muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i];
+
+ status = bt_self_component_port_input_message_iterator_seek_beginning(
+ upstream_msg_iter->msg_iter);
+ if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ empty_message_queue(upstream_msg_iter);
+ }
+
+ /* Make them all active */
+ for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
+ i++) {
+ struct muxer_upstream_msg_iter *upstream_msg_iter =
+ muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
+
+ g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
+ upstream_msg_iter);
+ muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i] = NULL;
}
+ g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters,
+ 0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len);
muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
+ muxer_msg_iter->clock_class_expectation =
+ MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY;
end:
return status;