ctx->session->streams[i].id = be64toh(stream.id);
ctx->session->streams[i].session = ctx->session;
- ctx->session->streams[i].first_read = 1;
ctx->session->streams[i].mmap_size = 0;
+ ctx->session->streams[i].ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
ctx->session->streams[i].metadata_flag = 1;
return ret;
}
+/*
+ * Assign the fields from a lttng_viewer_index to a packet_index.
+ */
+static
+void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
+ struct packet_index *pindex)
+{
+ assert(lindex);
+ assert(pindex);
+
+ pindex->offset = be64toh(lindex->offset);
+ pindex->packet_size = be64toh(lindex->packet_size);
+ pindex->content_size = be64toh(lindex->content_size);
+ pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
+ pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
+ pindex->events_discarded = be64toh(lindex->events_discarded);
+}
+
/*
* Get one index for a stream.
*
static
int get_next_index(struct lttng_live_ctx *ctx,
struct lttng_live_viewer_stream *viewer_stream,
- struct packet_index *index)
+ struct packet_index *index, uint64_t *stream_id)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_next_index rq;
- struct lttng_viewer_index rp;
int ret;
ssize_t ret_len;
+ struct lttng_viewer_index *rp = &viewer_stream->current_index;
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
cmd.data_size = sizeof(rq);
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(ctx->control_sock, rp, sizeof(*rp));
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
goto error;
perror("[error] Error receiving index response");
goto error;
}
- assert(ret_len == sizeof(rp));
+ assert(ret_len == sizeof(*rp));
- rp.flags = be32toh(rp.flags);
+ rp->flags = be32toh(rp->flags);
- switch (be32toh(rp.status)) {
+ switch (be32toh(rp->status)) {
case LTTNG_VIEWER_INDEX_INACTIVE:
printf_verbose("get_next_index: inactive\n");
memset(index, 0, sizeof(struct packet_index));
- index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
+ index->ts_cycles.timestamp_end = be64toh(rp->timestamp_end);
+ *stream_id = be64toh(rp->stream_id);
break;
case LTTNG_VIEWER_INDEX_OK:
printf_verbose("get_next_index: Ok, need metadata update : %u\n",
- rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
- index->offset = be64toh(rp.offset);
- index->packet_size = be64toh(rp.packet_size);
- index->content_size = be64toh(rp.content_size);
- index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin);
- index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
- index->events_discarded = be64toh(rp.events_discarded);
+ rp->flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
+ lttng_index_to_packet_index(rp, index);
+ *stream_id = be64toh(rp->stream_id);
+ viewer_stream->data_pending = 1;
- if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
+ if (rp->flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
ret = append_metadata(ctx, viewer_stream);
if (ret)
goto error;
}
- if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+ if (rp->flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
printf_verbose("get_next_index: need new streams\n");
ret = ask_new_streams(ctx);
if (ret < 0)
return -1;
}
+static
+void read_packet_header(struct ctf_stream_pos *pos,
+ struct ctf_file_stream *file_stream)
+{
+ int ret;
+
+ /* update trace_packet_header and stream_packet_context */
+ if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
+ /* Read packet header */
+ ret = generic_rw(&pos->parent,
+ &file_stream->parent.trace_packet_header->p);
+ if (ret) {
+ pos->offset = EOF;
+ fprintf(stderr, "[error] trace packet "
+ "header read failed\n");
+ goto end;
+ }
+ }
+ if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
+ /* Read packet context */
+ ret = generic_rw(&pos->parent,
+ &file_stream->parent.stream_packet_context->p);
+ if (ret) {
+ pos->offset = EOF;
+ fprintf(stderr, "[error] stream packet "
+ "context read failed\n");
+ goto end;
+ }
+ }
+ pos->data_offset = pos->offset;
+
+end:
+ return;
+}
+
+/*
+ * Handle the seek parameters.
+ * Returns 0 if the packet_seek can continue, a positive value to
+ * cleanly exit the packet_seek, a negative value on error.
+ */
+static
+int handle_seek_position(size_t index, int whence,
+ struct lttng_live_viewer_stream *viewer_stream,
+ struct ctf_stream_pos *pos,
+ struct ctf_file_stream *file_stream)
+{
+ int ret;
+
+ switch (whence) {
+ case SEEK_CUR:
+ ret = 0;
+ goto end;
+ case SEEK_SET:
+ /*
+ * We only allow to seek to 0.
+ */
+ if (index != 0) {
+ fprintf(stderr, "[error] Arbitrary seek in lttng-live "
+ "trace not supported\n");
+ pos->offset = EOF;
+ ret = -1;
+ goto end;
+ }
+
+ ret = 0;
+ goto end;
+
+ default:
+ fprintf(stderr, "[error] Invalid seek parameter\n");
+ assert(0);
+ }
+
+end:
+ return ret;
+}
+
static
void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
int whence)
struct packet_index *prev_index = NULL, *cur_index;
struct lttng_live_viewer_stream *viewer_stream;
struct lttng_live_session *session;
+ uint64_t stream_id = -1ULL;
int ret;
-retry:
pos = ctf_pos(stream_pos);
file_stream = container_of(pos, struct ctf_file_stream, pos);
viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
session = viewer_stream->session;
+ ret = handle_seek_position(index, whence, viewer_stream, pos,
+ file_stream);
+ if (ret != 0) {
+ return;
+ }
+
+retry:
switch (pos->packet_index->len) {
case 0:
g_array_set_size(pos->packet_index, 1);
abort();
break;
}
- printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
- ret = get_next_index(session->ctx, viewer_stream, cur_index);
- if (ret < 0) {
- pos->offset = EOF;
- if (!lttng_live_should_quit()) {
- fprintf(stderr, "[error] get_next_index failed\n");
+
+ if (viewer_stream->data_pending) {
+ lttng_index_to_packet_index(&viewer_stream->current_index, cur_index);
+ } else {
+ printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
+ ret = get_next_index(session->ctx, viewer_stream, cur_index, &stream_id);
+ if (ret < 0) {
+ pos->offset = EOF;
+ if (!lttng_live_should_quit()) {
+ fprintf(stderr, "[error] get_next_index failed\n");
+ }
+ return;
}
+ printf_verbose("Index received : packet_size : %" PRIu64
+ ", offset %" PRIu64 ", content_size %" PRIu64
+ ", timestamp_end : %" PRIu64 "\n",
+ cur_index->packet_size, cur_index->offset,
+ cur_index->content_size,
+ cur_index->ts_cycles.timestamp_end);
+
+ }
+
+ /*
+ * On the first time we receive an index, the stream_id needs to
+ * be set for the stream in order to use it, we don't want any
+ * data at this stage.
+ */
+ if (file_stream->parent.stream_id == -1ULL) {
+ /*
+ * Warning: with lttng-tools < 2.4.2, the beacon does not
+ * contain the real stream ID, it is memset to 0, so this
+ * might create a problem when a session has multiple
+ * channels. We can't detect it at this stage, lttng-tools
+ * has to be upgraded to fix this problem.
+ */
+ printf_verbose("Assigning stream_id %" PRIu64 "\n",
+ stream_id);
+ file_stream->parent.stream_id = stream_id;
+ viewer_stream->ctf_stream_id = stream_id;
+
return;
}
}
if (cur_index->content_size == 0) {
- file_stream->parent.cycles_timestamp =
+ if (file_stream->parent.stream_class) {
+ file_stream->parent.cycles_timestamp =
cur_index->ts_cycles.timestamp_end;
- file_stream->parent.real_timestamp = ctf_get_real_timestamp(
- &file_stream->parent,
- cur_index->ts_cycles.timestamp_end);
+ file_stream->parent.real_timestamp = ctf_get_real_timestamp(
+ &file_stream->parent,
+ cur_index->ts_cycles.timestamp_end);
+ }
} else {
- /* Convert the timestamps and append to the real_index. */
- cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
- &file_stream->parent,
- cur_index->ts_cycles.timestamp_begin);
- cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
- &file_stream->parent,
- cur_index->ts_cycles.timestamp_end);
+ if (file_stream->parent.stream_class) {
+ /* Convert the timestamps and append to the real_index. */
+ cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp(
+ &file_stream->parent,
+ cur_index->ts_cycles.timestamp_begin);
+ cur_index->ts_real.timestamp_end = ctf_get_real_timestamp(
+ &file_stream->parent,
+ cur_index->ts_cycles.timestamp_end);
+ }
ctf_update_current_packet_index(&file_stream->parent,
prev_index, cur_index);
}
return;
}
+ viewer_stream->data_pending = 0;
- printf_verbose("Index received : packet_size : %" PRIu64
- ", offset %" PRIu64 ", content_size %" PRIu64
- ", timestamp_end : %" PRIu64 "\n",
- cur_index->packet_size, cur_index->offset,
- cur_index->content_size,
- cur_index->ts_cycles.timestamp_end);
-
- /* update trace_packet_header and stream_packet_context */
- if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
- /* Read packet header */
- ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
- if (ret) {
- pos->offset = EOF;
- fprintf(stderr, "[error] trace packet header read failed\n");
- goto end;
- }
- }
- if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
- /* Read packet context */
- ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
- if (ret) {
- pos->offset = EOF;
- fprintf(stderr, "[error] stream packet context read failed\n");
- goto end;
- }
- }
- pos->data_offset = pos->offset;
+ read_packet_header(pos, file_stream);
end:
return;
ctx->session->streams[i].id = be64toh(stream.id);
ctx->session->streams[i].session = ctx->session;
- ctx->session->streams[i].first_read = 1;
ctx->session->streams[i].mmap_size = 0;
+ ctx->session->streams[i].ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
ctx->session->streams[i].metadata_flag = 1;