STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN,
STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE,
STATE_AFTER_STREAM_PACKET_CONTEXT,
- STATE_EMIT_MSG_NEW_STREAM,
- STATE_EMIT_MSG_NEW_PACKET,
+ STATE_EMIT_MSG_STREAM_BEGINNING,
+ STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING,
+ STATE_EMIT_MSG_PACKET_BEGINNING,
STATE_DSCOPE_EVENT_HEADER_BEGIN,
STATE_DSCOPE_EVENT_HEADER_CONTINUE,
STATE_AFTER_EVENT_HEADER,
STATE_DSCOPE_EVENT_PAYLOAD_BEGIN,
STATE_DSCOPE_EVENT_PAYLOAD_CONTINUE,
STATE_EMIT_MSG_EVENT,
- STATE_EMIT_MSG_END_OF_PACKET,
- STATE_DONE,
STATE_SKIP_PACKET_PADDING,
+ STATE_EMIT_MSG_PACKET_END_MULTI,
+ STATE_EMIT_MSG_PACKET_END_SINGLE,
+ STATE_EMIT_MSG_STREAM_ACTIVITY_END,
+ STATE_EMIT_MSG_STREAM_END,
+ STATE_DONE,
};
/* CTF message iterator */
/* Current message iterator to create messages (weak) */
bt_self_message_iterator *msg_iter;
+ /*
+ * True to emit stream beginning and stream activity beginning
+ * messages.
+ */
+ bool emit_stream_begin_msg;
+
+ /*
+ * True to emit stream end and stream activity end messages.
+ */
+ bool emit_stream_end_msg;
+
/*
* Current dynamic scope field pointer.
*
void *data;
} medium;
- /* Stream beginning was emitted */
- bool stream_begin_emitted;
-
/* Current packet size (bits) (-1 if unknown) */
int64_t cur_exp_packet_total_size;
return "STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE";
case STATE_AFTER_STREAM_PACKET_CONTEXT:
return "STATE_AFTER_STREAM_PACKET_CONTEXT";
- case STATE_EMIT_MSG_NEW_PACKET:
- return "STATE_EMIT_MSG_NEW_PACKET";
- case STATE_EMIT_MSG_NEW_STREAM:
- return "STATE_EMIT_MSG_NEW_STREAM";
+ case STATE_EMIT_MSG_STREAM_BEGINNING:
+ return "STATE_EMIT_MSG_STREAM_BEGINNING";
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING:
+ return "STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING";
+ case STATE_EMIT_MSG_PACKET_BEGINNING:
+ return "STATE_EMIT_MSG_PACKET_BEGINNING";
case STATE_DSCOPE_EVENT_HEADER_BEGIN:
return "STATE_DSCOPE_EVENT_HEADER_BEGIN";
case STATE_DSCOPE_EVENT_HEADER_CONTINUE:
return "STATE_DSCOPE_EVENT_PAYLOAD_CONTINUE";
case STATE_EMIT_MSG_EVENT:
return "STATE_EMIT_MSG_EVENT";
- case STATE_EMIT_MSG_END_OF_PACKET:
- return "STATE_EMIT_MSG_END_OF_PACKET";
- case STATE_DONE:
- return "STATE_DONE";
case STATE_SKIP_PACKET_PADDING:
return "STATE_SKIP_PACKET_PADDING";
+ case STATE_EMIT_MSG_PACKET_END_MULTI:
+ return "STATE_EMIT_MSG_PACKET_END_MULTI";
+ case STATE_EMIT_MSG_PACKET_END_SINGLE:
+ return "STATE_EMIT_MSG_PACKET_END_SINGLE";
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_END:
+ return "STATE_EMIT_MSG_STREAM_ACTIVITY_END";
+ case STATE_EMIT_MSG_STREAM_END:
+ return "STATE_EMIT_MSG_STREAM_END";
+ case STATE_DONE:
+ return "STATE_DONE";
default:
return "(unknown)";
}
goto end;
}
+ /*
+ * Make sure at least one bit is available for this packet. An
+ * empty packet is impossible. If we reach the end of the medium
+ * at this point, then it's considered the end of the stream.
+ */
+ ret = buf_ensure_available_bits(notit);
+ switch (ret) {
+ case BT_MSG_ITER_STATUS_OK:
+ break;
+ case BT_MSG_ITER_STATUS_EOF:
+ ret = BT_MSG_ITER_STATUS_OK;
+ notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_END;
+ goto end;
+ default:
+ goto end;
+ }
+
/* Packet header class is common to the whole trace class. */
packet_header_fc = notit->meta.tc->packet_header_fc;
if (!packet_header_fc) {
"notit-addr=%p, packet-size=%" PRIu64 ", content-size=%" PRIu64,
notit, notit->cur_exp_packet_total_size,
notit->cur_exp_packet_content_size);
+
end:
return status;
}
static
-enum bt_msg_iter_status after_packet_context_state(
- struct bt_msg_iter *notit)
+enum bt_msg_iter_status after_packet_context_state(struct bt_msg_iter *notit)
{
enum bt_msg_iter_status status;
goto end;
}
- if (notit->stream_begin_emitted) {
- notit->state = STATE_EMIT_MSG_NEW_PACKET;
+ if (notit->stream) {
+ /*
+ * Stream exists, which means we already emitted at
+ * least one packet beginning message, so the initial
+ * stream beginning message was also emitted.
+ */
+ notit->state = STATE_EMIT_MSG_PACKET_BEGINNING;
} else {
- notit->state = STATE_EMIT_MSG_NEW_STREAM;
+ notit->state = STATE_EMIT_MSG_STREAM_BEGINNING;
}
end:
}
static
-enum bt_msg_iter_status read_event_header_begin_state(
- struct bt_msg_iter *notit)
+enum bt_msg_iter_status read_event_header_begin_state(struct bt_msg_iter *notit)
{
enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK;
struct ctf_field_class *event_header_fc = NULL;
/* No more events! */
BT_LOGV("Reached end of packet: notit-addr=%p, "
"cur=%zu", notit, packet_at(notit));
- notit->state = STATE_EMIT_MSG_END_OF_PACKET;
+ notit->state = STATE_EMIT_MSG_PACKET_END_MULTI;
goto end;
} else if (unlikely(packet_at(notit) >
notit->cur_exp_packet_content_size)) {
* nothing else for us.
*/
status = buf_ensure_available_bits(notit);
- if (status != BT_MSG_ITER_STATUS_OK) {
- /*
- * If this function returns
- * `BT_MSG_ITER_STATUS_EOF`:
- *
- * 1. bt_msg_iter_get_next_message()
- * emits a "packet end" message. This
- * resets the current packet. The state
- * remains unchanged otherwise.
- * 2. This function is called again. It returns
- * `BT_MSG_ITER_STATUS_EOF` again.
- * 3. bt_msg_iter_get_next_message()
- * emits a "stream end" message because
- * there's no current packet. It sets the
- * current state to `STATE_DONE`.
- */
+ switch (status) {
+ case BT_MSG_ITER_STATUS_OK:
+ break;
+ case BT_MSG_ITER_STATUS_EOF:
+ status = BT_MSG_ITER_STATUS_OK;
+ notit->state = STATE_EMIT_MSG_PACKET_END_SINGLE;
+ goto end;
+ default:
goto end;
}
}
}
static
-enum bt_msg_iter_status skip_packet_padding_state(
- struct bt_msg_iter *notit)
+enum bt_msg_iter_status skip_packet_padding_state(struct bt_msg_iter *notit)
{
enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK;
size_t bits_to_skip;
case STATE_AFTER_STREAM_PACKET_CONTEXT:
status = after_packet_context_state(notit);
break;
- case STATE_EMIT_MSG_NEW_STREAM:
- notit->state = STATE_EMIT_MSG_NEW_PACKET;
+ case STATE_EMIT_MSG_STREAM_BEGINNING:
+ notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING;
+ break;
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING:
+ notit->state = STATE_EMIT_MSG_PACKET_BEGINNING;
break;
- case STATE_EMIT_MSG_NEW_PACKET:
+ case STATE_EMIT_MSG_PACKET_BEGINNING:
notit->state = STATE_DSCOPE_EVENT_HEADER_BEGIN;
break;
case STATE_DSCOPE_EVENT_HEADER_BEGIN:
case STATE_SKIP_PACKET_PADDING:
status = skip_packet_padding_state(notit);
break;
- case STATE_EMIT_MSG_END_OF_PACKET:
+ case STATE_EMIT_MSG_PACKET_END_MULTI:
notit->state = STATE_SKIP_PACKET_PADDING;
break;
+ case STATE_EMIT_MSG_PACKET_END_SINGLE:
+ notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_END;
+ break;
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_END:
+ notit->state = STATE_EMIT_MSG_STREAM_END;
+ break;
+ case STATE_EMIT_MSG_STREAM_END:
+ notit->state = STATE_DONE;
+ break;
+ case STATE_DONE:
+ break;
default:
BT_LOGD("Unknown CTF plugin message iterator state: "
"notit-addr=%p, state=%d", notit, notit->state);
notit->cur_stream_class_id = -1;
notit->cur_event_class_id = -1;
notit->cur_data_stream_id = -1;
- notit->stream_begin_emitted = false;
+ notit->emit_stream_begin_msg = true;
+ notit->emit_stream_end_msg = true;
}
static
static
void set_event_default_clock_snapshot(struct bt_msg_iter *notit)
{
- bt_event *event =
- bt_message_event_borrow_event(
- notit->event_msg);
+ bt_event *event = bt_message_event_borrow_event(notit->event_msg);
bt_stream_class *sc = notit->meta.sc->ir_sc;
BT_ASSERT(event);
}
static
-void notify_new_stream(struct bt_msg_iter *notit,
+void create_msg_stream_beginning(struct bt_msg_iter *notit,
bt_message **message)
{
- enum bt_msg_iter_status status;
bt_message *ret = NULL;
- status = set_current_stream(notit);
- if (status != BT_MSG_ITER_STATUS_OK) {
- BT_MESSAGE_PUT_REF_AND_RESET(ret);
- goto end;
- }
-
BT_ASSERT(notit->stream);
BT_ASSERT(notit->msg_iter);
ret = bt_message_stream_beginning_create(notit->msg_iter,
return;
}
-end:
*message = ret;
}
static
-void notify_end_of_stream(struct bt_msg_iter *notit,
+void create_msg_stream_activity_beginning(struct bt_msg_iter *notit,
+ bt_message **message)
+{
+ bt_message *ret = NULL;
+
+ BT_ASSERT(notit->stream);
+ BT_ASSERT(notit->msg_iter);
+ ret = bt_message_stream_activity_beginning_create(notit->msg_iter,
+ notit->stream);
+ if (!ret) {
+ BT_LOGE("Cannot create stream activity beginning message: "
+ "notit-addr=%p, stream-addr=%p",
+ notit, notit->stream);
+ return;
+ }
+
+ *message = ret;
+}
+
+static
+void create_msg_stream_activity_end(struct bt_msg_iter *notit,
bt_message **message)
+{
+ bt_message *ret = NULL;
+
+ if (!notit->stream) {
+ BT_LOGE("Cannot create stream for stream message: "
+ "notit-addr=%p", notit);
+ return;
+ }
+
+ BT_ASSERT(notit->stream);
+ BT_ASSERT(notit->msg_iter);
+ ret = bt_message_stream_activity_end_create(notit->msg_iter,
+ notit->stream);
+ if (!ret) {
+ BT_LOGE("Cannot create stream activity end message: "
+ "notit-addr=%p, stream-addr=%p",
+ notit, notit->stream);
+ return;
+ }
+
+ *message = ret;
+}
+
+static
+void create_msg_stream_end(struct bt_msg_iter *notit, bt_message **message)
{
bt_message *ret;
ret = bt_message_stream_end_create(notit->msg_iter,
notit->stream);
if (!ret) {
- BT_LOGE("Cannot create stream beginning message: "
+ BT_LOGE("Cannot create stream end message: "
"notit-addr=%p, stream-addr=%p",
notit, notit->stream);
return;
}
+
*message = ret;
}
static
-void notify_new_packet(struct bt_msg_iter *notit,
+void create_msg_packet_beginning(struct bt_msg_iter *notit,
bt_message **message)
{
int ret;
}
static
-void notify_end_of_packet(struct bt_msg_iter *notit,
- bt_message **message)
+void create_msg_packet_end(struct bt_msg_iter *notit, bt_message **message)
{
bt_message *msg;
enum bt_msg_iter_status bt_msg_iter_get_next_message(
struct bt_msg_iter *notit,
- bt_self_message_iterator *msg_iter,
- bt_message **message)
+ bt_self_message_iterator *msg_iter, bt_message **message)
{
enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK;
BT_ASSERT(notit);
BT_ASSERT(message);
-
- if (notit->state == STATE_DONE) {
- status = BT_MSG_ITER_STATUS_EOF;
- goto end;
- }
-
notit->msg_iter = msg_iter;
-
BT_LOGV("Getting next message: notit-addr=%p", notit);
while (true) {
status = handle_state(notit);
- if (status == BT_MSG_ITER_STATUS_AGAIN) {
+ if (unlikely(status == BT_MSG_ITER_STATUS_AGAIN)) {
BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_AGAIN.");
goto end;
- }
-
- if (status != BT_MSG_ITER_STATUS_OK) {
- if (status == BT_MSG_ITER_STATUS_EOF) {
- enum state next_state = notit->state;
-
- BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_EOF.");
-
- if (notit->packet) {
- notify_end_of_packet(notit,
- message);
- } else {
- notify_end_of_stream(notit,
- message);
- next_state = STATE_DONE;
- }
-
- if (!*message) {
- status = BT_MSG_ITER_STATUS_ERROR;
- goto end;
- }
-
- status = BT_MSG_ITER_STATUS_OK;
- notit->state = next_state;
- } else {
- BT_LOGW("Cannot handle state: "
- "notit-addr=%p, state=%s",
- notit, state_string(notit->state));
- }
-
+ } else if (unlikely(status != BT_MSG_ITER_STATUS_OK)) {
+ BT_LOGW("Cannot handle state: notit-addr=%p, state=%s",
+ notit, state_string(notit->state));
goto end;
}
switch (notit->state) {
- case STATE_EMIT_MSG_NEW_STREAM:
- /* notify_new_stream() logs errors */
- notify_new_stream(notit, message);
+ case STATE_EMIT_MSG_EVENT:
+ BT_ASSERT(notit->event_msg);
+ set_event_default_clock_snapshot(notit);
+ *message = notit->event_msg;
+ notit->event_msg = NULL;
+ goto end;
+ case STATE_EMIT_MSG_PACKET_BEGINNING:
+ /* create_msg_packet_beginning() logs errors */
+ create_msg_packet_beginning(notit, message);
if (!*message) {
status = BT_MSG_ITER_STATUS_ERROR;
}
- notit->stream_begin_emitted = true;
goto end;
- case STATE_EMIT_MSG_NEW_PACKET:
- /* notify_new_packet() logs errors */
- notify_new_packet(notit, message);
+ case STATE_EMIT_MSG_PACKET_END_SINGLE:
+ case STATE_EMIT_MSG_PACKET_END_MULTI:
+ /* create_msg_packet_end() logs errors */
+ create_msg_packet_end(notit, message);
if (!*message) {
status = BT_MSG_ITER_STATUS_ERROR;
}
goto end;
- case STATE_EMIT_MSG_EVENT:
- BT_ASSERT(notit->event_msg);
- set_event_default_clock_snapshot(notit);
- *message = notit->event_msg;
- notit->event_msg = NULL;
- goto end;
- case STATE_EMIT_MSG_END_OF_PACKET:
- /* notify_end_of_packet() logs errors */
- notify_end_of_packet(notit, message);
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING:
+ if (notit->emit_stream_begin_msg) {
+ /* create_msg_stream_activity_beginning() logs errors */
+ create_msg_stream_activity_beginning(notit, message);
- if (!*message) {
- status = BT_MSG_ITER_STATUS_ERROR;
+ if (!*message) {
+ status = BT_MSG_ITER_STATUS_ERROR;
+ }
+
+ goto end;
}
+ break;
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_END:
+ if (notit->emit_stream_end_msg) {
+ /* create_msg_stream_activity_end() logs errors */
+ create_msg_stream_activity_end(notit, message);
+
+ if (!*message) {
+ status = BT_MSG_ITER_STATUS_ERROR;
+ }
+
+ goto end;
+ }
+
+ break;
+ case STATE_EMIT_MSG_STREAM_BEGINNING:
+ status = set_current_stream(notit);
+ if (status != BT_MSG_ITER_STATUS_OK) {
+ goto end;
+ }
+
+ if (notit->emit_stream_begin_msg) {
+ /* create_msg_stream_beginning() logs errors */
+ create_msg_stream_beginning(notit, message);
+
+ if (!*message) {
+ status = BT_MSG_ITER_STATUS_ERROR;
+ }
+
+ goto end;
+ }
+
+ break;
+ case STATE_EMIT_MSG_STREAM_END:
+ if (notit->emit_stream_end_msg) {
+ /* create_msg_stream_end() logs errors */
+ create_msg_stream_end(notit, message);
+
+ if (!*message) {
+ status = BT_MSG_ITER_STATUS_ERROR;
+ }
+
+ goto end;
+ }
+
+ break;
+ case STATE_DONE:
+ status = BT_MSG_ITER_STATUS_EOF;
goto end;
default:
/* Non-emitting state: continue */
BT_ASSERT(notit);
- if (notit->state == STATE_EMIT_MSG_NEW_PACKET) {
+ if (notit->state == STATE_EMIT_MSG_PACKET_BEGINNING) {
/* We're already there */
goto end;
}
while (true) {
status = handle_state(notit);
- if (status == BT_MSG_ITER_STATUS_AGAIN) {
+ if (unlikely(status == BT_MSG_ITER_STATUS_AGAIN)) {
BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_AGAIN.");
goto end;
- }
- if (status != BT_MSG_ITER_STATUS_OK) {
- if (status == BT_MSG_ITER_STATUS_EOF) {
- BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_EOF.");
- } else {
- BT_LOGW("Cannot handle state: "
- "notit-addr=%p, state=%s",
- notit, state_string(notit->state));
- }
+ } else if (unlikely(status != BT_MSG_ITER_STATUS_OK)) {
+ BT_LOGW("Cannot handle state: notit-addr=%p, state=%s",
+ notit, state_string(notit->state));
goto end;
}
switch (notit->state) {
- case STATE_EMIT_MSG_NEW_PACKET:
+ case STATE_EMIT_MSG_PACKET_BEGINNING:
/*
* Packet header and context fields are
* potentially decoded (or they don't exist).
*/
goto end;
case STATE_INIT:
- case STATE_EMIT_MSG_NEW_STREAM:
case STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN:
case STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE:
case STATE_AFTER_TRACE_PACKET_HEADER:
case STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN:
case STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE:
case STATE_AFTER_STREAM_PACKET_CONTEXT:
+ case STATE_EMIT_MSG_STREAM_BEGINNING:
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING:
/* Non-emitting state: continue */
break;
default:
/*
* We should never get past the
- * STATE_EMIT_MSG_NEW_PACKET state.
+ * STATE_EMIT_MSG_PACKET_BEGINNING state.
*/
BT_LOGF("Unexpected state: notit-addr=%p, state=%s",
notit, state_string(notit->state));
}
BT_HIDDEN
-enum bt_msg_iter_status bt_msg_iter_seek(
- struct bt_msg_iter *notit, off_t offset)
+enum bt_msg_iter_status bt_msg_iter_seek(struct bt_msg_iter *notit,
+ off_t offset)
{
enum bt_msg_iter_status ret = BT_MSG_ITER_STATUS_OK;
enum bt_msg_iter_medium_status medium_status;
return ret;
}
-BT_HIDDEN
-off_t bt_msg_iter_get_current_packet_offset(struct bt_msg_iter *notit)
-{
- BT_ASSERT(notit);
- return notit->cur_packet_offset;
-}
-
-BT_HIDDEN
-off_t bt_msg_iter_get_current_packet_size(
- struct bt_msg_iter *notit)
-{
- BT_ASSERT(notit);
- return notit->cur_exp_packet_total_size;
-}
-
-BT_HIDDEN
-void bt_msg_trace_class_changed(struct bt_msg_iter *notit)
-{
- if (notit->meta.tc->stored_value_count > notit->stored_values->len) {
- g_array_set_size(notit->stored_values,
- notit->meta.tc->stored_value_count);
- }
-}
-
BT_HIDDEN
enum bt_msg_iter_status bt_msg_iter_get_packet_properties(
struct bt_msg_iter *notit,
goto end;
}
- props->exp_packet_total_size =
- (uint64_t) notit->cur_exp_packet_total_size;
- props->exp_packet_content_size =
- (uint64_t) notit->cur_exp_packet_content_size;
+ props->exp_packet_total_size = notit->cur_exp_packet_total_size;
+ props->exp_packet_content_size = notit->cur_exp_packet_content_size;
BT_ASSERT(props->stream_class_id >= 0);
props->stream_class_id = (uint64_t) notit->cur_stream_class_id;
props->data_stream_id = notit->cur_data_stream_id;
end:
return status;
}
+
+BT_HIDDEN
+void bt_msg_iter_set_emit_stream_beginning_message(struct bt_msg_iter *notit,
+ bool val)
+{
+ notit->emit_stream_begin_msg = val;
+}
+
+BT_HIDDEN
+void bt_msg_iter_set_emit_stream_end_message(struct bt_msg_iter *notit,
+ bool val)
+{
+ notit->emit_stream_end_msg = val;
+}
}
static
-bt_stream *medop_borrow_stream(
- bt_stream_class *stream_class, int64_t stream_id,
+bt_stream *medop_borrow_stream(bt_stream_class *stream_class, int64_t stream_id,
void *data)
{
struct ctf_fs_ds_file *ds_file = data;
}
static
-enum bt_msg_iter_medium_status medop_seek(
- enum bt_msg_iter_seek_whence whence, off_t offset,
- void *data)
+enum bt_msg_iter_medium_status medop_seek(enum bt_msg_iter_seek_whence whence,
+ off_t offset, void *data)
{
enum bt_msg_iter_medium_status ret =
BT_MSG_ITER_MEDIUM_STATUS_OK;
int ret;
struct ctf_fs_ds_index *index = NULL;
enum bt_msg_iter_status iter_status;
+ off_t current_packet_offset_bytes = 0;
BT_LOGD("Indexing stream file %s", ds_file->file->path->str);
}
do {
- off_t current_packet_offset;
- off_t next_packet_offset;
off_t current_packet_size_bytes;
struct ctf_fs_ds_index_entry *entry;
struct bt_msg_iter_packet_properties props;
- iter_status = bt_msg_iter_get_packet_properties(
- ds_file->msg_iter, &props);
+ if (current_packet_offset_bytes < 0) {
+ BT_LOGE_STR("Cannot get the current packet's offset.");
+ goto error;
+ } else if (current_packet_offset_bytes > ds_file->file->size) {
+ BT_LOGE_STR("Unexpected current packet's offset (larger than file).");
+ goto error;
+ } else if (current_packet_offset_bytes == ds_file->file->size) {
+ /* No more data */
+ break;
+ }
+
+ iter_status = bt_msg_iter_seek(ds_file->msg_iter,
+ current_packet_offset_bytes);
if (iter_status != BT_MSG_ITER_STATUS_OK) {
- if (iter_status == BT_MSG_ITER_STATUS_EOF) {
- break;
- }
goto error;
}
- current_packet_offset =
- bt_msg_iter_get_current_packet_offset(
- ds_file->msg_iter);
- if (current_packet_offset < 0) {
- BT_LOGE_STR("Cannot get the current packet's offset.");
+ iter_status = bt_msg_iter_get_packet_properties(
+ ds_file->msg_iter, &props);
+ if (iter_status != BT_MSG_ITER_STATUS_OK) {
goto error;
}
- current_packet_size_bytes =
- ((props.exp_packet_total_size + 7) & ~7) / CHAR_BIT;
+ if (props.exp_packet_total_size >= 0) {
+ current_packet_size_bytes =
+ (uint64_t) props.exp_packet_total_size / 8;
+ } else {
+ current_packet_size_bytes = ds_file->file->size;
+ }
- if (current_packet_offset + current_packet_size_bytes >
+ if (current_packet_offset_bytes + current_packet_size_bytes >
ds_file->file->size) {
BT_LOGW("Invalid packet size reported in file: stream=\"%s\", "
"packet-offset=%jd, packet-size-bytes=%jd, "
"file-size=%jd",
ds_file->file->path->str,
- current_packet_offset,
+ current_packet_offset_bytes,
current_packet_size_bytes,
ds_file->file->size);
goto error;
}
- next_packet_offset = current_packet_offset +
- current_packet_size_bytes;
+ current_packet_offset_bytes += current_packet_size_bytes;
BT_LOGD("Seeking to next packet: current-packet-offset=%jd, "
- "next-packet-offset=%jd", current_packet_offset,
- next_packet_offset);
-
+ "next-packet-offset=%jd",
+ current_packet_offset_bytes - current_packet_size_bytes,
+ current_packet_offset_bytes);
entry = ctf_fs_ds_index_add_new_entry(index);
if (!entry) {
BT_LOGE_STR("Failed to allocate a new index entry.");
}
ret = init_index_entry(entry, ds_file, &props,
- current_packet_size_bytes, current_packet_offset);
+ current_packet_size_bytes, current_packet_offset_bytes);
if (ret) {
goto error;
}
-
- iter_status = bt_msg_iter_seek(ds_file->msg_iter,
- next_packet_offset);
} while (iter_status == BT_MSG_ITER_STATUS_OK);
if (iter_status != BT_MSG_ITER_STATUS_EOF) {
g_free(msg_iter_data);
}
+static
+void set_msg_iter_emits_stream_beginning_end_messages(
+ struct ctf_fs_msg_iter_data *msg_iter_data)
+{
+ bt_msg_iter_set_emit_stream_beginning_message(
+ msg_iter_data->ds_file->msg_iter,
+ msg_iter_data->ds_file_info_index == 0);
+ bt_msg_iter_set_emit_stream_end_message(
+ msg_iter_data->ds_file->msg_iter,
+ msg_iter_data->ds_file_info_index ==
+ msg_iter_data->ds_file_group->ds_file_infos->len - 1);
+}
+
static
bt_self_message_iterator_status ctf_fs_iterator_next_one(
struct ctf_fs_msg_iter_data *msg_iter_data,
- const bt_message **msg)
+ const bt_message **out_msg)
{
bt_self_message_iterator_status status;
- bt_message *priv_msg;
- int ret;
BT_ASSERT(msg_iter_data->ds_file);
- status = ctf_fs_ds_file_next(msg_iter_data->ds_file, &priv_msg);
- *msg = priv_msg;
- if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK &&
- bt_message_get_type(*msg) ==
- BT_MESSAGE_TYPE_STREAM_BEGINNING) {
- if (msg_iter_data->skip_stream_begin_msgs) {
- /*
- * We already emitted a
- * BT_MESSAGE_TYPE_STREAM_BEGINNING
- * message: skip this one, get a new one.
- */
- BT_MESSAGE_PUT_REF_AND_RESET(*msg);
- status = ctf_fs_ds_file_next(msg_iter_data->ds_file,
- &priv_msg);
- *msg = priv_msg;
- BT_ASSERT(status != BT_SELF_MESSAGE_ITERATOR_STATUS_END);
- goto end;
- } else {
- /*
- * First BT_MESSAGE_TYPE_STREAM_BEGINNING
- * message: skip all following.
- */
- msg_iter_data->skip_stream_begin_msgs = true;
+ while (true) {
+ bt_message *msg;
+
+ status = ctf_fs_ds_file_next(msg_iter_data->ds_file, &msg);
+ switch (status) {
+ case BT_SELF_MESSAGE_ITERATOR_STATUS_OK:
+ *out_msg = msg;
+ msg = NULL;
goto end;
- }
- }
+ case BT_SELF_MESSAGE_ITERATOR_STATUS_END:
+ {
+ int ret;
+
+ if (msg_iter_data->ds_file_info_index ==
+ msg_iter_data->ds_file_group->ds_file_infos->len - 1) {
+ /* End of all group's stream files */
+ goto end;
+ }
- if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK &&
- bt_message_get_type(*msg) ==
- BT_MESSAGE_TYPE_STREAM_END) {
- msg_iter_data->ds_file_info_index++;
+ msg_iter_data->ds_file_info_index++;
+ bt_msg_iter_reset(msg_iter_data->msg_iter);
+ set_msg_iter_emits_stream_beginning_end_messages(
+ msg_iter_data);
- if (msg_iter_data->ds_file_info_index ==
- msg_iter_data->ds_file_group->ds_file_infos->len) {
/*
- * No more stream files to read: we reached the
- * real end. Emit this
- * BT_MESSAGE_TYPE_STREAM_END message.
- * The next time ctf_fs_iterator_next() is
- * called for this message iterator,
- * ctf_fs_ds_file_next() will return
- * BT_SELF_MESSAGE_ITERATOR_STATUS_END().
+ * Open and start reading the next stream file
+ * within our stream file group.
*/
- goto end;
- }
+ ret = msg_iter_data_set_current_ds_file(msg_iter_data);
+ if (ret) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
- BT_MESSAGE_PUT_REF_AND_RESET(*msg);
- bt_msg_iter_reset(msg_iter_data->msg_iter);
-
- /*
- * Open and start reading the next stream file within
- * our stream file group.
- */
- ret = msg_iter_data_set_current_ds_file(msg_iter_data);
- if (ret) {
- status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
- goto end;
+ /* Continue the loop to get the next message */
+ break;
}
-
- status = ctf_fs_ds_file_next(msg_iter_data->ds_file, &priv_msg);
- *msg = priv_msg;
-
- /*
- * If we get a message, we expect to get a
- * BT_MESSAGE_TYPE_STREAM_BEGINNING message
- * because the iterator's state machine emits one before
- * even requesting the first block of data from the
- * medium. Skip this message because we're not
- * really starting a new stream here, and try getting a
- * new message (which, if it works, is a
- * BT_MESSAGE_TYPE_PACKET_BEGINNING one). We're sure to
- * get at least one pair of
- * BT_MESSAGE_TYPE_PACKET_BEGINNING and
- * BT_MESSAGE_TYPE_PACKET_END messages in the
- * case of a single, empty packet. We know there's at
- * least one packet because the stream file group does
- * not contain empty stream files.
- */
- BT_ASSERT(msg_iter_data->skip_stream_begin_msgs);
-
- if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
- BT_ASSERT(bt_message_get_type(*msg) ==
- BT_MESSAGE_TYPE_STREAM_BEGINNING);
- BT_MESSAGE_PUT_REF_AND_RESET(*msg);
- status = ctf_fs_ds_file_next(msg_iter_data->ds_file,
- &priv_msg);
- *msg = priv_msg;
- BT_ASSERT(status != BT_SELF_MESSAGE_ITERATOR_STATUS_END);
+ default:
+ goto end;
}
}
goto error;
}
+ set_msg_iter_emits_stream_beginning_end_messages(msg_iter_data);
bt_self_message_iterator_set_data(self_msg_iter,
msg_iter_data);
if (ret != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {