((type) (a) > (type) (b) ? (type) (a) : (type) (b))
#endif
-void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
- int whence);
+static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
+ size_t index, int whence);
static void add_traces(gpointer key, gpointer value, gpointer user_data);
static int del_traces(gpointer key, gpointer value, gpointer user_data);
-int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
- int port)
+int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
{
struct hostent *host;
struct sockaddr_in server_addr;
int ret;
- host = gethostbyname(hostname);
+ host = gethostbyname(ctx->relay_hostname);
if (!host) {
ret = -1;
goto end;
}
server_addr.sin_family = AF_INET;
- server_addr.sin_port = htons(port);
+ server_addr.sin_port = htons(ctx->port);
server_addr.sin_addr = *((struct in_addr *) host->h_addr);
bzero(&(server_addr.sin_zero), 8);
return ret;
}
+static
+void free_session_list(GPtrArray *session_list)
+{
+ int i;
+ struct lttng_live_relay_session *relay_session;
+
+ for (i = 0; i < session_list->len; i++) {
+ relay_session = g_ptr_array_index(session_list, i);
+ free(relay_session->name);
+ free(relay_session->hostname);
+ }
+ g_ptr_array_free(session_list, TRUE);
+}
+
+static
+void print_session_list(GPtrArray *session_list, const char *path)
+{
+ int i;
+ struct lttng_live_relay_session *relay_session;
+
+ for (i = 0; i < session_list->len; i++) {
+ relay_session = g_ptr_array_index(session_list, i);
+ fprintf(stdout, "%s/host/%s/%s (timer = %u, "
+ "%u stream(s), %u client(s) connected)\n",
+ path, relay_session->hostname,
+ relay_session->name, relay_session->timer,
+ relay_session->streams, relay_session->clients);
+ }
+}
+
+static
+void update_session_list(GPtrArray *session_list, char *hostname,
+ char *session_name, uint32_t streams, uint32_t clients,
+ uint32_t timer)
+{
+ int i, found = 0;
+ struct lttng_live_relay_session *relay_session;
+
+ for (i = 0; i < session_list->len; i++) {
+ relay_session = g_ptr_array_index(session_list, i);
+ if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
+ strncmp(relay_session->name,
+ session_name, NAME_MAX) == 0) {
+ relay_session->streams += streams;
+ if (relay_session->clients < clients)
+ relay_session->clients = clients;
+ found = 1;
+ break;
+ }
+ }
+ if (found)
+ return;
+
+ relay_session = g_new0(struct lttng_live_relay_session, 1);
+ relay_session->hostname = strndup(hostname, NAME_MAX);
+ relay_session->name = strndup(session_name, NAME_MAX);
+ relay_session->clients = clients;
+ relay_session->streams = streams;
+ relay_session->timer = timer;
+ g_ptr_array_add(session_list, relay_session);
+}
+
int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_list_sessions list;
struct lttng_viewer_session lsession;
- int i, ret;
+ int i, ret, sessions_count, print_list = 0;
ssize_t ret_len;
- int sessions_count;
+ uint64_t session_id;
+ GPtrArray *session_list = NULL;
+
+ if (strlen(ctx->session_name) == 0) {
+ print_list = 1;
+ session_list = g_ptr_array_new();
+ }
cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
cmd.data_size = 0;
assert(ret_len == sizeof(list));
sessions_count = be32toh(list.sessions_count);
- fprintf(stdout, "%u active session(s)%c\n", sessions_count,
- sessions_count > 0 ? ':' : ' ');
for (i = 0; i < sessions_count; i++) {
do {
ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
assert(ret_len == sizeof(lsession));
lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
+ session_id = be64toh(lsession.id);
+
+ if (print_list) {
+ update_session_list(session_list,
+ lsession.hostname,
+ lsession.session_name,
+ be32toh(lsession.streams),
+ be32toh(lsession.clients),
+ be32toh(lsession.live_timer));
+ } else {
+ if ((strncmp(lsession.session_name, ctx->session_name,
+ NAME_MAX) == 0) && (strncmp(lsession.hostname,
+ ctx->traced_hostname, NAME_MAX) == 0)) {
+ printf_verbose("Reading from session %" PRIu64 "\n",
+ session_id);
+ g_array_append_val(ctx->session_ids,
+ session_id);
+ }
+ }
+ }
- fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, "
- "%u stream(s), %u client(s) connected)\n",
- path, be64toh(lsession.id),
- lsession.session_name, lsession.hostname,
- be32toh(lsession.live_timer),
- be32toh(lsession.streams),
- be32toh(lsession.clients));
+ if (print_list) {
+ print_session_list(session_list, path);
+ free_session_list(session_list);
}
ret = 0;
if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
printf_verbose("Session %" PRIu64 " closed\n",
id);
+ /*
+ * The streams have already been closed during
+ * the reading, so we only need to get rid of
+ * the trace in our internal table of sessions.
+ */
g_array_remove_index(ctx->session_ids, i);
/*
* We can't continue iterating on the g_array
return ret;
}
+static
void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
int whence)
{
struct bt_mmap_stream_list mmap_list;
struct lttng_live_ctx *ctx = NULL;
+ /*
+ * We don't know how many streams we will receive for a trace, so
+ * once we are done receiving the traces, we add all the traces
+ * received to the bt_context.
+ * We can receive streams during the attach command or the
+ * get_new_streams, so we have to make sure not to add multiple
+ * times the same traces.
+ * If a trace is already in the context, we just skip this function.
+ */
if (trace->in_use)
return;
bt_ctx->trace_handles,
(gpointer) (unsigned long) ret);
td = handle->td;
- fprintf(stderr, "Handle : %d, td : %p\n", ret, td);
bt_iter_add_trace(bt_ctx->current_iterator, td);
}
struct lttng_viewer_stream stream;
int ret, i;
ssize_t ret_len;
+ uint32_t stream_count;
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
cmd.data_size = sizeof(rq);
ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
} while (ret_len < 0 && errno == EINTR);
if (ret_len < 0) {
- fprintf(stderr, "[error] Error sending attach request\n");
+ fprintf(stderr, "[error] Error sending get_new_streams request\n");
ret = ret_len;
goto error;
}
goto end;
}
- ctx->session->stream_count += be32toh(rp.streams_count);
+ stream_count = be32toh(rp.streams_count);
+ ctx->session->stream_count += stream_count;
/*
* When the session is created but not started, we do an active wait
* until it starts. It allows the viewer to start processing the trace
ctx->session->stream_count);
ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
ctx->session->stream_count);
- for (i = 0; i < be32toh(rp.streams_count); i++) {
+ for (i = 0; i < stream_count; i++) {
do {
ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
} while (ret_len < 0 && errno == EINTR);
/*
* hostname parameter needs to hold NAME_MAX chars.
*/
-static int parse_url(const char *path, char *hostname, int *port,
- uint64_t *session_id, GArray *session_ids)
+static
+int parse_url(const char *path, struct lttng_live_ctx *ctx)
{
char remain[3][NAME_MAX];
int ret = -1, proto, proto_offset = 0;
size_t path_len = strlen(path);
- char *str, *strctx;
/*
* Since sscanf API does not allow easily checking string length
/* TODO : parse for IPv6 as well */
/* Parse the hostname or IP */
ret = sscanf(&path[proto_offset], "%[a-zA-Z.0-9%-]%s",
- hostname, remain[0]);
+ ctx->relay_hostname, remain[0]);
if (ret == 2) {
/* Optional port number */
switch (remain[0][0]) {
case ':':
- ret = sscanf(remain[0], ":%d%s", port, remain[1]);
+ ret = sscanf(remain[0], ":%d%s", &ctx->port, remain[1]);
/* Optional session ID with port number */
if (ret == 2) {
ret = sscanf(remain[1], "/%s", remain[2]);
}
}
- if (*port < 0)
- *port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
+ if (ctx->port < 0)
+ ctx->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
if (strlen(remain[2]) == 0) {
printf_verbose("Connecting to hostname : %s, port : %d, "
"proto : IPv%d\n",
- hostname, *port, proto);
+ ctx->relay_hostname, ctx->port, proto);
ret = 0;
goto end;
}
+ ret = sscanf(remain[2], "host/%[a-zA-Z.0-9%-]/%s",
+ ctx->traced_hostname, ctx->session_name);
+ if (ret != 2) {
+ fprintf(stderr, "[error] Format : "
+ "net://<hostname>/host/<traced_hostname>/<session_name>\n");
+ goto end;
+ }
printf_verbose("Connecting to hostname : %s, port : %d, "
- "session id(s) : %s, proto : IPv%d\n",
- hostname, *port, remain[2], proto);
- str = strtok_r(remain[2], ",", &strctx);
- do {
- char *endptr;
- uint64_t id;
-
- id = strtoull(str, &endptr, 0);
- if (*endptr != '\0' || str == endptr || errno != 0) {
- fprintf(stderr, "[error] parsing session id\n");
- ret = -1;
- goto end;
- }
- g_array_append_val(session_ids, id);
- } while ((str = strtok_r(NULL, ",", &strctx)));
-
+ "traced hostname : %s, session name : %s, "
+ "proto : IPv%d\n",
+ ctx->relay_hostname, ctx->port, ctx->traced_hostname,
+ ctx->session_name, proto);
ret = 0;
end:
static int lttng_live_open_trace_read(const char *path)
{
- char hostname[NAME_MAX];
- int port = -1;
- uint64_t session_id = -1ULL;
int ret = 0;
- struct lttng_live_ctx ctx;
+ struct lttng_live_ctx *ctx;
- ctx.session = g_new0(struct lttng_live_session, 1);
+ ctx = g_new0(struct lttng_live_ctx, 1);
+ ctx->session = g_new0(struct lttng_live_session, 1);
/* We need a pointer to the context from the packet_seek function. */
- ctx.session->ctx = &ctx;
+ ctx->session->ctx = ctx;
/* HT to store the CTF traces. */
- ctx.session->ctf_traces = g_hash_table_new(g_direct_hash,
+ ctx->session->ctf_traces = g_hash_table_new(g_direct_hash,
g_direct_equal);
+ ctx->port = -1;
+ ctx->session_ids = g_array_new(FALSE, TRUE, sizeof(uint64_t));
- ctx.session_ids = g_array_new(FALSE, TRUE, sizeof(uint64_t));
-
- ret = parse_url(path, hostname, &port, &session_id, ctx.session_ids);
+ ret = parse_url(path, ctx);
if (ret < 0) {
goto end_free;
}
- ret = lttng_live_connect_viewer(&ctx, hostname, port);
+ ret = lttng_live_connect_viewer(ctx);
if (ret < 0) {
fprintf(stderr, "[error] Connection failed\n");
goto end_free;
}
printf_verbose("LTTng-live connected to relayd\n");
- ret = lttng_live_establish_connection(&ctx);
+ ret = lttng_live_establish_connection(ctx);
if (ret < 0) {
goto end_free;
}
- if (ctx.session_ids->len == 0) {
- printf_verbose("Listing sessions\n");
- ret = lttng_live_list_sessions(&ctx, path);
- if (ret < 0) {
- fprintf(stderr, "[error] List error\n");
- goto end_free;
- }
- } else {
- lttng_live_read(&ctx);
+ printf_verbose("Listing sessions\n");
+ ret = lttng_live_list_sessions(ctx, path);
+ if (ret < 0) {
+ fprintf(stderr, "[error] List error\n");
+ goto end_free;
}
+ if (ctx->session_ids->len > 0)
+ lttng_live_read(ctx);
+
end_free:
- g_array_free(ctx.session_ids, TRUE);
- g_hash_table_destroy(ctx.session->ctf_traces);
- g_free(ctx.session);
- g_free(ctx.session->streams);
+ g_hash_table_destroy(ctx->session->ctf_traces);
+ g_free(ctx->session);
+ g_free(ctx->session->streams);
+ g_free(ctx);
return ret;
}