ret = 0;
goto end;
}
- printf_verbose("Waiting for %" PRIu64 " streams:\n",
- ctx->session->stream_count);
- ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
- ctx->session->stream_count);
+ printf_verbose("Waiting for %d streams:\n",
+ be32toh(rp.streams_count));
for (i = 0; i < be32toh(rp.streams_count); i++) {
- ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+ struct lttng_live_viewer_stream *lvstream;
+
+ lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+ ret_len = lttng_live_recv(ctx->control_sock, &stream,
+ sizeof(stream));
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
+ g_free(lvstream);
goto error;
}
if (ret_len < 0) {
perror("[error] Error receiving stream");
+ g_free(lvstream);
goto error;
}
assert(ret_len == sizeof(stream));
printf_verbose(" stream %" PRIu64 " : %s/%s\n",
be64toh(stream.id), stream.path_name,
stream.channel_name);
- ctx->session->streams[i].id = be64toh(stream.id);
- ctx->session->streams[i].session = ctx->session;
+ lvstream->id = be64toh(stream.id);
+ lvstream->session = ctx->session;
- ctx->session->streams[i].mmap_size = 0;
- ctx->session->streams[i].ctf_stream_id = -1ULL;
+ lvstream->mmap_size = 0;
+ lvstream->ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
- ctx->session->streams[i].metadata_flag = 1;
+ lvstream->metadata_flag = 1;
}
- ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+ ret = lttng_live_ctf_trace_assign(lvstream,
be64toh(stream.ctf_trace_id));
if (ret < 0) {
+ g_free(lvstream);
goto error;
}
-
+ bt_list_add(&lvstream->stream_node,
+ &ctx->session->stream_list);
}
ret = 0;
end:
goto retry;
case LTTNG_VIEWER_INDEX_HUP:
printf_verbose("get_next_index: stream hung up\n");
+ /* TODO: remove stream from session list and trace ptr array */
viewer_stream->id = -1ULL;
index->offset = EOF;
ctx->session->stream_count--;
ret = 0;
goto end;
}
- printf_verbose("Waiting for %" PRIu64 " streams:\n",
- ctx->session->stream_count);
- ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
- ctx->session->stream_count);
+ printf_verbose("Waiting for %d streams:\n", stream_count);
+
for (i = 0; i < stream_count; i++) {
- ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream));
+ struct lttng_live_viewer_stream *lvstream;
+
+ lvstream = g_new0(struct lttng_live_viewer_stream, 1);
+ ret_len = lttng_live_recv(ctx->control_sock, &stream,
+ sizeof(stream));
if (ret_len == 0) {
fprintf(stderr, "[error] Remote side has closed connection\n");
+ g_free(lvstream);
goto error;
}
if (ret_len < 0) {
perror("[error] Error receiving stream");
+ g_free(lvstream);
goto error;
}
assert(ret_len == sizeof(stream));
printf_verbose(" stream %" PRIu64 " : %s/%s\n",
be64toh(stream.id), stream.path_name,
stream.channel_name);
- ctx->session->streams[i].id = be64toh(stream.id);
- ctx->session->streams[i].session = ctx->session;
+ lvstream->id = be64toh(stream.id);
+ lvstream->session = ctx->session;
- ctx->session->streams[i].mmap_size = 0;
- ctx->session->streams[i].ctf_stream_id = -1ULL;
+ lvstream->mmap_size = 0;
+ lvstream->ctf_stream_id = -1ULL;
if (be32toh(stream.metadata_flag)) {
- ctx->session->streams[i].metadata_flag = 1;
+ lvstream->metadata_flag = 1;
}
- ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+ ret = lttng_live_ctf_trace_assign(lvstream,
be64toh(stream.ctf_trace_id));
if (ret < 0) {
+ g_free(lvstream);
goto error;
}
nb_streams++;
-
+ bt_list_add(&lvstream->stream_node,
+ &ctx->session->stream_list);
}
ret = nb_streams;
end:
return TRUE;
}
+static void free_session_streams(struct lttng_live_session *lsession)
+{
+ struct lttng_live_viewer_stream *lvstream, *tmp;
+
+ bt_list_for_each_entry_safe(lvstream, tmp, &lsession->stream_list,
+ stream_node) {
+ bt_list_del(&lvstream->stream_node);
+ g_free(lvstream);
+ }
+}
+
static int lttng_live_open_trace_read(const char *path)
{
int ret = 0;
ctx = g_new0(struct lttng_live_ctx, 1);
ctx->session = g_new0(struct lttng_live_session, 1);
+ BT_INIT_LIST_HEAD(&ctx->session->stream_list);
+
/* We need a pointer to the context from the packet_seek function. */
ctx->session->ctx = ctx;
end_free:
g_hash_table_destroy(ctx->session->ctf_traces);
+ free_session_streams(ctx->session);
g_free(ctx->session);
- g_free(ctx->session->streams);
g_free(ctx);
if (lttng_live_should_quit()) {