#include <stdio.h>
#include <glib.h>
#include <inttypes.h>
+#include <unistd.h>
#include "babeltrace-cfg.h"
#include "babeltrace-cfg-cli-args.h"
#include "babeltrace-cfg-cli-args-default.h"
struct bt_notification;
struct bt_clock_class_priority_map;
+struct bt_ctf_clock_class;
extern struct bt_notification *bt_notification_inactivity_create(
struct bt_clock_class_priority_map *clock_class_priority_map);
* Status code. Errors are always negative.
*/
enum bt_notification_iterator_status {
- /** Try again. */
+ /** No notifications available for now. Try again later. */
BT_NOTIFICATION_ITERATOR_STATUS_AGAIN = 11,
/** No more notifications to be delivered. */
BT_NOTIFICATION_ITERATOR_STATUS_END = 1,
break;
case BT_NOTIFICATION_TYPE_INACTIVITY:
/* Always valid */
- break;
+ goto handle_notif;
default:
/*
* Invalid type of notification. Only the notification
goto error;
}
+handle_notif:
switch (notif->type) {
case BT_NOTIFICATION_TYPE_EVENT:
ret = handle_notif_event(iterator, notif, notif_stream,
ret = handle_notif_packet_end(iterator, notif, notif_stream,
notif_packet);
break;
+ case BT_NOTIFICATION_TYPE_INACTIVITY:
+ add_action_push_notif(iterator, notif);
+ break;
default:
break;
}
goto end;
}
+ if (strlen(buf) == 0) {
+ /* An empty metadata packet is OK. */
+ goto end;
+ }
+
/* Convert the real file pointer to a memory file pointer */
fp = bt_fmemopen(buf, strlen(buf), "rb");
close_fp = true;
while (true) {
status = handle_state(notit);
+ if (status == BT_CTF_NOTIF_ITER_STATUS_AGAIN) {
+ PDBG("Medium operation reported \"try again later\"");
+ goto end;
+ }
if (status != BT_CTF_NOTIF_ITER_STATUS_OK) {
if (status == BT_CTF_NOTIF_ITER_STATUS_EOF) {
PDBG("Medium operation reported end of stream\n");
struct bt_notification_iterator *it;
struct writer_component *writer_component =
bt_private_component_get_user_data(component);
-
- it = writer_component->input_iterator;
- assert(it);
+ enum bt_notification_iterator_status it_ret;
if (unlikely(writer_component->error)) {
ret = BT_COMPONENT_STATUS_ERROR;
goto end;
}
- if (likely(writer_component->processed_first_event)) {
- enum bt_notification_iterator_status it_ret;
-
- it_ret = bt_notification_iterator_next(it);
- switch (it_ret) {
- case BT_NOTIFICATION_ITERATOR_STATUS_ERROR:
- ret = BT_COMPONENT_STATUS_ERROR;
- goto end;
- case BT_NOTIFICATION_ITERATOR_STATUS_END:
- ret = BT_COMPONENT_STATUS_END;
- BT_PUT(writer_component->input_iterator);
- goto end;
- default:
- break;
- }
- }
+ it = writer_component->input_iterator;
+ assert(it);
+ it_ret = bt_notification_iterator_next(it);
- notification = bt_notification_iterator_get_notification(it);
- if (!notification) {
+ switch (it_ret) {
+ case BT_NOTIFICATION_ITERATOR_STATUS_END:
+ ret = BT_COMPONENT_STATUS_END;
+ BT_PUT(writer_component->input_iterator);
+ goto end;
+ case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
+ ret = BT_COMPONENT_STATUS_AGAIN;
+ goto end;
+ case BT_NOTIFICATION_ITERATOR_STATUS_OK:
+ break;
+ default:
ret = BT_COMPONENT_STATUS_ERROR;
goto end;
}
+ notification = bt_notification_iterator_get_notification(it);
+ assert(notification);
ret = handle_notification(writer_component, notification);
- writer_component->processed_first_event = true;
end:
bt_put(notification);
return ret;
AM_CFLAGS = $(PACKAGE_CFLAGS) -I$(top_srcdir)/include -I$(top_srcdir)/plugins
-libbabeltrace_plugin_ctf_lttng_live_la_SOURCES = \
- lttng-live.c \
- lttng-live-internal.h
+libbabeltrace_plugin_ctf_lttng_live_la_SOURCES = lttng-live.c \
+ data-stream.c lttng-live-internal.h \
+ data-stream.h metadata.c metadata.h \
+ viewer-connection.c viewer-connection.h \
+ lttng-viewer-abi.h
noinst_LTLIBRARIES = libbabeltrace-plugin-ctf-lttng-live.la
-
--- /dev/null
+/*
+ * Copyright 2016 - Philippe Proulx <pproulx@efficios.com>
+ * Copyright 2016 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2010-2011 - EfficiOS Inc. and Linux Foundation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <stdio.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <glib.h>
+#include <inttypes.h>
+#include <sys/mman.h>
+#include <babeltrace/ctf-ir/stream.h>
+#include "../common/notif-iter/notif-iter.h"
+#include <assert.h>
+#include "data-stream.h"
+
+#define PRINT_ERR_STREAM lttng_live->error_fp
+#define PRINT_PREFIX "lttng-live-data-stream"
+#define PRINT_DBG_CHECK lttng_live_debug
+#include "../print.h"
+
+static
+enum bt_ctf_notif_iter_medium_status medop_request_bytes(
+ size_t request_sz, uint8_t **buffer_addr,
+ size_t *buffer_sz, void *data)
+{
+ enum bt_ctf_notif_iter_medium_status status =
+ BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK;
+ struct lttng_live_stream_iterator *stream = data;
+ struct lttng_live_trace *trace = stream->trace;
+ struct lttng_live_session *session = trace->session;
+ struct lttng_live_component *lttng_live = session->lttng_live;
+ uint64_t recv_len = 0;
+ uint64_t len_left;
+ uint64_t read_len;
+ //int i;
+
+ len_left = stream->base_offset + stream->len - stream->offset;
+ if (!len_left) {
+ stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
+ status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ return status;
+ }
+ read_len = MIN(request_sz, stream->buflen);
+ read_len = MIN(read_len, len_left);
+ status = lttng_live_get_stream_bytes(lttng_live,
+ stream, stream->buf, stream->offset,
+ read_len, &recv_len);
+#if 0 //DEBUG
+ for (i = 0; i < recv_len; i++) {
+ fprintf(stderr, "%x ", stream->buf[i]);
+ }
+ fprintf(stderr, "\n");
+#endif
+ *buffer_addr = stream->buf;
+ *buffer_sz = recv_len;
+ stream->offset += recv_len;
+ return status;
+}
+
+static
+struct bt_ctf_stream *medop_get_stream(
+ struct bt_ctf_stream_class *stream_class, void *data)
+{
+ struct lttng_live_stream_iterator *lttng_live_stream = data;
+ struct lttng_live_trace *trace = lttng_live_stream->trace;
+ struct lttng_live_session *session = trace->session;
+ struct lttng_live_component *lttng_live = session->lttng_live;
+
+ if (!lttng_live_stream->stream) {
+ int64_t id = bt_ctf_stream_class_get_id(stream_class);
+
+ PDBG("Creating stream %s out of stream class %" PRId64 "\n",
+ lttng_live_stream->name, id);
+ lttng_live_stream->stream = bt_ctf_stream_create(stream_class,
+ lttng_live_stream->name);
+ if (!lttng_live_stream->stream) {
+ PERR("Cannot create stream %s (stream class %" PRId64 ")\n",
+ lttng_live_stream->name, id);
+ }
+ }
+
+ return lttng_live_stream->stream;
+}
+
+static struct bt_ctf_notif_iter_medium_ops medops = {
+ .request_bytes = medop_request_bytes,
+ .get_stream = medop_get_stream,
+};
+
+BT_HIDDEN
+enum bt_ctf_lttng_live_iterator_status lttng_live_lazy_notif_init(
+ struct lttng_live_session *session)
+{
+ struct lttng_live_component *lttng_live = session->lttng_live;
+ struct lttng_live_trace *trace;
+
+ if (!session->lazy_stream_notif_init) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ }
+
+ bt_list_for_each_entry(trace, &session->traces, node) {
+ struct lttng_live_stream_iterator *stream;
+
+ bt_list_for_each_entry(stream, &trace->streams, node) {
+ if (stream->notif_iter) {
+ continue;
+ }
+ stream->notif_iter = bt_ctf_notif_iter_create(trace->trace,
+ lttng_live->max_query_size, medops,
+ stream, lttng_live->error_fp);
+ if (!stream->notif_iter) {
+ goto error;
+ }
+ }
+ }
+
+ session->lazy_stream_notif_init = false;
+
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+
+error:
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+}
+
+BT_HIDDEN
+struct lttng_live_stream_iterator *lttng_live_stream_iterator_create(
+ struct lttng_live_session *session,
+ uint64_t ctf_trace_id,
+ uint64_t stream_id)
+{
+ struct lttng_live_component *lttng_live = session->lttng_live;
+ struct lttng_live_stream_iterator *stream =
+ g_new0(struct lttng_live_stream_iterator, 1);
+ struct lttng_live_trace *trace;
+ int ret;
+
+ trace = lttng_live_ref_trace(session, ctf_trace_id);
+ if (!trace) {
+ goto error;
+ }
+
+ stream->p.type = LIVE_STREAM_TYPE_STREAM;
+ stream->trace = trace;
+ stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
+ stream->viewer_stream_id = stream_id;
+ stream->ctf_stream_class_id = -1ULL;
+ stream->last_returned_inactivity_timestamp = INT64_MIN;
+
+ if (trace->trace) {
+ stream->notif_iter = bt_ctf_notif_iter_create(trace->trace,
+ lttng_live->max_query_size, medops,
+ stream, lttng_live->error_fp);
+ if (!stream->notif_iter) {
+ goto error;
+ }
+ }
+ stream->buf = g_new0(uint8_t, session->lttng_live->max_query_size);
+ stream->buflen = session->lttng_live->max_query_size;
+
+ ret = lttng_live_add_port(lttng_live, stream);
+ assert(!ret);
+
+ bt_list_add(&stream->node, &trace->streams);
+
+ goto end;
+error:
+ /* Do not touch "borrowed" file. */
+ lttng_live_stream_iterator_destroy(stream);
+ stream = NULL;
+end:
+ return stream;
+}
+
+BT_HIDDEN
+void lttng_live_stream_iterator_destroy(struct lttng_live_stream_iterator *stream)
+{
+ struct lttng_live_component *lttng_live;
+ int ret;
+
+ if (!stream) {
+ return;
+ }
+
+ lttng_live = stream->trace->session->lttng_live;
+ ret = lttng_live_remove_port(lttng_live, stream->port);
+ assert(!ret);
+
+ if (stream->stream) {
+ BT_PUT(stream->stream);
+ }
+
+ if (stream->notif_iter) {
+ bt_ctf_notif_iter_destroy(stream->notif_iter);
+ }
+ g_free(stream->buf);
+ BT_PUT(stream->packet_end_notif_queue);
+ bt_list_del(&stream->node);
+ /*
+ * Ensure we poke the trace metadata in the future, which is
+ * required to release the metadata reference on the trace.
+ */
+ stream->trace->new_metadata_needed = true;
+ lttng_live_unref_trace(stream->trace);
+ g_free(stream);
+}
--- /dev/null
+#ifndef LTTNG_LIVE_DATA_STREAM_H
+#define LTTNG_LIVE_DATA_STREAM_H
+
+/*
+ * Copyright 2016 - Philippe Proulx <pproulx@efficios.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <stdio.h>
+#include <glib.h>
+#include <babeltrace/babeltrace-internal.h>
+#include <babeltrace/ctf-ir/trace.h>
+
+#include "lttng-live-internal.h"
+#include "../common/notif-iter/notif-iter.h"
+
+enum bt_ctf_lttng_live_iterator_status lttng_live_lazy_notif_init(
+ struct lttng_live_session *session);
+
+struct lttng_live_stream_iterator *lttng_live_stream_iterator_create(
+ struct lttng_live_session *session,
+ uint64_t ctf_trace_id,
+ uint64_t stream_id);
+
+void lttng_live_stream_iterator_destroy(struct lttng_live_stream_iterator *stream);
+
+#endif /* LTTNG_LIVE_DATA_STREAM_H */
* BabelTrace - LTTng-live client Component
*
* Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* SOFTWARE.
*/
+#include <stdbool.h>
+
#include <babeltrace/babeltrace-internal.h>
#include <babeltrace/graph/component.h>
+#include <babeltrace/graph/notification-iterator.h>
+#include <babeltrace/graph/clock-class-priority-map.h>
+#include "viewer-connection.h"
+
+//TODO: this should not be used by plugins. Should copy code into plugin
+//instead.
+#include "babeltrace/object-internal.h"
+#include "babeltrace/list-internal.h"
+#include "../common/metadata/decoder.h"
+
+#define STREAM_NAME_PREFIX "stream-"
+/* Account for u64 max string length. */
+#define U64_STR_MAX_LEN 20
+#define STREAM_NAME_MAX_LEN (sizeof(STREAM_NAME_PREFIX) + U64_STR_MAX_LEN)
+
+extern bool lttng_live_debug;
+
+struct lttng_live_component;
+struct lttng_live_session;
+
+enum lttng_live_stream_state {
+ LTTNG_LIVE_STREAM_ACTIVE_NO_DATA,
+ LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA,
+ LTTNG_LIVE_STREAM_QUIESCENT,
+ LTTNG_LIVE_STREAM_ACTIVE_DATA,
+ LTTNG_LIVE_STREAM_EOF,
+};
+
+enum live_stream_type {
+ LIVE_STREAM_TYPE_NO_STREAM,
+ LIVE_STREAM_TYPE_STREAM,
+};
+
+struct lttng_live_stream_iterator_generic {
+ enum live_stream_type type;
+};
+
+/* Iterator over a live stream. */
+struct lttng_live_stream_iterator {
+ struct lttng_live_stream_iterator_generic p;
+
+ struct bt_ctf_stream *stream;
+ struct lttng_live_trace *trace;
+ struct bt_private_port *port;
+
+ /* Node of stream list within the trace. */
+ struct bt_list_head node;
+
+ /*
+ * Since only a single iterator per viewer connection, we have
+ * only a single notification iterator per stream.
+ */
+ struct bt_ctf_notif_iter *notif_iter;
+
+ uint64_t viewer_stream_id;
+
+ uint64_t ctf_stream_class_id;
+ uint64_t base_offset; /* base offset in current index. */
+ uint64_t len; /* len to read in current index. */
+ uint64_t offset; /* offset in current index. */
+
+ int64_t last_returned_inactivity_timestamp;
+ int64_t current_inactivity_timestamp;
+
+ enum lttng_live_stream_state state;
+
+ uint64_t current_packet_end_timestamp;
+ struct bt_notification *packet_end_notif_queue;
+
+ uint8_t *buf;
+ size_t buflen;
+
+ char name[STREAM_NAME_MAX_LEN];
+};
+
+struct lttng_live_no_stream_iterator {
+ struct lttng_live_stream_iterator_generic p;
+
+ struct lttng_live_component *lttng_live;
+ struct bt_private_port *port;
+};
+
+struct lttng_live_component_options {
+ bool opt_dummy : 1;
+};
+
+struct lttng_live_metadata {
+ struct lttng_live_trace *trace;
+ uint64_t stream_id;
+ uint8_t uuid[16];
+ bool is_uuid_set;
+ int bo;
+ char *text;
+
+ struct ctf_metadata_decoder *decoder;
+
+ bool closed;
+};
+
+struct lttng_live_trace {
+ struct bt_object obj;
+
+ /* Node of trace list within the session. */
+ struct bt_list_head node;
+
+ /* Back reference to session. */
+ struct lttng_live_session *session;
+
+ uint64_t id; /* ctf trace ID within the session. */
+
+ struct bt_ctf_trace *trace;
+
+ struct lttng_live_metadata *metadata;
+ struct bt_clock_class_priority_map *cc_prio_map;
+
+ /* List of struct lttng_live_stream_iterator */
+ struct bt_list_head streams;
+
+ bool new_metadata_needed;
+};
+
+struct lttng_live_session {
+ /* Node of session list within the component. */
+ struct bt_list_head node;
+
+ struct lttng_live_component *lttng_live;
+
+ uint64_t id;
+
+ /* List of struct lttng_live_trace */
+ struct bt_list_head traces;
+
+ bool attached;
+ bool new_streams_needed;
+ bool lazy_stream_notif_init;
+ bool closed;
+};
+
+/*
+ * A component instance is an iterator on a single session.
+ */
+struct lttng_live_component {
+ struct bt_object obj;
+ struct bt_private_component *private_component; /* weak */
+ struct bt_live_viewer_connection *viewer_connection;
+
+ /* List of struct lttng_live_session */
+ struct bt_list_head sessions;
+
+ GString *url;
+ FILE *error_fp;
+ size_t max_query_size;
+ struct lttng_live_component_options options;
+
+ struct bt_private_port *no_stream_port;
+ struct lttng_live_no_stream_iterator *no_stream_iter;
+};
+
+enum bt_ctf_lttng_live_iterator_status {
+ /** Iterator state has progressed. Continue iteration immediately. */
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE = 3,
+ /** No notification available for now. Try again later. */
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN = 2,
+ /** No more CTF_LTTNG_LIVEs to be delivered. */
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END = 1,
+ /** No error, okay. */
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK = 0,
+ /** Invalid arguments. */
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL = -1,
+ /** General error. */
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR = -2,
+ /** Out of memory. */
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM = -3,
+ /** Unsupported iterator feature. */
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED = -4,
+};
BT_HIDDEN
-enum bt_component_status lttng_live_init(struct bt_private_component *source,
+enum bt_component_status lttng_live_component_init(struct bt_private_component *source,
struct bt_value *params, void *init_method_data);
+struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
+ const char *object, struct bt_value *params);
+
+void lttng_live_component_finalize(struct bt_private_component *component);
+
BT_HIDDEN
struct bt_notification_iterator_next_return lttng_live_iterator_next(
struct bt_private_notification_iterator *iterator);
+
+enum bt_notification_iterator_status lttng_live_iterator_init(
+ struct bt_private_notification_iterator *it,
+ struct bt_private_port *port);
+
+void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it);
+
+int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live);
+int lttng_live_attach_session(struct lttng_live_session *session);
+int lttng_live_detach_session(struct lttng_live_session *session);
+enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
+ struct lttng_live_session *session);
+
+int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id);
+
+ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
+ FILE *fp);
+enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *stream,
+ struct packet_index *index);
+enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset,
+ uint64_t req_len, uint64_t *recv_len);
+
+int lttng_live_add_port(struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *stream_iter);
+int lttng_live_remove_port(struct lttng_live_component *lttng_live,
+ struct bt_private_port *port);
+
+struct lttng_live_trace *lttng_live_ref_trace(
+ struct lttng_live_session *session, uint64_t trace_id);
+void lttng_live_unref_trace(struct lttng_live_trace *trace);
+void lttng_live_need_new_streams(struct lttng_live_component *lttng_live);
+
#endif /* BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_INTERNAL_H */
* Babeltrace CTF LTTng-live Client Component
*
* Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* SOFTWARE.
*/
-#include "lttng-live-internal.h"
+#include <babeltrace/ctf-ir/packet.h>
#include <babeltrace/graph/component-source.h>
+#include <babeltrace/graph/private-port.h>
+#include <babeltrace/graph/port.h>
+#include <babeltrace/graph/private-component.h>
+#include <babeltrace/graph/private-component-source.h>
+#include <babeltrace/graph/private-notification-iterator.h>
+#include <babeltrace/graph/notification-stream.h>
+#include <babeltrace/graph/notification-packet.h>
+#include <babeltrace/graph/notification-event.h>
+#include <babeltrace/graph/notification-heap.h>
+#include <babeltrace/graph/notification-iterator.h>
+#include <babeltrace/graph/notification-inactivity.h>
+#include <babeltrace/compiler-internal.h>
+#include <inttypes.h>
+#include <glib.h>
+#include <assert.h>
+#include <unistd.h>
#include <plugins-common.h>
+#include "lttng-live-internal.h"
+#include "data-stream.h"
+#include "metadata.h"
+
+#define PRINT_ERR_STREAM (lttng_live->error_fp)
+#define PRINT_PREFIX "lttng-live"
+#define PRINT_DBG_CHECK lttng_live_debug
+#define MAX_QUERY_SIZE (256*1024)
+#include "../print.h"
+
+#ifdef LIVE_DEBUG
+#define print_dbg(fmt, args...) \
+ fprintf(stderr, "%s() at " __FILE__ ":%d " fmt "\n", \
+ __func__, __LINE__, ## args)
+
+static const char *print_state(struct lttng_live_stream_iterator *s)
+{
+ switch (s->state) {
+ case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
+ return "ACTIVE_NO_DATA";
+ case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
+ return "QUIESCENT_NO_DATA";
+ case LTTNG_LIVE_STREAM_QUIESCENT:
+ return "QUIESCENT";
+ case LTTNG_LIVE_STREAM_ACTIVE_DATA:
+ return "ACTIVE_DATA";
+ case LTTNG_LIVE_STREAM_EOF:
+ return "EOF";
+ default:
+ return "ERROR";
+ }
+}
+#else
+#define print_dbg(fmt, args...)
+#endif
+
+#define print_stream_state(stream) \
+ print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
+ bt_port_get_name(bt_port_from_private_port(stream->port)), \
+ print_state(stream), stream->last_returned_inactivity_timestamp, \
+ stream->current_inactivity_timestamp)
+
BT_HIDDEN
-struct bt_notification *lttng_live_iterator_get(
- struct bt_private_notification_iterator *iterator)
+bool lttng_live_debug;
+
+BT_HIDDEN
+int lttng_live_add_port(struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *stream_iter)
+{
+ int ret;
+ struct bt_private_port *private_port;
+ char name[STREAM_NAME_MAX_LEN];
+
+ ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
+ assert(ret > 0);
+ strcpy(stream_iter->name, name);
+ private_port = bt_private_component_source_add_output_private_port(
+ lttng_live->private_component, name, stream_iter);
+ if (!private_port) {
+ return -1;
+ }
+ PDBG("Added port %s\n", name);
+
+ if (lttng_live->no_stream_port) {
+ ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+ if (ret) {
+ return -1;
+ }
+ BT_PUT(lttng_live->no_stream_port);
+ lttng_live->no_stream_iter->port = NULL;
+ }
+ stream_iter->port = private_port;
+ return 0;
+}
+
+BT_HIDDEN
+int lttng_live_remove_port(struct lttng_live_component *lttng_live,
+ struct bt_private_port *port)
+{
+ struct bt_component *component;
+ int64_t nr_ports;
+ int ret;
+
+ component = bt_component_from_private_component(lttng_live->private_component);
+ nr_ports = bt_component_source_get_output_port_count(component);
+ if (nr_ports < 0) {
+ return -1;
+ }
+ BT_PUT(component);
+ if (nr_ports == 1) {
+ assert(!lttng_live->no_stream_port);
+ lttng_live->no_stream_port =
+ bt_private_component_source_add_output_private_port(lttng_live->private_component,
+ "no-stream", lttng_live->no_stream_iter);
+ if (!lttng_live->no_stream_port) {
+ return -1;
+ }
+ lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
+ }
+ ret = bt_private_port_remove_from_component(port);
+ if (ret) {
+ return -1;
+ }
+ return 0;
+}
+
+static
+struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
+ uint64_t trace_id)
{
+ struct lttng_live_trace *trace;
+
+ bt_list_for_each_entry(trace, &session->traces, node) {
+ if (trace->id == trace_id) {
+ return trace;
+ }
+ }
return NULL;
}
+static
+void lttng_live_destroy_trace(struct bt_object *obj)
+{
+ struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
+
+ PDBG("Destroy trace\n");
+ assert(bt_list_empty(&trace->streams));
+ bt_list_del(&trace->node);
+ lttng_live_metadata_fini(trace);
+ BT_PUT(trace->cc_prio_map);
+ g_free(trace);
+}
+
+static
+struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
+ uint64_t trace_id)
+{
+ struct lttng_live_trace *trace = NULL;
+
+ trace = g_new0(struct lttng_live_trace, 1);
+ if (!trace) {
+ goto error;
+ }
+ trace->session = session;
+ trace->id = trace_id;
+ BT_INIT_LIST_HEAD(&trace->streams);
+ trace->new_metadata_needed = true;
+ bt_list_add(&trace->node, &session->traces);
+ bt_object_init(&trace->obj, lttng_live_destroy_trace);
+ goto end;
+error:
+ g_free(trace);
+ trace = NULL;
+end:
+ return trace;
+}
+
+BT_HIDDEN
+struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
+ uint64_t trace_id)
+{
+ struct lttng_live_trace *trace;
+
+ trace = lttng_live_find_trace(session, trace_id);
+ if (trace) {
+ bt_get(trace);
+ return trace;
+ }
+ return lttng_live_create_trace(session, trace_id);
+}
+
+BT_HIDDEN
+void lttng_live_unref_trace(struct lttng_live_trace *trace)
+{
+ bt_put(trace);
+}
+
+static
+void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
+{
+ struct lttng_live_stream_iterator *stream, *s;
+
+ bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
+ lttng_live_stream_iterator_destroy(stream);
+ }
+ lttng_live_metadata_fini(trace);
+}
+
+BT_HIDDEN
+int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id)
+{
+ int ret = 0;
+ struct lttng_live_session *s;
+
+ s = g_new0(struct lttng_live_session, 1);
+ if (!s) {
+ goto error;
+ }
+
+ s->id = session_id;
+ BT_INIT_LIST_HEAD(&s->traces);
+ s->lttng_live = lttng_live;
+ s->new_streams_needed = true;
+
+ PDBG("Reading from session %" PRIu64 "\n", s->id);
+ bt_list_add(&s->node, <tng_live->sessions);
+ goto end;
+error:
+ PERR("Error adding session\n");
+ g_free(s);
+ ret = -1;
+end:
+ return ret;
+}
+
+static
+void lttng_live_destroy_session(struct lttng_live_session *session)
+{
+ struct lttng_live_trace *trace, *t;
+
+ PDBG("Destroy session\n");
+ if (session->id != -1ULL) {
+ if (lttng_live_detach_session(session)) {
+ /* Old relayd cannot detach sessions. */
+ PDBG("Unable to detach session %" PRIu64 "\n",
+ session->id);
+ }
+ session->id = -1ULL;
+ }
+ bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
+ lttng_live_close_trace_streams(trace);
+ }
+ bt_list_del(&session->node);
+ g_free(session);
+}
+
+BT_HIDDEN
+void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
+{
+ struct lttng_live_stream_iterator_generic *s =
+ bt_private_notification_iterator_get_user_data(it);
+
+ switch (s->type) {
+ case LIVE_STREAM_TYPE_NO_STREAM:
+ {
+ /* Leave no_stream_iter in place when port is removed. */
+ break;
+ }
+ case LIVE_STREAM_TYPE_STREAM:
+ {
+ struct lttng_live_stream_iterator *stream_iter =
+ container_of(s, struct lttng_live_stream_iterator, p);
+
+ lttng_live_stream_iterator_destroy(stream_iter);
+ break;
+ }
+ }
+}
+
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *lttng_live_stream)
+{
+ switch (lttng_live_stream->state) {
+ case LTTNG_LIVE_STREAM_QUIESCENT:
+ case LTTNG_LIVE_STREAM_ACTIVE_DATA:
+ break;
+ case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
+ /* Invalid state. */
+ PERR("Unexpected stream state \"ACTIVE_NO_DATA\"\n");
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
+ /* Invalid state. */
+ PERR("Unexpected stream state \"QUIESCENT_NO_DATA\"\n");
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ case LTTNG_LIVE_STREAM_EOF:
+ break;
+ }
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+}
+
+/*
+ * For active no data stream, fetch next data. It can be either:
+ * - quiescent: need to put it in the prio heap at quiescent end
+ * timestamp,
+ * - have data: need to wire up first event into the prio heap,
+ * - have no data on this stream at this point: need to retry (AGAIN) or
+ * return EOF.
+ */
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *lttng_live_stream)
+{
+ enum bt_ctf_lttng_live_iterator_status ret =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ struct packet_index index;
+ enum lttng_live_stream_state orig_state = lttng_live_stream->state;
+
+ if (lttng_live_stream->trace->new_metadata_needed) {
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ goto end;
+ }
+ if (lttng_live_stream->trace->session->new_streams_needed) {
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ goto end;
+ }
+ if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
+ && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
+ goto end;
+ }
+ ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
+ if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
+ if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
+ if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
+ && lttng_live_stream->last_returned_inactivity_timestamp ==
+ lttng_live_stream->current_inactivity_timestamp) {
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ print_stream_state(lttng_live_stream);
+ } else {
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ }
+ goto end;
+ }
+ lttng_live_stream->base_offset = index.offset;
+ lttng_live_stream->offset = index.offset;
+ lttng_live_stream->len = index.packet_size / CHAR_BIT;
+end:
+ if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ ret = lttng_live_iterator_next_check_stream_state(
+ lttng_live, lttng_live_stream);
+ }
+ return ret;
+}
+
+/*
+ * Creation of the notification requires the ctf trace to be created
+ * beforehand, but the live protocol gives us all streams (including
+ * metadata) at once. So we split it in three steps: getting streams,
+ * getting metadata (which creates the ctf trace), and then creating the
+ * per-stream notifications.
+ */
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_session *session)
+{
+ enum bt_ctf_lttng_live_iterator_status status;
+ struct lttng_live_trace *trace, *t;
+
+ if (lttng_live_attach_session(session)) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ }
+ status = lttng_live_get_new_streams(session);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
+ status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
+ return status;
+ }
+ bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
+ status = lttng_live_metadata_update(trace);
+ if (status == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
+ int retval;
+
+ retval = bt_ctf_trace_set_is_static(trace->trace);
+ assert(!retval);
+ } else if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ return status;
+ }
+ }
+ return lttng_live_lazy_notif_init(session);
+}
+
+BT_HIDDEN
+void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
+{
+ struct lttng_live_session *session;
+
+ bt_list_for_each_entry(session, <tng_live->sessions, node) {
+ session->new_streams_needed = true;
+ }
+}
+
+static
+void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
+{
+ struct lttng_live_session *session;
+
+ bt_list_for_each_entry(session, <tng_live->sessions, node) {
+ struct lttng_live_trace *trace;
+
+ session->new_streams_needed = true;
+ bt_list_for_each_entry(trace, &session->traces, node) {
+ trace->new_metadata_needed = true;
+ }
+ }
+}
+
+static
+enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
+ struct lttng_live_component *lttng_live)
+{
+ enum bt_ctf_lttng_live_iterator_status ret =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ unsigned int nr_sessions_opened = 0;
+ struct lttng_live_session *session, *s;
+
+ bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) {
+ if (session->closed && bt_list_empty(&session->traces)) {
+ lttng_live_destroy_session(session);
+ }
+ }
+ /*
+ * Currently, when there are no sessions, we quit immediately.
+ * We may want to add a component parameter to keep trying until
+ * we get data in the future.
+ * Also, in a remotely distant future, we could add a "new
+ * session" flag to the protocol, which would tell us that we
+ * need to query for new sessions even though we have sessions
+ * currently ongoing.
+ */
+ if (bt_list_empty(<tng_live->sessions)) {
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ goto end;
+ }
+ bt_list_for_each_entry(session, <tng_live->sessions, node) {
+ ret = lttng_live_get_session(lttng_live, session);
+ switch (ret) {
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ break;
+ default:
+ goto end;
+ }
+ if (!session->closed) {
+ nr_sessions_opened++;
+ }
+ }
+end:
+ if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
+ return ret;
+}
+
+static
+enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *lttng_live_stream,
+ struct bt_notification **notification,
+ uint64_t timestamp)
+{
+ enum bt_ctf_lttng_live_iterator_status ret =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ struct lttng_live_trace *trace;
+ struct bt_ctf_clock_class *clock_class = NULL;
+ struct bt_ctf_clock_value *clock_value = NULL;
+ struct bt_notification *notif = NULL;
+ int retval;
+
+ trace = lttng_live_stream->trace;
+ if (!trace) {
+ goto error;
+ }
+ clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
+ if (!clock_class) {
+ goto error;
+ }
+ clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
+ if (!clock_value) {
+ goto error;
+ }
+ notif = bt_notification_inactivity_create(trace->cc_prio_map);
+ if (!notif) {
+ goto error;
+ }
+ retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
+ if (retval) {
+ goto error;
+ }
+ *notification = notif;
+end:
+ bt_put(clock_value);
+ bt_put(clock_class);
+ return ret;
+
+error:
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ bt_put(notif);
+ goto end;
+}
+
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *lttng_live_stream,
+ struct bt_notification **notification)
+{
+ enum bt_ctf_lttng_live_iterator_status ret =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ struct bt_ctf_clock_class *clock_class = NULL;
+ struct bt_ctf_clock_value *clock_value = NULL;
+
+ if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ }
+
+ if (lttng_live_stream->current_inactivity_timestamp ==
+ lttng_live_stream->last_returned_inactivity_timestamp) {
+ lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ goto end;
+ }
+
+ ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
+ (uint64_t) lttng_live_stream->current_inactivity_timestamp);
+
+ lttng_live_stream->last_returned_inactivity_timestamp =
+ lttng_live_stream->current_inactivity_timestamp;
+end:
+ bt_put(clock_value);
+ bt_put(clock_class);
+ return ret;
+}
+
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
+ struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *lttng_live_stream,
+ struct bt_notification **notification)
+{
+ enum bt_ctf_lttng_live_iterator_status ret =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum bt_ctf_notif_iter_status status;
+ struct lttng_live_session *session;
+
+ bt_list_for_each_entry(session, <tng_live->sessions, node) {
+ struct lttng_live_trace *trace;
+
+ if (session->new_streams_needed) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ }
+ bt_list_for_each_entry(trace, &session->traces, node) {
+ if (trace->new_metadata_needed) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ }
+ }
+ }
+
+ if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ }
+ if (lttng_live_stream->packet_end_notif_queue) {
+ *notification = lttng_live_stream->packet_end_notif_queue;
+ lttng_live_stream->packet_end_notif_queue = NULL;
+ status = BT_CTF_NOTIF_ITER_STATUS_OK;
+ } else {
+ status = bt_ctf_notif_iter_get_next_notification(
+ lttng_live_stream->notif_iter,
+ lttng_live_stream->trace->cc_prio_map,
+ notification);
+ if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
+ /*
+ * Consider empty packets as inactivity.
+ */
+ if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
+ lttng_live_stream->packet_end_notif_queue = *notification;
+ *notification = NULL;
+ return emit_inactivity_notification(lttng_live,
+ lttng_live_stream, notification,
+ lttng_live_stream->current_packet_end_timestamp);
+ }
+ }
+ }
+ switch (status) {
+ case BT_CTF_NOTIF_ITER_STATUS_EOF:
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ break;
+ case BT_CTF_NOTIF_ITER_STATUS_OK:
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ break;
+ case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
+ /*
+ * Continue immediately (end of packet). The next
+ * get_index may return AGAIN to delay the following
+ * attempt.
+ */
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ break;
+ case BT_CTF_NOTIF_ITER_STATUS_INVAL:
+ /* No argument provided by the user, so don't return INVAL. */
+ case BT_CTF_NOTIF_ITER_STATUS_ERROR:
+ default:
+ ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ break;
+ }
+ return ret;
+}
+
+/*
+ * helper function:
+ * handle_no_data_streams()
+ * retry:
+ * - for each ACTIVE_NO_DATA stream:
+ * - query relayd for stream data, or quiescence info.
+ * - if need metadata, get metadata, goto retry.
+ * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
+ * - if quiescent, move to QUIESCENT streams
+ * - if fetched data, move to ACTIVE_DATA streams
+ * (at this point each stream either has data, or is quiescent)
+ *
+ *
+ * iterator_next:
+ * handle_new_streams_and_metadata()
+ * - query relayd for known streams, add them as ACTIVE_NO_DATA
+ * - query relayd for metadata
+ *
+ * call handle_active_no_data_streams()
+ *
+ * handle_quiescent_streams()
+ * - if at least one stream is ACTIVE_DATA:
+ * - peek stream event with lowest timestamp -> next_ts
+ * - for each quiescent stream
+ * - if next_ts >= quiescent end
+ * - set state to ACTIVE_NO_DATA
+ * - else
+ * - for each quiescent stream
+ * - set state to ACTIVE_NO_DATA
+ *
+ * call handle_active_no_data_streams()
+ *
+ * handle_active_data_streams()
+ * - if at least one stream is ACTIVE_DATA:
+ * - get stream event with lowest timestamp from heap
+ * - make that stream event the current notification.
+ * - move this stream heap position to its next event
+ * - if we need to fetch data from relayd, move
+ * stream to ACTIVE_NO_DATA.
+ * - return OK
+ * - return AGAIN
+ *
+ * end criterion: ctrl-c on client. If relayd exits or the session
+ * closes on the relay daemon side, we keep on waiting for streams.
+ * Eventually handle --end timestamp (also an end criterion).
+ *
+ * When disconnected from relayd: try to re-connect endlessly.
+ */
+static
+struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
+ struct bt_private_notification_iterator *iterator,
+ struct lttng_live_stream_iterator *stream_iter)
+{
+ enum bt_ctf_lttng_live_iterator_status status;
+ struct bt_notification_iterator_next_return next_return;
+ struct lttng_live_component *lttng_live;
+
+ lttng_live = stream_iter->trace->session->lttng_live;
+retry:
+ print_stream_state(stream_iter);
+ next_return.notification = NULL;
+ status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ status = lttng_live_iterator_next_handle_one_no_data_stream(
+ lttng_live, stream_iter);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ status = lttng_live_iterator_next_handle_one_quiescent_stream(
+ lttng_live, stream_iter, &next_return.notification);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ assert(next_return.notification == NULL);
+ goto end;
+ }
+ if (next_return.notification) {
+ goto end;
+ }
+ status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
+ stream_iter, &next_return.notification);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ assert(next_return.notification == NULL);
+ }
+
+end:
+ switch (status) {
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+ print_dbg("continue");
+ goto retry;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+ print_dbg("again");
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+ print_dbg("end");
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+ print_dbg("ok");
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+ default: /* fall-through */
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ break;
+ }
+ return next_return;
+}
+
+static
+struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
+ struct bt_private_notification_iterator *iterator,
+ struct lttng_live_no_stream_iterator *no_stream_iter)
+{
+ enum bt_ctf_lttng_live_iterator_status status;
+ struct bt_notification_iterator_next_return next_return;
+ struct lttng_live_component *lttng_live;
+
+ lttng_live = no_stream_iter->lttng_live;
+retry:
+ lttng_live_force_new_streams_and_metadata(lttng_live);
+ status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ if (no_stream_iter->port) {
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } else {
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ }
+end:
+ switch (status) {
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+ goto retry;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
+ break;
+ case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+ default: /* fall-through */
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ break;
+ }
+ return next_return;
+}
+
BT_HIDDEN
struct bt_notification_iterator_next_return lttng_live_iterator_next(
struct bt_private_notification_iterator *iterator)
{
- struct bt_notification_iterator_next_return ret = {
- .status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR,
- };
+ struct lttng_live_stream_iterator_generic *s =
+ bt_private_notification_iterator_get_user_data(iterator);
+ struct bt_notification_iterator_next_return next_return;
+
+ switch (s->type) {
+ case LIVE_STREAM_TYPE_NO_STREAM:
+ next_return = lttng_live_iterator_next_no_stream(iterator,
+ container_of(s, struct lttng_live_no_stream_iterator, p));
+ break;
+ case LIVE_STREAM_TYPE_STREAM:
+ next_return = lttng_live_iterator_next_stream(iterator,
+ container_of(s, struct lttng_live_stream_iterator, p));
+ break;
+ default:
+ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ break;
+ }
+ return next_return;
+}
+BT_HIDDEN
+enum bt_notification_iterator_status lttng_live_iterator_init(
+ struct bt_private_notification_iterator *it,
+ struct bt_private_port *port)
+{
+ enum bt_notification_iterator_status ret =
+ BT_NOTIFICATION_ITERATOR_STATUS_OK;
+ struct lttng_live_stream_iterator_generic *s;
+ struct lttng_live_component *lttng_live;
+
+ assert(it);
+
+ s = bt_private_port_get_user_data(port);
+ assert(s);
+ switch (s->type) {
+ case LIVE_STREAM_TYPE_NO_STREAM:
+ {
+ struct lttng_live_no_stream_iterator *no_stream_iter =
+ container_of(s, struct lttng_live_no_stream_iterator, p);
+ lttng_live = no_stream_iter->lttng_live;
+ ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
+ if (ret) {
+ goto error;
+ }
+ break;
+ }
+ case LIVE_STREAM_TYPE_STREAM:
+ {
+ struct lttng_live_stream_iterator *stream_iter =
+ container_of(s, struct lttng_live_stream_iterator, p);
+ lttng_live = stream_iter->trace->session->lttng_live;
+ ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
+ if (ret) {
+ goto error;
+ }
+ break;
+ }
+ default:
+ ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+
+end:
return ret;
+error:
+ if (bt_private_notification_iterator_set_user_data(it, NULL)
+ != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+ PERR("Error setting private data to NULL\n");
+ }
+ goto end;
+}
+
+static
+struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
+ struct bt_value *params)
+{
+ struct bt_value *url_value = NULL;
+ struct bt_value *results = NULL;
+ const char *url;
+ struct bt_live_viewer_connection *viewer_connection = NULL;
+ enum bt_value_status ret;
+
+ url_value = bt_value_map_get(params, "url");
+ if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
+ fprintf(stderr, "Mandatory \"url\" parameter missing\n");
+ goto error;
+ }
+
+ ret = bt_value_string_get(url_value, &url);
+ if (ret != BT_VALUE_STATUS_OK) {
+ fprintf(stderr, "\"url\" parameter is required to be a string value\n");
+ goto error;
+ }
+
+ viewer_connection = bt_live_viewer_connection_create(url, stderr);
+ if (!viewer_connection) {
+ ret = BT_COMPONENT_STATUS_NOMEM;
+ goto error;
+ }
+
+ results = bt_live_viewer_connection_list_sessions(viewer_connection);
+ goto end;
+error:
+ BT_PUT(results);
+end:
+ if (viewer_connection) {
+ bt_live_viewer_connection_destroy(viewer_connection);
+ }
+ BT_PUT(url_value);
+ return results;
+}
+
+BT_HIDDEN
+struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
+ const char *object, struct bt_value *params)
+{
+ if (strcmp(object, "sessions") == 0) {
+ return lttng_live_query_list_sessions(comp_class,
+ params);
+ }
+ fprintf(stderr, "Unknown query object `%s`\n", object);
+ return NULL;
+}
+
+static
+void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
+{
+ int ret;
+ struct lttng_live_session *session, *s;
+
+ bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) {
+ lttng_live_destroy_session(session);
+ }
+ BT_PUT(lttng_live->viewer_connection);
+ if (lttng_live->url) {
+ g_string_free(lttng_live->url, TRUE);
+ }
+ if (lttng_live->no_stream_port) {
+ ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+ assert(!ret);
+ BT_PUT(lttng_live->no_stream_port);
+ }
+ if (lttng_live->no_stream_iter) {
+ g_free(lttng_live->no_stream_iter);
+ }
+ g_free(lttng_live);
+}
+
+BT_HIDDEN
+void lttng_live_component_finalize(struct bt_private_component *component)
+{
+ void *data = bt_private_component_get_user_data(component);
+
+ if (!data) {
+ return;
+ }
+ lttng_live_component_destroy_data(data);
+}
+
+static
+struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
+ struct bt_private_component *private_component)
+{
+ struct lttng_live_component *lttng_live;
+ struct bt_value *value = NULL;
+ const char *url;
+ enum bt_value_status ret;
+
+ lttng_live = g_new0(struct lttng_live_component, 1);
+ if (!lttng_live) {
+ goto end;
+ }
+ lttng_live->error_fp = stderr;
+ /* TODO: make this an overridable parameter. */
+ lttng_live->max_query_size = MAX_QUERY_SIZE;
+ BT_INIT_LIST_HEAD(<tng_live->sessions);
+ value = bt_value_map_get(params, "url");
+ if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
+ fprintf(stderr, "Mandatory \"url\" parameter missing\n");
+ goto error;
+ }
+ ret = bt_value_string_get(value, &url);
+ if (ret != BT_VALUE_STATUS_OK) {
+ fprintf(stderr, "\"url\" parameter is required to be a string value\n");
+ goto error;
+ }
+ lttng_live->url = g_string_new(url);
+ if (!lttng_live->url) {
+ goto error;
+ }
+ lttng_live->viewer_connection =
+ bt_live_viewer_connection_create(lttng_live->url->str,
+ stderr);
+ if (!lttng_live->viewer_connection) {
+ ret = BT_COMPONENT_STATUS_NOMEM;
+ goto error;
+ }
+ if (lttng_live_create_viewer_session(lttng_live)) {
+ ret = BT_COMPONENT_STATUS_ERROR;
+ goto error;
+ }
+ lttng_live->private_component = private_component;
+
+ goto end;
+
+error:
+ lttng_live_component_destroy_data(lttng_live);
+ lttng_live = NULL;
+end:
+ return lttng_live;
}
BT_HIDDEN
-enum bt_component_status lttng_live_init(struct bt_private_component *component,
- struct bt_value *params, UNUSED_VAR void *init_method_data)
+enum bt_component_status lttng_live_component_init(struct bt_private_component *component,
+ struct bt_value *params, void *init_method_data)
{
- return BT_COMPONENT_STATUS_OK;
+ struct lttng_live_component *lttng_live;
+ enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+
+ lttng_live_debug = g_strcmp0(getenv("LTTNG_LIVE_DEBUG"), "1") == 0;
+
+ /* Passes ownership of iter ref to lttng_live_component_create. */
+ lttng_live = lttng_live_component_create(params, component);
+ if (!lttng_live) {
+ ret = BT_COMPONENT_STATUS_NOMEM;
+ goto end;
+ }
+
+ lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
+ lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
+ lttng_live->no_stream_iter->lttng_live = lttng_live;
+
+ lttng_live->no_stream_port =
+ bt_private_component_source_add_output_private_port(
+ lttng_live->private_component, "no-stream",
+ lttng_live->no_stream_iter);
+ lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
+
+ ret = bt_private_component_set_user_data(component, lttng_live);
+ if (ret != BT_COMPONENT_STATUS_OK) {
+ goto error;
+ }
+
+end:
+ return ret;
+error:
+ (void) bt_private_component_set_user_data(component, NULL);
+ lttng_live_component_destroy_data(lttng_live);
+ return ret;
}
--- /dev/null
+#ifndef LTTNG_VIEWER_ABI_H
+#define LTTNG_VIEWER_ABI_H
+
+/*
+ * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
+ * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * David Goulet <dgoulet@efficios.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <babeltrace/compat/limits-internal.h>
+
+#define LTTNG_VIEWER_PATH_MAX 4096
+#define LTTNG_VIEWER_NAME_MAX 255
+#define LTTNG_VIEWER_HOST_NAME_MAX 64
+
+/* Flags in reply to get_next_index and get_packet. */
+enum {
+ /* New metadata is required to read this packet. */
+ LTTNG_VIEWER_FLAG_NEW_METADATA = (1 << 0),
+ /* New stream got added to the trace. */
+ LTTNG_VIEWER_FLAG_NEW_STREAM = (1 << 1),
+};
+
+enum lttng_viewer_command {
+ LTTNG_VIEWER_CONNECT = 1,
+ LTTNG_VIEWER_LIST_SESSIONS = 2,
+ LTTNG_VIEWER_ATTACH_SESSION = 3,
+ LTTNG_VIEWER_GET_NEXT_INDEX = 4,
+ LTTNG_VIEWER_GET_PACKET = 5,
+ LTTNG_VIEWER_GET_METADATA = 6,
+ LTTNG_VIEWER_GET_NEW_STREAMS = 7,
+ LTTNG_VIEWER_CREATE_SESSION = 8,
+ LTTNG_VIEWER_DETACH_SESSION = 9,
+};
+
+enum lttng_viewer_attach_return_code {
+ LTTNG_VIEWER_ATTACH_OK = 1, /* The attach command succeeded. */
+ LTTNG_VIEWER_ATTACH_ALREADY = 2, /* A viewer is already attached. */
+ LTTNG_VIEWER_ATTACH_UNK = 3, /* The session ID is unknown. */
+ LTTNG_VIEWER_ATTACH_NOT_LIVE = 4, /* The session is not live. */
+ LTTNG_VIEWER_ATTACH_SEEK_ERR = 5, /* Seek error. */
+ LTTNG_VIEWER_ATTACH_NO_SESSION = 6, /* No viewer session created. */
+};
+
+enum lttng_viewer_detach_session_return_code {
+ LTTNG_VIEWER_DETACH_SESSION_OK = 1,
+ LTTNG_VIEWER_DETACH_SESSION_UNK = 2,
+ LTTNG_VIEWER_DETACH_SESSION_ERR = 3,
+};
+
+enum lttng_viewer_next_index_return_code {
+ LTTNG_VIEWER_INDEX_OK = 1, /* Index is available. */
+ LTTNG_VIEWER_INDEX_RETRY = 2, /* Index not yet available. */
+ LTTNG_VIEWER_INDEX_HUP = 3, /* Index closed (trace destroyed). */
+ LTTNG_VIEWER_INDEX_ERR = 4, /* Unknow error. */
+ LTTNG_VIEWER_INDEX_INACTIVE = 5, /* Inactive stream beacon. */
+ LTTNG_VIEWER_INDEX_EOF = 6, /* End of index file. */
+};
+
+enum lttng_viewer_get_packet_return_code {
+ LTTNG_VIEWER_GET_PACKET_OK = 1,
+ LTTNG_VIEWER_GET_PACKET_RETRY = 2,
+ LTTNG_VIEWER_GET_PACKET_ERR = 3,
+ LTTNG_VIEWER_GET_PACKET_EOF = 4,
+};
+
+enum lttng_viewer_get_metadata_return_code {
+ LTTNG_VIEWER_METADATA_OK = 1,
+ LTTNG_VIEWER_NO_NEW_METADATA = 2,
+ LTTNG_VIEWER_METADATA_ERR = 3,
+};
+
+enum lttng_viewer_connection_type {
+ LTTNG_VIEWER_CLIENT_COMMAND = 1,
+ LTTNG_VIEWER_CLIENT_NOTIFICATION = 2,
+};
+
+enum lttng_viewer_seek {
+ /* Receive the trace packets from the beginning. */
+ LTTNG_VIEWER_SEEK_BEGINNING = 1,
+ /* Receive the trace packets from now. */
+ LTTNG_VIEWER_SEEK_LAST = 2,
+};
+
+enum lttng_viewer_new_streams_return_code {
+ LTTNG_VIEWER_NEW_STREAMS_OK = 1, /* If new streams are being sent. */
+ LTTNG_VIEWER_NEW_STREAMS_NO_NEW = 2, /* If no new streams are available. */
+ LTTNG_VIEWER_NEW_STREAMS_ERR = 3, /* Error. */
+ LTTNG_VIEWER_NEW_STREAMS_HUP = 4, /* Session closed. */
+};
+
+enum lttng_viewer_create_session_return_code {
+ LTTNG_VIEWER_CREATE_SESSION_OK = 1,
+ LTTNG_VIEWER_CREATE_SESSION_ERR = 2,
+};
+
+struct lttng_viewer_session {
+ uint64_t id;
+ uint32_t live_timer;
+ uint32_t clients;
+ uint32_t streams;
+ char hostname[LTTNG_VIEWER_HOST_NAME_MAX];
+ char session_name[LTTNG_VIEWER_NAME_MAX];
+} __attribute__((__packed__));
+
+struct lttng_viewer_stream {
+ uint64_t id;
+ uint64_t ctf_trace_id;
+ uint32_t metadata_flag;
+ char path_name[LTTNG_VIEWER_PATH_MAX];
+ char channel_name[LTTNG_VIEWER_NAME_MAX];
+} __attribute__((__packed__));
+
+struct lttng_viewer_cmd {
+ uint64_t data_size; /* data size following this header */
+ uint32_t cmd; /* enum lttcomm_relayd_command */
+ uint32_t cmd_version; /* command version */
+} __attribute__((__packed__));
+
+/*
+ * LTTNG_VIEWER_CONNECT payload.
+ */
+struct lttng_viewer_connect {
+ /* session ID assigned by the relay for command connections */
+ uint64_t viewer_session_id;
+ uint32_t major;
+ uint32_t minor;
+ uint32_t type; /* enum lttng_viewer_connection_type */
+} __attribute__((__packed__));
+
+/*
+ * LTTNG_VIEWER_LIST_SESSIONS payload.
+ */
+struct lttng_viewer_list_sessions {
+ uint32_t sessions_count;
+ char session_list[]; /* struct lttng_viewer_session */
+} __attribute__((__packed__));
+
+/*
+ * LTTNG_VIEWER_ATTACH_SESSION payload.
+ */
+struct lttng_viewer_attach_session_request {
+ uint64_t session_id;
+ uint64_t offset; /* unused for now */
+ uint32_t seek; /* enum lttng_viewer_seek */
+} __attribute__((__packed__));
+
+struct lttng_viewer_attach_session_response {
+ /* enum lttng_viewer_attach_return_code */
+ uint32_t status;
+ uint32_t streams_count;
+ /* struct lttng_viewer_stream */
+ char stream_list[];
+} __attribute__((__packed__));
+
+/*
+ * LTTNG_VIEWER_GET_NEXT_INDEX payload.
+ */
+struct lttng_viewer_get_next_index {
+ uint64_t stream_id;
+} __attribute__ ((__packed__));
+
+struct lttng_viewer_index {
+ uint64_t offset;
+ uint64_t packet_size;
+ uint64_t content_size;
+ uint64_t timestamp_begin;
+ uint64_t timestamp_end;
+ uint64_t events_discarded;
+ uint64_t stream_id;
+ uint32_t status; /* enum lttng_viewer_next_index_return_code */
+ uint32_t flags; /* LTTNG_VIEWER_FLAG_* */
+} __attribute__ ((__packed__));
+
+/*
+ * LTTNG_VIEWER_GET_PACKET payload.
+ */
+struct lttng_viewer_get_packet {
+ uint64_t stream_id;
+ uint64_t offset;
+ uint32_t len;
+} __attribute__((__packed__));
+
+struct lttng_viewer_trace_packet {
+ uint32_t status; /* enum lttng_viewer_get_packet_return_code */
+ uint32_t len;
+ uint32_t flags; /* LTTNG_VIEWER_FLAG_* */
+ char data[];
+} __attribute__((__packed__));
+
+/*
+ * LTTNG_VIEWER_GET_METADATA payload.
+ */
+struct lttng_viewer_get_metadata {
+ uint64_t stream_id;
+} __attribute__((__packed__));
+
+struct lttng_viewer_metadata_packet {
+ uint64_t len;
+ uint32_t status; /* enum lttng_viewer_get_metadata_return_code */
+ char data[];
+} __attribute__((__packed__));
+
+/*
+ * LTTNG_VIEWER_GET_NEW_STREAMS payload.
+ */
+struct lttng_viewer_new_streams_request {
+ uint64_t session_id;
+} __attribute__((__packed__));
+
+struct lttng_viewer_new_streams_response {
+ /* enum lttng_viewer_new_streams_return_code */
+ uint32_t status;
+ uint32_t streams_count;
+ /* struct lttng_viewer_stream */
+ char stream_list[];
+} __attribute__((__packed__));
+
+struct lttng_viewer_create_session_response {
+ /* enum lttng_viewer_create_session_return_code */
+ uint32_t status;
+} __attribute__((__packed__));
+
+/*
+ * LTTNG_VIEWER_DETACH_SESSION payload.
+ */
+struct lttng_viewer_detach_session_request {
+ uint64_t session_id;
+} __attribute__((__packed__));
+
+struct lttng_viewer_detach_session_response {
+ /* enum lttng_viewer_detach_session_return_code */
+ uint32_t status;
+} __attribute__((__packed__));
+
+#endif /* LTTNG_VIEWER_ABI_H */
--- /dev/null
+/*
+ * Copyright 2016 - Philippe Proulx <pproulx@efficios.com>
+ * Copyright 2010-2011 - EfficiOS Inc. and Linux Foundation
+ *
+ * Some functions are based on older functions written by Mathieu Desnoyers.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <stdio.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <glib.h>
+#include <babeltrace/compat/uuid-internal.h>
+#include <babeltrace/compat/memstream-internal.h>
+
+#define PRINT_ERR_STREAM lttng_live->error_fp
+#define PRINT_PREFIX "lttng-live-metadata"
+#define PRINT_DBG_CHECK lttng_live_debug
+#include "../print.h"
+
+#include "metadata.h"
+#include "../common/metadata/decoder.h"
+
+#define TSDL_MAGIC 0x75d11d57
+
+struct packet_header {
+ uint32_t magic;
+ uint8_t uuid[16];
+ uint32_t checksum;
+ uint32_t content_size;
+ uint32_t packet_size;
+ uint8_t compression_scheme;
+ uint8_t encryption_scheme;
+ uint8_t checksum_scheme;
+ uint8_t major;
+ uint8_t minor;
+} __attribute__((__packed__));
+
+static
+enum bt_ctf_lttng_live_iterator_status lttng_live_update_clock_map(
+ struct lttng_live_trace *trace)
+{
+ enum bt_ctf_lttng_live_iterator_status status =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ size_t i;
+ int count, ret;
+
+ BT_PUT(trace->cc_prio_map);
+ trace->cc_prio_map = bt_clock_class_priority_map_create();
+ if (!trace->cc_prio_map) {
+ goto error;
+ }
+
+ count = bt_ctf_trace_get_clock_class_count(trace->trace);
+ assert(count >= 0);
+
+ for (i = 0; i < count; i++) {
+ struct bt_ctf_clock_class *clock_class =
+ bt_ctf_trace_get_clock_class_by_index(trace->trace, i);
+
+ assert(clock_class);
+ ret = bt_clock_class_priority_map_add_clock_class(
+ trace->cc_prio_map, clock_class, 0);
+ BT_PUT(clock_class);
+
+ if (ret) {
+ goto error;
+ }
+ }
+
+ goto end;
+error:
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+end:
+ return status;
+}
+
+BT_HIDDEN
+enum bt_ctf_lttng_live_iterator_status lttng_live_metadata_update(
+ struct lttng_live_trace *trace)
+{
+ struct lttng_live_session *session = trace->session;
+ struct lttng_live_component *lttng_live = session->lttng_live;
+ struct lttng_live_metadata *metadata = trace->metadata;
+ ssize_t ret = 0;
+ size_t size, len_read = 0;
+ char *metadata_buf = NULL;
+ FILE *fp = NULL;
+ enum ctf_metadata_decoder_status decoder_status;
+ enum bt_ctf_lttng_live_iterator_status status =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+
+ /* No metadata stream yet. */
+ if (!metadata) {
+ if (session->new_streams_needed) {
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } else {
+ session->new_streams_needed = true;
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ }
+ goto end;
+ }
+
+ if (!trace->new_metadata_needed) {
+ goto end;
+ }
+
+ /* Open for writing */
+ fp = bt_open_memstream(&metadata_buf, &size);
+ if (!fp) {
+ PERR("Metadata open_memstream: %s\n", strerror(errno));
+ goto error;
+ }
+
+ /* Grab all available metadata. */
+ do {
+ /*
+ * get_one_metadata_packet returns the number of bytes
+ * received, 0 when we have received everything, a
+ * negative value on error.
+ */
+ ret = lttng_live_get_one_metadata_packet(trace, fp);
+ if (ret > 0) {
+ len_read += ret;
+ }
+ } while (ret > 0);
+
+ /*
+ * Consider metadata closed as soon as we get an error reading
+ * it (e.g. cannot be found).
+ */
+ if (ret < 0) {
+ if (!metadata->closed) {
+ metadata->closed = true;
+ /*
+ * Release our reference on the trace as soon as
+ * we know the metadata stream is not available
+ * anymore. This won't necessarily teardown the
+ * metadata objects immediately, but only when
+ * the data streams are done.
+ */
+ lttng_live_unref_trace(metadata->trace);
+ }
+ }
+
+ if (bt_close_memstream(&metadata_buf, &size, fp)) {
+ BT_LOGE("bt_close_memstream: %s", strerror(errno));
+ }
+ ret = 0;
+ fp = NULL;
+
+ if (len_read == 0) {
+ if (!trace->trace) {
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ goto end;
+ }
+ trace->new_metadata_needed = false;
+ goto end;
+ }
+
+ if (babeltrace_debug) {
+ // yydebug = 1;
+ }
+
+ fp = bt_fmemopen(metadata_buf, len_read, "rb");
+ if (!fp) {
+ PERR("Cannot memory-open metadata buffer: %s\n",
+ strerror(errno));
+ goto error;
+ }
+
+ decoder_status = ctf_metadata_decoder_decode(metadata->decoder, fp);
+ switch (decoder_status) {
+ case CTF_METADATA_DECODER_STATUS_OK:
+ BT_PUT(trace->trace);
+ trace->trace = ctf_metadata_decoder_get_trace(metadata->decoder);
+ trace->new_metadata_needed = false;
+ status = lttng_live_update_clock_map(trace);
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ break;
+ case CTF_METADATA_DECODER_STATUS_INCOMPLETE:
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ break;
+ case CTF_METADATA_DECODER_STATUS_ERROR:
+ case CTF_METADATA_DECODER_STATUS_INVAL_VERSION:
+ case CTF_METADATA_DECODER_STATUS_IR_VISITOR_ERROR:
+ goto error;
+ }
+
+ goto end;
+error:
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+end:
+ if (fp) {
+ int closeret;
+
+ closeret = fclose(fp);
+ if (closeret) {
+ PERR("Error on fclose");
+ }
+ }
+ return status;
+}
+
+BT_HIDDEN
+int lttng_live_metadata_create_stream(struct lttng_live_session *session,
+ uint64_t ctf_trace_id,
+ uint64_t stream_id)
+{
+ struct lttng_live_metadata *metadata = NULL;
+ struct lttng_live_trace *trace;
+
+ metadata = g_new0(struct lttng_live_metadata, 1);
+ if (!metadata) {
+ return -1;
+ }
+ metadata->stream_id = stream_id;
+ //TODO: add clock offset option
+ metadata->decoder = ctf_metadata_decoder_create(
+ session->lttng_live->error_fp, 0);
+ if (!metadata->decoder) {
+ goto error;
+ }
+ trace = lttng_live_ref_trace(session, ctf_trace_id);
+ if (!trace) {
+ goto error;
+ }
+ metadata->trace = trace;
+ trace->metadata = metadata;
+ return 0;
+
+error:
+ ctf_metadata_decoder_destroy(metadata->decoder);
+ g_free(metadata);
+ return -1;
+}
+
+BT_HIDDEN
+void lttng_live_metadata_fini(struct lttng_live_trace *trace)
+{
+ struct lttng_live_metadata *metadata = trace->metadata;
+
+ if (!metadata) {
+ return;
+ }
+ if (metadata->text) {
+ free(metadata->text);
+ }
+ ctf_metadata_decoder_destroy(metadata->decoder);
+ trace->metadata = NULL;
+ lttng_live_unref_trace(trace);
+ if (!metadata->closed) {
+ lttng_live_unref_trace(metadata->trace);
+ }
+ g_free(metadata);
+}
--- /dev/null
+#ifndef LTTNG_LIVE_METADATA_H
+#define LTTNG_LIVE_METADATA_H
+
+/*
+ * Copyright 2016 - Philippe Proulx <pproulx@efficios.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <stdio.h>
+#include <glib.h>
+#include <babeltrace/babeltrace-internal.h>
+#include <babeltrace/ctf-ir/trace.h>
+#include "lttng-live-internal.h"
+
+int lttng_live_metadata_create_stream(struct lttng_live_session *session,
+ uint64_t ctf_trace_id,
+ uint64_t stream_id);
+
+enum bt_ctf_lttng_live_iterator_status lttng_live_metadata_update(
+ struct lttng_live_trace *trace);
+
+void lttng_live_metadata_fini(struct lttng_live_trace *trace);
+
+#endif /* LTTNG_LIVE_METADATA_H */
--- /dev/null
+/*
+ * Copyright 2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <stdio.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <unistd.h>
+#include <glib.h>
+#include <inttypes.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <poll.h>
+
+#include <babeltrace/compat/send-internal.h>
+#include <babeltrace/compiler-internal.h>
+
+#include "lttng-live-internal.h"
+#include "viewer-connection.h"
+#include "lttng-viewer-abi.h"
+#include "data-stream.h"
+#include "metadata.h"
+
+#define PRINT_ERR_STREAM viewer_connection->error_fp
+#define PRINT_PREFIX "lttng-live-viewer-connection"
+#define PRINT_DBG_CHECK lttng_live_debug
+#include "../print.h"
+
+static ssize_t lttng_live_recv(int fd, void *buf, size_t len)
+{
+ ssize_t ret;
+ size_t copied = 0, to_copy = len;
+
+ do {
+ ret = recv(fd, buf + copied, to_copy, 0);
+ if (ret > 0) {
+ assert(ret <= to_copy);
+ copied += ret;
+ to_copy -= ret;
+ }
+ } while ((ret > 0 && to_copy > 0)
+ || (ret < 0 && errno == EINTR));
+ if (ret > 0)
+ ret = copied;
+ /* ret = 0 means orderly shutdown, ret < 0 is error. */
+ return ret;
+}
+
+static ssize_t lttng_live_send(int fd, const void *buf, size_t len)
+{
+ ssize_t ret;
+
+ do {
+ ret = bt_send_nosigpipe(fd, buf, len);
+ } while (ret < 0 && errno == EINTR);
+ return ret;
+}
+
+/*
+ * hostname parameter needs to hold MAXNAMLEN chars.
+ */
+static int parse_url(struct bt_live_viewer_connection *viewer_connection)
+{
+ char remain[3][MAXNAMLEN];
+ int ret = -1, proto, proto_offset = 0;
+ const char *path = viewer_connection->url->str;
+ size_t path_len;
+
+ if (!path) {
+ goto end;
+ }
+ path_len = strlen(path); /* not accounting \0 */
+
+ /*
+ * Since sscanf API does not allow easily checking string length
+ * against a size defined by a macro. Test it beforehand on the
+ * input. We know the output is always <= than the input length.
+ */
+ if (path_len >= MAXNAMLEN) {
+ goto end;
+ }
+ ret = sscanf(path, "net%d://", &proto);
+ if (ret < 1) {
+ proto = 4;
+ /* net:// */
+ proto_offset = strlen("net://");
+ } else {
+ /* net4:// or net6:// */
+ proto_offset = strlen("netX://");
+ }
+ if (proto_offset > path_len) {
+ goto end;
+ }
+ if (proto == 6) {
+ PERR("[error] IPv6 is currently unsupported by lttng-live\n");
+ goto end;
+ }
+ /* TODO : parse for IPv6 as well */
+ /* Parse the hostname or IP */
+ ret = sscanf(&path[proto_offset], "%[a-zA-Z.0-9%-]%s",
+ viewer_connection->relay_hostname, remain[0]);
+ if (ret == 2) {
+ /* Optional port number */
+ switch (remain[0][0]) {
+ case ':':
+ ret = sscanf(remain[0], ":%d%s", &viewer_connection->port, remain[1]);
+ /* Optional session ID with port number */
+ if (ret == 2) {
+ ret = sscanf(remain[1], "/%s", remain[2]);
+ /* Accept 0 or 1 (optional) */
+ if (ret < 0) {
+ goto end;
+ }
+ } else if (ret == 0) {
+ PERR("[error] Missing port number after delimitor ':'\n");
+ ret = -1;
+ goto end;
+ }
+ break;
+ case '/':
+ /* Optional session ID */
+ ret = sscanf(remain[0], "/%s", remain[2]);
+ /* Accept 0 or 1 (optional) */
+ if (ret < 0) {
+ goto end;
+ }
+ break;
+ default:
+ PERR("[error] wrong delimitor : %c\n", remain[0][0]);
+ ret = -1;
+ goto end;
+ }
+ }
+
+ if (viewer_connection->port < 0) {
+ viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
+ }
+
+ if (strlen(remain[2]) == 0) {
+ PDBG("Connecting to hostname : %s, port : %d, "
+ "proto : IPv%d\n",
+ viewer_connection->relay_hostname,
+ viewer_connection->port,
+ proto);
+ ret = 0;
+ goto end;
+ }
+ ret = sscanf(remain[2], "host/%[a-zA-Z.0-9%-]/%s",
+ viewer_connection->target_hostname,
+ viewer_connection->session_name);
+ if (ret != 2) {
+ PERR("[error] Format : "
+ "net://<hostname>/host/<target_hostname>/<session_name>\n");
+ goto end;
+ }
+
+ PDBG("Connecting to hostname : %s, port : %d, "
+ "target hostname : %s, session name : %s, "
+ "proto : IPv%d\n",
+ viewer_connection->relay_hostname,
+ viewer_connection->port,
+ viewer_connection->target_hostname,
+ viewer_connection->session_name, proto);
+ ret = 0;
+
+end:
+ return ret;
+}
+
+static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connection)
+{
+ struct lttng_viewer_cmd cmd;
+ struct lttng_viewer_connect connect;
+ int ret;
+ ssize_t ret_len;
+
+ cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
+ cmd.data_size = htobe64((uint64_t) sizeof(connect));
+ cmd.cmd_version = htobe32(0);
+
+ connect.viewer_session_id = -1ULL; /* will be set on recv */
+ connect.major = htobe32(LTTNG_LIVE_MAJOR);
+ connect.minor = htobe32(LTTNG_LIVE_MINOR);
+ connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ if (ret_len < 0) {
+ PERR("Error sending cmd: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(cmd));
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &connect, sizeof(connect));
+ if (ret_len < 0) {
+ PERR("Error sending version: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(connect));
+
+ ret_len = lttng_live_recv(viewer_connection->control_sock, &connect, sizeof(connect));
+ if (ret_len == 0) {
+ PERR("Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ PERR("[error] Error receiving version: %s", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(connect));
+
+ PDBG("Received viewer session ID : %" PRIu64 "\n",
+ be64toh(connect.viewer_session_id));
+ PDBG("Relayd version : %u.%u\n", be32toh(connect.major),
+ be32toh(connect.minor));
+
+ if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
+ PERR("Incompatible lttng-relayd protocol\n");
+ goto error;
+ }
+ /* Use the smallest protocol version implemented. */
+ if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
+ viewer_connection->minor = be32toh(connect.minor);
+ } else {
+ viewer_connection->minor = LTTNG_LIVE_MINOR;
+ }
+ viewer_connection->major = LTTNG_LIVE_MAJOR;
+ ret = 0;
+ return ret;
+
+error:
+ PERR("Unable to establish connection\n");
+ return -1;
+}
+
+static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_connection)
+{
+ struct hostent *host;
+ struct sockaddr_in server_addr;
+ int ret;
+
+ if (parse_url(viewer_connection)) {
+ goto error;
+ }
+
+ host = gethostbyname(viewer_connection->relay_hostname);
+ if (!host) {
+ PERR("[error] Cannot lookup hostname %s\n",
+ viewer_connection->relay_hostname);
+ goto error;
+ }
+
+ if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
+ PERR("[error] Socket creation failed: %s\n", strerror(errno));
+ goto error;
+ }
+
+ server_addr.sin_family = AF_INET;
+ server_addr.sin_port = htons(viewer_connection->port);
+ server_addr.sin_addr = *((struct in_addr *) host->h_addr);
+ memset(&(server_addr.sin_zero), 0, 8);
+
+ if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
+ sizeof(struct sockaddr)) == -1) {
+ PERR("[error] Connection failed: %s\n", strerror(errno));
+ goto error;
+ }
+ if (lttng_live_handshake(viewer_connection)) {
+ goto error;
+ }
+
+ ret = 0;
+
+ return ret;
+
+error:
+ if (viewer_connection->control_sock >= 0) {
+ if (close(viewer_connection->control_sock)) {
+ PERR("Close: %s", strerror(errno));
+ }
+ }
+ viewer_connection->control_sock = -1;
+ return -1;
+}
+
+static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection)
+{
+ if (viewer_connection->control_sock < 0) {
+ return;
+ }
+ if (close(viewer_connection->control_sock)) {
+ PERR("Close: %s", strerror(errno));
+ viewer_connection->control_sock = -1;
+ }
+}
+
+static void connection_release(struct bt_object *obj)
+{
+ struct bt_live_viewer_connection *conn =
+ container_of(obj, struct bt_live_viewer_connection, obj);
+
+ bt_live_viewer_connection_destroy(conn);
+}
+
+static
+enum bt_value_status list_update_session(struct bt_value *results,
+ const struct lttng_viewer_session *session,
+ bool *_found)
+{
+ enum bt_value_status ret = BT_VALUE_STATUS_OK;
+ struct bt_value *map = NULL;
+ struct bt_value *hostname = NULL;
+ struct bt_value *session_name = NULL;
+ struct bt_value *btval = NULL;
+ int i, len;
+ bool found = false;
+
+ len = bt_value_array_size(results);
+ if (len < 0) {
+ ret = BT_VALUE_STATUS_ERROR;
+ goto end;
+ }
+ for (i = 0; i < len; i++) {
+ const char *hostname_str = NULL;
+ const char *session_name_str = NULL;
+
+ map = bt_value_array_get(results, (size_t) i);
+ if (!map) {
+ ret = BT_VALUE_STATUS_ERROR;
+ goto end;
+ }
+ hostname = bt_value_map_get(map, "target-hostname");
+ if (!hostname) {
+ ret = BT_VALUE_STATUS_ERROR;
+ goto end;
+ }
+ session_name = bt_value_map_get(map, "session-name");
+ if (!session_name) {
+ ret = BT_VALUE_STATUS_ERROR;
+ goto end;
+ }
+ ret = bt_value_string_get(hostname, &hostname_str);
+ if (ret != BT_VALUE_STATUS_OK) {
+ goto end;
+ }
+ ret = bt_value_string_get(session_name, &session_name_str);
+ if (ret != BT_VALUE_STATUS_OK) {
+ goto end;
+ }
+
+ if (!strcmp(session->hostname, hostname_str)
+ && !strcmp(session->session_name,
+ session_name_str)) {
+ int64_t val;
+ uint32_t streams = be32toh(session->streams);
+ uint32_t clients = be32toh(session->clients);
+
+ found = true;
+
+ btval = bt_value_map_get(map, "stream-count");
+ if (!btval) {
+ ret = BT_VALUE_STATUS_ERROR;
+ goto end;
+ }
+ ret = bt_value_integer_get(btval, &val);
+ if (ret != BT_VALUE_STATUS_OK) {
+ goto end;
+ }
+ /* sum */
+ val += streams;
+ ret = bt_value_integer_set(btval, val);
+ if (ret != BT_VALUE_STATUS_OK) {
+ goto end;
+ }
+ BT_PUT(btval);
+
+ btval = bt_value_map_get(map, "client-count");
+ if (!btval) {
+ ret = BT_VALUE_STATUS_ERROR;
+ goto end;
+ }
+ ret = bt_value_integer_get(btval, &val);
+ if (ret != BT_VALUE_STATUS_OK) {
+ goto end;
+ }
+ /* max */
+ val = max_t(int64_t, clients, val);
+ ret = bt_value_integer_set(btval, val);
+ if (ret != BT_VALUE_STATUS_OK) {
+ goto end;
+ }
+ BT_PUT(btval);
+ }
+
+ BT_PUT(hostname);
+ BT_PUT(session_name);
+ BT_PUT(map);
+
+ if (found) {
+ break;
+ }
+ }
+end:
+ BT_PUT(btval);
+ BT_PUT(hostname);
+ BT_PUT(session_name);
+ BT_PUT(map);
+ *_found = found;
+ return ret;
+}
+
+static
+enum bt_value_status list_append_session(struct bt_value *results,
+ GString *base_url,
+ const struct lttng_viewer_session *session)
+{
+ enum bt_value_status ret = BT_VALUE_STATUS_OK;
+ struct bt_value *map = NULL;
+ GString *url = NULL;
+ bool found = false;
+
+ /*
+ * If the session already exists, add the stream count to it,
+ * and do max of client counts.
+ */
+ ret = list_update_session(results, session, &found);
+ if (ret != BT_VALUE_STATUS_OK || found) {
+ goto end;
+ }
+
+ map = bt_value_map_create();
+ if (!map) {
+ ret = BT_VALUE_STATUS_ERROR;
+ goto end;
+ }
+
+ if (base_url->len < 1) {
+ ret = BT_VALUE_STATUS_ERROR;
+ goto end;
+ }
+ /*
+ * key = "url",
+ * value = <string>,
+ */
+ url = g_string_new(base_url->str);
+ g_string_append(url, "/host/");
+ g_string_append(url, session->hostname);
+ g_string_append_c(url, '/');
+ g_string_append(url, session->session_name);
+
+ ret = bt_value_map_insert_string(map, "url", url->str);
+ if (ret != BT_VALUE_STATUS_OK) {
+ goto end;
+ }
+
+ /*
+ * key = "target-hostname",
+ * value = <string>,
+ */
+ ret = bt_value_map_insert_string(map, "target-hostname",
+ session->hostname);
+ if (ret != BT_VALUE_STATUS_OK) {
+ goto end;
+ }
+
+ /*
+ * key = "session-name",
+ * value = <string>,
+ */
+ ret = bt_value_map_insert_string(map, "session-name",
+ session->session_name);
+ if (ret != BT_VALUE_STATUS_OK) {
+ goto end;
+ }
+
+ /*
+ * key = "timer-us",
+ * value = <integer>,
+ */
+ {
+ uint32_t live_timer = be32toh(session->live_timer);
+
+ ret = bt_value_map_insert_integer(map, "timer-us",
+ live_timer);
+ if (ret != BT_VALUE_STATUS_OK) {
+ goto end;
+ }
+ }
+
+ /*
+ * key = "stream-count",
+ * value = <integer>,
+ */
+ {
+ uint32_t streams = be32toh(session->streams);
+
+ ret = bt_value_map_insert_integer(map, "stream-count",
+ streams);
+ if (ret != BT_VALUE_STATUS_OK) {
+ goto end;
+ }
+ }
+
+
+ /*
+ * key = "client-count",
+ * value = <integer>,
+ */
+ {
+ uint32_t clients = be32toh(session->clients);
+
+ ret = bt_value_map_insert_integer(map, "client-count",
+ clients);
+ if (ret != BT_VALUE_STATUS_OK) {
+ goto end;
+ }
+ }
+
+ ret = bt_value_array_append(results, map);
+end:
+ if (url) {
+ g_string_free(url, TRUE);
+ }
+ BT_PUT(map);
+ return ret;
+}
+
+/*
+ * Data structure returned:
+ *
+ * {
+ * <array> = {
+ * [n] = {
+ * <map> = {
+ * {
+ * key = "url",
+ * value = <string>,
+ * },
+ * {
+ * key = "target-hostname",
+ * value = <string>,
+ * },
+ * {
+ * key = "session-name",
+ * value = <string>,
+ * },
+ * {
+ * key = "timer-us",
+ * value = <integer>,
+ * },
+ * {
+ * key = "stream-count",
+ * value = <integer>,
+ * },
+ * {
+ * key = "client-count",
+ * value = <integer>,
+ * },
+ * },
+ * }
+ * }
+ */
+
+BT_HIDDEN
+struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection)
+{
+ struct bt_value *results = NULL;
+ struct lttng_viewer_cmd cmd;
+ struct lttng_viewer_list_sessions list;
+ uint32_t i, sessions_count;
+ ssize_t ret_len;
+
+ if (lttng_live_handshake(viewer_connection)) {
+ goto error;
+ }
+
+ results = bt_value_array_create();
+ if (!results) {
+ fprintf(stderr, "Error creating array\n");
+ goto error;
+ }
+
+ cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
+ cmd.data_size = htobe64((uint64_t) 0);
+ cmd.cmd_version = htobe32(0);
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ if (ret_len < 0) {
+ fprintf(stderr, "Error sending cmd: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(cmd));
+
+ ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
+ if (ret_len == 0) {
+ fprintf(stderr, "Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ fprintf(stderr, "Error receiving session list: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(list));
+
+ sessions_count = be32toh(list.sessions_count);
+ for (i = 0; i < sessions_count; i++) {
+ struct lttng_viewer_session lsession;
+
+ ret_len = lttng_live_recv(viewer_connection->control_sock,
+ &lsession, sizeof(lsession));
+ if (ret_len == 0) {
+ fprintf(stderr, "Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ fprintf(stderr, "Error receiving session: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(lsession));
+ lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
+ lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
+ if (list_append_session(results,
+ viewer_connection->url, &lsession)
+ != BT_VALUE_STATUS_OK) {
+ goto error;
+ }
+ }
+ goto end;
+error:
+ BT_PUT(results);
+end:
+ return results;
+}
+
+static
+int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
+{
+ struct lttng_viewer_cmd cmd;
+ struct lttng_viewer_list_sessions list;
+ struct lttng_viewer_session lsession;
+ uint32_t i, sessions_count;
+ ssize_t ret_len;
+ uint64_t session_id;
+ struct bt_live_viewer_connection *viewer_connection =
+ lttng_live->viewer_connection;
+
+ cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
+ cmd.data_size = htobe64((uint64_t) 0);
+ cmd.cmd_version = htobe32(0);
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ if (ret_len < 0) {
+ PERR("Error sending cmd: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(cmd));
+
+ ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
+ if (ret_len == 0) {
+ PERR("Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ PERR("Error receiving session list: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(list));
+
+ sessions_count = be32toh(list.sessions_count);
+ for (i = 0; i < sessions_count; i++) {
+ ret_len = lttng_live_recv(viewer_connection->control_sock,
+ &lsession, sizeof(lsession));
+ if (ret_len == 0) {
+ PERR("Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ PERR("Error receiving session: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(lsession));
+ lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
+ lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
+ session_id = be64toh(lsession.id);
+
+ if ((strncmp(lsession.session_name,
+ viewer_connection->session_name,
+ MAXNAMLEN) == 0) && (strncmp(lsession.hostname,
+ viewer_connection->target_hostname,
+ MAXNAMLEN) == 0)) {
+ if (lttng_live_add_session(lttng_live, session_id)) {
+ goto error;
+ }
+ }
+ }
+
+ return 0;
+
+error:
+ PERR("Unable to query session ids\n");
+ return -1;
+}
+
+BT_HIDDEN
+int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live)
+{
+ struct lttng_viewer_cmd cmd;
+ struct lttng_viewer_create_session_response resp;
+ ssize_t ret_len;
+ struct bt_live_viewer_connection *viewer_connection =
+ lttng_live->viewer_connection;
+
+ cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
+ cmd.data_size = htobe64((uint64_t) 0);
+ cmd.cmd_version = htobe32(0);
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ if (ret_len < 0) {
+ PERR("Error sending cmd: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(cmd));
+
+ ret_len = lttng_live_recv(viewer_connection->control_sock, &resp, sizeof(resp));
+ if (ret_len == 0) {
+ PERR("Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ PERR("Error receiving create session reply: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(resp));
+
+ if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
+ PERR("Error creating viewer session\n");
+ goto error;
+ }
+ if (lttng_live_query_session_ids(lttng_live)) {
+ goto error;
+ }
+
+ return 0;
+
+error:
+ return -1;
+}
+
+static
+int receive_streams(struct lttng_live_session *session,
+ uint32_t stream_count)
+{
+ ssize_t ret_len;
+ uint32_t i;
+ struct lttng_live_component *lttng_live = session->lttng_live;
+ struct bt_live_viewer_connection *viewer_connection =
+ lttng_live->viewer_connection;
+
+ PDBG("Getting %" PRIu32 " new streams:\n", stream_count);
+ for (i = 0; i < stream_count; i++) {
+ struct lttng_viewer_stream stream;
+ struct lttng_live_stream_iterator *live_stream;
+ uint64_t stream_id;
+ uint64_t ctf_trace_id;
+
+ ret_len = lttng_live_recv(viewer_connection->control_sock, &stream, sizeof(stream));
+ if (ret_len == 0) {
+ PERR("Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ PERR("Error receiving stream\n");
+ goto error;
+ }
+ assert(ret_len == sizeof(stream));
+ stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
+ stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
+ stream_id = be64toh(stream.id);
+ ctf_trace_id = be64toh(stream.ctf_trace_id);
+
+ if (stream.metadata_flag) {
+ PDBG(" metadata stream %" PRIu64 " : %s/%s\n",
+ stream_id, stream.path_name,
+ stream.channel_name);
+ if (lttng_live_metadata_create_stream(session,
+ ctf_trace_id, stream_id)) {
+ PERR("Error creating metadata stream\n");
+
+ goto error;
+ }
+ session->lazy_stream_notif_init = true;
+ } else {
+ PDBG(" stream %" PRIu64 " : %s/%s\n",
+ stream_id, stream.path_name,
+ stream.channel_name);
+ live_stream = lttng_live_stream_iterator_create(session,
+ ctf_trace_id, stream_id);
+ if (!live_stream) {
+ PERR("Error creating stream\n");
+ goto error;
+ }
+ }
+ }
+ return 0;
+
+error:
+ return -1;
+}
+
+BT_HIDDEN
+int lttng_live_attach_session(struct lttng_live_session *session)
+{
+ struct lttng_viewer_cmd cmd;
+ struct lttng_viewer_attach_session_request rq;
+ struct lttng_viewer_attach_session_response rp;
+ ssize_t ret_len;
+ struct lttng_live_component *lttng_live = session->lttng_live;
+ struct bt_live_viewer_connection *viewer_connection =
+ lttng_live->viewer_connection;
+ uint64_t session_id = session->id;
+ uint32_t streams_count;
+
+ if (session->attached) {
+ return 0;
+ }
+
+ cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
+
+ memset(&rq, 0, sizeof(rq));
+ rq.session_id = htobe64(session_id);
+ // TODO: add cmd line parameter to select seek beginning
+ // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
+ rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ if (ret_len < 0) {
+ PERR("Error sending cmd: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(cmd));
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ if (ret_len < 0) {
+ PERR("Error sending attach request: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(rq));
+
+ ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ if (ret_len == 0) {
+ PERR("Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ PERR("Error receiving attach response: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(rp));
+
+ streams_count = be32toh(rp.streams_count);
+ switch(be32toh(rp.status)) {
+ case LTTNG_VIEWER_ATTACH_OK:
+ break;
+ case LTTNG_VIEWER_ATTACH_UNK:
+ PERR("Session id %" PRIu64 " is unknown\n", session_id);
+ goto error;
+ case LTTNG_VIEWER_ATTACH_ALREADY:
+ PERR("There is already a viewer attached to this session\n");
+ goto error;
+ case LTTNG_VIEWER_ATTACH_NOT_LIVE:
+ PERR("Not a live session\n");
+ goto error;
+ case LTTNG_VIEWER_ATTACH_SEEK_ERR:
+ PERR("Wrong seek parameter\n");
+ goto error;
+ default:
+ PERR("Unknown attach return code %u\n", be32toh(rp.status));
+ goto error;
+ }
+
+ /* We receive the initial list of streams. */
+ if (receive_streams(session, streams_count)) {
+ goto error;
+ }
+
+ session->attached = true;
+ session->new_streams_needed = false;
+
+ return 0;
+
+error:
+ return -1;
+}
+
+BT_HIDDEN
+int lttng_live_detach_session(struct lttng_live_session *session)
+{
+ struct lttng_viewer_cmd cmd;
+ struct lttng_viewer_detach_session_request rq;
+ struct lttng_viewer_detach_session_response rp;
+ ssize_t ret_len;
+ struct lttng_live_component *lttng_live = session->lttng_live;
+ struct bt_live_viewer_connection *viewer_connection =
+ lttng_live->viewer_connection;
+ uint64_t session_id = session->id;
+
+ if (!session->attached) {
+ return 0;
+ }
+
+ cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
+
+ memset(&rq, 0, sizeof(rq));
+ rq.session_id = htobe64(session_id);
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ if (ret_len < 0) {
+ PERR("Error sending cmd: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(cmd));
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ if (ret_len < 0) {
+ PERR("Error sending detach request: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(rq));
+
+ ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ if (ret_len == 0) {
+ PERR("Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ PERR("Error receiving detach response: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(rp));
+
+ switch(be32toh(rp.status)) {
+ case LTTNG_VIEWER_DETACH_SESSION_OK:
+ break;
+ case LTTNG_VIEWER_DETACH_SESSION_UNK:
+ PERR("Session id %" PRIu64 " is unknown\n", session_id);
+ goto error;
+ case LTTNG_VIEWER_DETACH_SESSION_ERR:
+ PERR("Error detaching session id %" PRIu64 "\n", session_id);
+ goto error;
+ default:
+ PERR("Unknown detach return code %u\n", be32toh(rp.status));
+ goto error;
+ }
+
+ session->attached = false;
+
+ return 0;
+
+error:
+ return -1;
+}
+
+BT_HIDDEN
+ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
+ FILE *fp)
+{
+ uint64_t len = 0;
+ int ret;
+ struct lttng_viewer_cmd cmd;
+ struct lttng_viewer_get_metadata rq;
+ struct lttng_viewer_metadata_packet rp;
+ char *data = NULL;
+ ssize_t ret_len;
+ struct lttng_live_session *session = trace->session;
+ struct lttng_live_component *lttng_live = session->lttng_live;
+ struct lttng_live_metadata *metadata = trace->metadata;
+ struct bt_live_viewer_connection *viewer_connection =
+ lttng_live->viewer_connection;
+
+ rq.stream_id = htobe64(metadata->stream_id);
+ cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ if (ret_len < 0) {
+ PERR("Error sending cmd: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(cmd));
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ if (ret_len < 0) {
+ PERR("Error sending get_metadata request: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(rq));
+
+ ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ if (ret_len == 0) {
+ PERR("Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ PERR("Error receiving get_metadata response: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(rp));
+
+ switch (be32toh(rp.status)) {
+ case LTTNG_VIEWER_METADATA_OK:
+ PDBG("get_metadata : OK\n");
+ break;
+ case LTTNG_VIEWER_NO_NEW_METADATA:
+ PDBG("get_metadata : NO NEW\n");
+ ret = 0;
+ goto end;
+ case LTTNG_VIEWER_METADATA_ERR:
+ PDBG("get_metadata : ERR\n");
+ goto error;
+ default:
+ PDBG("get_metadata : UNKNOWN\n");
+ goto error;
+ }
+
+ len = be64toh(rp.len);
+ PDBG("Writing %" PRIu64" bytes to metadata\n", len);
+ if (len <= 0) {
+ goto error;
+ }
+
+ data = zmalloc(len);
+ if (!data) {
+ PERR("relay data zmalloc: %s", strerror(errno));
+ goto error;
+ }
+ ret_len = lttng_live_recv(viewer_connection->control_sock, data, len);
+ if (ret_len == 0) {
+ PERR("[error] Remote side has closed connection\n");
+ goto error_free_data;
+ }
+ if (ret_len < 0) {
+ PERR("[error] Error receiving trace packet: %s", strerror(errno));
+ goto error_free_data;
+ }
+ assert(ret_len == len);
+
+ do {
+ ret_len = fwrite(data, 1, len, fp);
+ } while (ret_len < 0 && errno == EINTR);
+ if (ret_len < 0) {
+ PERR("[error] Writing in the metadata fp\n");
+ goto error_free_data;
+ }
+ assert(ret_len == len);
+ free(data);
+ ret = len;
+end:
+ return ret;
+
+error_free_data:
+ free(data);
+error:
+ return -1;
+}
+
+/*
+ * Assign the fields from a lttng_viewer_index to a packet_index.
+ */
+static
+void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
+ struct packet_index *pindex)
+{
+ assert(lindex);
+ assert(pindex);
+
+ pindex->offset = be64toh(lindex->offset);
+ pindex->packet_size = be64toh(lindex->packet_size);
+ pindex->content_size = be64toh(lindex->content_size);
+ pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
+ pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
+ pindex->events_discarded = be64toh(lindex->events_discarded);
+}
+
+BT_HIDDEN
+enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *stream,
+ struct packet_index *index)
+{
+ struct lttng_viewer_cmd cmd;
+ struct lttng_viewer_get_next_index rq;
+ ssize_t ret_len;
+ struct lttng_viewer_index rp;
+ uint32_t flags, status;
+ enum bt_ctf_lttng_live_iterator_status retstatus =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ struct bt_live_viewer_connection *viewer_connection =
+ lttng_live->viewer_connection;
+ struct lttng_live_trace *trace = stream->trace;
+
+ cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
+
+ memset(&rq, 0, sizeof(rq));
+ rq.stream_id = htobe64(stream->viewer_stream_id);
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ if (ret_len < 0) {
+ PERR("Error sending cmd: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(cmd));
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ if (ret_len < 0) {
+ PERR("Error sending get_next_index request: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(rq));
+
+ ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ if (ret_len == 0) {
+ PERR("Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ PERR("Error receiving get_next_index response: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(rp));
+
+ flags = be32toh(rp.flags);
+ status = be32toh(rp.status);
+
+ switch (status) {
+ case LTTNG_VIEWER_INDEX_INACTIVE:
+ {
+ uint64_t ctf_stream_class_id;
+
+ PDBG("get_next_index: inactive\n");
+ memset(index, 0, sizeof(struct packet_index));
+ index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
+ stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end;
+ ctf_stream_class_id = be64toh(rp.stream_id);
+ if (stream->ctf_stream_class_id != -1ULL) {
+ assert(stream->ctf_stream_class_id ==
+ ctf_stream_class_id);
+ } else {
+ stream->ctf_stream_class_id = ctf_stream_class_id;
+ }
+ stream->state = LTTNG_LIVE_STREAM_QUIESCENT;
+ break;
+ }
+ case LTTNG_VIEWER_INDEX_OK:
+ {
+ uint64_t ctf_stream_class_id;
+
+ PDBG("get_next_index: OK\n");
+ lttng_index_to_packet_index(&rp, index);
+ ctf_stream_class_id = be64toh(rp.stream_id);
+ if (stream->ctf_stream_class_id != -1ULL) {
+ assert(stream->ctf_stream_class_id ==
+ ctf_stream_class_id);
+ } else {
+ stream->ctf_stream_class_id = ctf_stream_class_id;
+ }
+
+ stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA;
+ stream->current_packet_end_timestamp =
+ index->ts_cycles.timestamp_end;
+
+ if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
+ PDBG("get_next_index: new metadata needed\n");
+ trace->new_metadata_needed = true;
+ }
+ if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+ PDBG("get_next_index: new streams needed\n");
+ lttng_live_need_new_streams(lttng_live);
+ }
+ break;
+ }
+ case LTTNG_VIEWER_INDEX_RETRY:
+ PDBG("get_next_index: retry\n");
+ memset(index, 0, sizeof(struct packet_index));
+ retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
+ goto end;
+ case LTTNG_VIEWER_INDEX_HUP:
+ PDBG("get_next_index: stream hung up\n");
+ memset(index, 0, sizeof(struct packet_index));
+ index->offset = EOF;
+ retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ stream->state = LTTNG_LIVE_STREAM_EOF;
+ break;
+ case LTTNG_VIEWER_INDEX_ERR:
+ PERR("get_next_index: error\n");
+ memset(index, 0, sizeof(struct packet_index));
+ stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
+ goto error;
+ default:
+ PERR("get_next_index: unkwown value\n");
+ memset(index, 0, sizeof(struct packet_index));
+ stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
+ goto error;
+ }
+end:
+ return retstatus;
+
+error:
+ retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ return retstatus;
+}
+
+BT_HIDDEN
+enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
+ struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset,
+ uint64_t req_len, uint64_t *recv_len)
+{
+ enum bt_ctf_notif_iter_medium_status retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK;
+ struct lttng_viewer_cmd cmd;
+ struct lttng_viewer_get_packet rq;
+ struct lttng_viewer_trace_packet rp;
+ ssize_t ret_len;
+ uint32_t flags, status;
+ struct bt_live_viewer_connection *viewer_connection =
+ lttng_live->viewer_connection;
+ struct lttng_live_trace *trace = stream->trace;
+
+ PDBG("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64 "\n",
+ offset, req_len);
+ cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
+
+ memset(&rq, 0, sizeof(rq));
+ rq.stream_id = htobe64(stream->viewer_stream_id);
+ rq.offset = htobe64(offset);
+ rq.len = htobe32(req_len);
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ if (ret_len < 0) {
+ PERR("Error sending cmd: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(cmd));
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ if (ret_len < 0) {
+ PERR("Error sending get_data request: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(rq));
+
+ ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ if (ret_len == 0) {
+ PERR("Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ PERR("Error receiving get_data response: %s\n", strerror(errno));
+ goto error;
+ }
+ if (ret_len != sizeof(rp)) {
+ PERR("[error] get_data_packet: expected %zu"
+ ", received %zd\n", sizeof(rp),
+ ret_len);
+ goto error;
+ }
+
+ flags = be32toh(rp.flags);
+ status = be32toh(rp.status);
+
+ switch (status) {
+ case LTTNG_VIEWER_GET_PACKET_OK:
+ req_len = be32toh(rp.len);
+ PDBG("get_data_packet: Ok, packet size : %" PRIu64 "\n", req_len);
+ break;
+ case LTTNG_VIEWER_GET_PACKET_RETRY:
+ /* Unimplemented by relay daemon */
+ PDBG("get_data_packet: retry\n");
+ retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ goto end;
+ case LTTNG_VIEWER_GET_PACKET_ERR:
+ if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
+ PDBG("get_data_packet: new metadata needed, try again later\n");
+ trace->new_metadata_needed = true;
+ }
+ if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+ PDBG("get_data_packet: new streams needed, try again later\n");
+ lttng_live_need_new_streams(lttng_live);
+ }
+ if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
+ | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
+ retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ goto end;
+ }
+ PERR("get_data_packet: error\n");
+ goto error;
+ case LTTNG_VIEWER_GET_PACKET_EOF:
+ retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_EOF;
+ goto end;
+ default:
+ PDBG("get_data_packet: unknown\n");
+ goto error;
+ }
+
+ if (req_len == 0) {
+ goto error;
+ }
+
+ ret_len = lttng_live_recv(viewer_connection->control_sock, buf, req_len);
+ if (ret_len == 0) {
+ PERR("Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ PERR("Error receiving trace packet: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == req_len);
+ *recv_len = ret_len;
+end:
+ return retstatus;
+
+error:
+ retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+ return retstatus;
+}
+
+/*
+ * Request new streams for a session.
+ */
+BT_HIDDEN
+enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
+ struct lttng_live_session *session)
+{
+ enum bt_ctf_lttng_live_iterator_status status =
+ BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ struct lttng_viewer_cmd cmd;
+ struct lttng_viewer_new_streams_request rq;
+ struct lttng_viewer_new_streams_response rp;
+ ssize_t ret_len;
+ struct lttng_live_component *lttng_live = session->lttng_live;
+ struct bt_live_viewer_connection *viewer_connection =
+ lttng_live->viewer_connection;
+ uint32_t streams_count;
+
+ if (!session->new_streams_needed) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ }
+
+ cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
+ cmd.data_size = htobe64((uint64_t) sizeof(rq));
+ cmd.cmd_version = htobe32(0);
+
+ memset(&rq, 0, sizeof(rq));
+ rq.session_id = htobe64(session->id);
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ if (ret_len < 0) {
+ PERR("Error sending cmd: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(cmd));
+
+ ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ if (ret_len < 0) {
+ PERR("Error sending get_new_streams request: %s\n", strerror(errno));
+ goto error;
+ }
+ assert(ret_len == sizeof(rq));
+
+ ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ if (ret_len == 0) {
+ PERR("Remote side has closed connection\n");
+ goto error;
+ }
+ if (ret_len < 0) {
+ PERR("Error receiving get_new_streams response\n");
+ goto error;
+ }
+ assert(ret_len == sizeof(rp));
+
+ streams_count = be32toh(rp.streams_count);
+
+ switch(be32toh(rp.status)) {
+ case LTTNG_VIEWER_NEW_STREAMS_OK:
+ session->new_streams_needed = false;
+ break;
+ case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
+ session->new_streams_needed = false;
+ goto end;
+ case LTTNG_VIEWER_NEW_STREAMS_HUP:
+ session->new_streams_needed = false;
+ session->closed = true;
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ goto end;
+ case LTTNG_VIEWER_NEW_STREAMS_ERR:
+ PERR("get_new_streams error\n");
+ goto error;
+ default:
+ PERR("Unknown return code %u\n", be32toh(rp.status));
+ goto error;
+ }
+
+ if (receive_streams(session, streams_count)) {
+ goto error;
+ }
+end:
+ return status;
+
+error:
+ status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ return status;
+}
+
+BT_HIDDEN
+struct bt_live_viewer_connection *
+ bt_live_viewer_connection_create(const char *url, FILE *error_fp)
+{
+ struct bt_live_viewer_connection *viewer_connection;
+
+ viewer_connection = g_new0(struct bt_live_viewer_connection, 1);
+
+ bt_object_init(&viewer_connection->obj, connection_release);
+ viewer_connection->control_sock = -1;
+ viewer_connection->port = -1;
+ viewer_connection->error_fp = error_fp;
+ viewer_connection->url = g_string_new(url);
+ if (!viewer_connection->url) {
+ goto error;
+ }
+
+ PDBG("Establishing connection to url \"%s\"...\n", url);
+ if (lttng_live_connect_viewer(viewer_connection)) {
+ goto error_report;
+ }
+ PDBG("Connection to url \"%s\" is established\n", url);
+ return viewer_connection;
+
+error_report:
+ printf_verbose("Failure to establish connection to url \"%s\"\n", url);
+error:
+ g_free(viewer_connection);
+ return NULL;
+}
+
+BT_HIDDEN
+void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_connection)
+{
+ PDBG("Closing connection to url \"%s\"\n", viewer_connection->url->str);
+ lttng_live_disconnect_viewer(viewer_connection);
+ g_string_free(viewer_connection->url, TRUE);
+ g_free(viewer_connection);
+}
--- /dev/null
+#ifndef LTTNG_LIVE_VIEWER_CONNECTION_H
+#define LTTNG_LIVE_VIEWER_CONNECTION_H
+
+/*
+ * Copyright 2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <stdio.h>
+#include <glib.h>
+
+#include <babeltrace/babeltrace-internal.h>
+
+//TODO: this should not be used by plugins. Should copy code into plugin
+//instead.
+#include "babeltrace/object-internal.h"
+
+#define LTTNG_DEFAULT_NETWORK_VIEWER_PORT 5344
+
+#define LTTNG_LIVE_MAJOR 2
+#define LTTNG_LIVE_MINOR 4
+
+struct bt_live_viewer_connection {
+ struct bt_object obj;
+
+ FILE *error_fp;
+
+ GString *url;
+
+ char relay_hostname[MAXNAMLEN];
+ char target_hostname[MAXNAMLEN];
+ char session_name[MAXNAMLEN];
+ int control_sock;
+ int port;
+
+ int32_t major;
+ int32_t minor;
+};
+
+struct packet_index_time {
+ int64_t timestamp_begin;
+ int64_t timestamp_end;
+};
+
+struct packet_index {
+ off_t offset; /* offset of the packet in the file, in bytes */
+ int64_t data_offset; /* offset of data within the packet, in bits */
+ uint64_t packet_size; /* packet size, in bits */
+ uint64_t content_size; /* content size, in bits */
+ uint64_t events_discarded;
+ uint64_t events_discarded_len; /* length of the field, in bits */
+ struct packet_index_time ts_cycles; /* timestamp in cycles */
+ struct packet_index_time ts_real; /* realtime timestamp */
+ /* CTF_INDEX 1.0 limit */
+ uint64_t stream_instance_id; /* ID of the channel instance */
+ uint64_t packet_seq_num; /* packet sequence number */
+};
+
+struct bt_live_viewer_connection *
+ bt_live_viewer_connection_create(const char *url, FILE *error_fp);
+
+void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *conn);
+
+struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection);
+
+#endif /* LTTNG_LIVE_VIEWER_CONNECTION_H */
BT_PLUGIN_SOURCE_COMPONENT_CLASS_NOTIFICATION_ITERATOR_FINALIZE_METHOD(fs,
ctf_fs_iterator_finalize);
-/* ctf.lttng-live source */
-BT_PLUGIN_SOURCE_COMPONENT_CLASS_WITH_ID(auto, lttng_live, "lttng-live",
- lttng_live_iterator_next);
-BT_PLUGIN_SOURCE_COMPONENT_CLASS_INIT_METHOD_WITH_ID(auto, lttng_live,
- lttng_live_init);
-BT_PLUGIN_SOURCE_COMPONENT_CLASS_DESCRIPTION_WITH_ID(auto, lttng_live,
- "Connect to an LTTng relay daemon and receive CTF streams.");
-
/* ctf.fs sink */
BT_PLUGIN_SINK_COMPONENT_CLASS(fs, writer_run);
BT_PLUGIN_SINK_COMPONENT_CLASS_INIT_METHOD(fs, writer_component_init);
writer_component_port_connected);
BT_PLUGIN_SINK_COMPONENT_CLASS_FINALIZE_METHOD(fs, writer_component_finalize);
BT_PLUGIN_SINK_COMPONENT_CLASS_DESCRIPTION(fs, "Write CTF traces to the file system.");
+
+/* ctf.lttng-live source */
+BT_PLUGIN_SOURCE_COMPONENT_CLASS_WITH_ID(auto, lttng_live, "lttng-live",
+ lttng_live_iterator_next);
+BT_PLUGIN_SOURCE_COMPONENT_CLASS_DESCRIPTION_WITH_ID(auto, lttng_live,
+ "Connect to an LTTng relay daemon and receive CTF streams.");
+BT_PLUGIN_SOURCE_COMPONENT_CLASS_INIT_METHOD_WITH_ID(auto, lttng_live,
+ lttng_live_component_init);
+BT_PLUGIN_SOURCE_COMPONENT_CLASS_QUERY_METHOD_WITH_ID(auto, lttng_live,
+ lttng_live_query);
+BT_PLUGIN_SOURCE_COMPONENT_CLASS_FINALIZE_METHOD_WITH_ID(auto, lttng_live,
+ lttng_live_component_finalize);
+BT_PLUGIN_SOURCE_COMPONENT_CLASS_NOTIFICATION_ITERATOR_INIT_METHOD_WITH_ID(
+ auto, lttng_live, lttng_live_iterator_init);
+BT_PLUGIN_SOURCE_COMPONENT_CLASS_NOTIFICATION_ITERATOR_FINALIZE_METHOD_WITH_ID(
+ auto, lttng_live, lttng_live_iterator_finalize);
do { \
if (PRINT_ERR_STREAM) { \
fprintf(PRINT_ERR_STREAM, \
- "Error: " PRINT_PREFIX ": " fmt, \
+ "[error " PRINT_PREFIX "] " fmt, \
##__VA_ARGS__); \
} \
} while (0)
do { \
if (PRINT_ERR_STREAM) { \
fprintf(PRINT_ERR_STREAM, \
- "Warning: " PRINT_PREFIX ": " fmt, \
+ "[warning " PRINT_PREFIX "] " fmt, \
##__VA_ARGS__); \
} \
} while (0)
do { \
if (PRINT_DBG_CHECK) { \
fprintf(stderr, \
- "Debug: " PRINT_PREFIX ": " fmt, \
+ "[debug " PRINT_PREFIX "] " fmt, \
##__VA_ARGS__); \
} \
} while (0)
assert(pretty);
- if (bt_notification_get_type(notification) == BT_NOTIFICATION_TYPE_EVENT) {
+ switch (bt_notification_get_type(notification)) {
+ case BT_NOTIFICATION_TYPE_EVENT:
ret = pretty_print_event(pretty, notification);
+ break;
+ case BT_NOTIFICATION_TYPE_INACTIVITY:
+ fprintf(stderr, "Inactivity notification\n");
+ break;
+ case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
+ case BT_NOTIFICATION_TYPE_PACKET_END:
+ case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
+ case BT_NOTIFICATION_TYPE_STREAM_END:
+ break;
+ default:
+ fprintf(stderr, "Unhandled notification type\n");
}
return ret;
it_ret = bt_notification_iterator_next(it);
switch (it_ret) {
- case BT_NOTIFICATION_ITERATOR_STATUS_ERROR:
- ret = BT_COMPONENT_STATUS_ERROR;
- goto end;
case BT_NOTIFICATION_ITERATOR_STATUS_END:
ret = BT_COMPONENT_STATUS_END;
BT_PUT(pretty->input_iterator);