#include <babeltrace/compat/send-internal.h>
#include <babeltrace/compiler-internal.h>
#include <babeltrace/common-internal.h>
+#include <babeltrace/graph/graph.h>
#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-VIEWER"
#include "data-stream.h"
#include "metadata.h"
-static ssize_t lttng_live_recv(int fd, void *buf, size_t len)
+static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connection,
+ void *buf, size_t len)
{
ssize_t ret;
size_t copied = 0, to_copy = len;
+ struct lttng_live_component *lttng_live =
+ viewer_connection->lttng_live;
+ int fd = viewer_connection->control_sock;
do {
ret = recv(fd, buf + copied, to_copy, 0);
copied += ret;
to_copy -= ret;
}
- } while ((ret > 0 && to_copy > 0)
- || (ret < 0 && errno == EINTR));
+ if (ret < 0 && errno == EINTR) {
+ if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) {
+ break;
+ } else {
+ continue;
+ }
+ }
+ } while (ret > 0 && to_copy > 0);
if (ret > 0)
ret = copied;
/* ret = 0 means orderly shutdown, ret < 0 is error. */
return ret;
}
-static ssize_t lttng_live_send(int fd, const void *buf, size_t len)
+static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connection,
+ const void *buf, size_t len)
{
+ struct lttng_live_component *lttng_live =
+ viewer_connection->lttng_live;
+ int fd = viewer_connection->control_sock;
ssize_t ret;
- do {
+ for (;;) {
ret = bt_send_nosigpipe(fd, buf, len);
- } while (ret < 0 && errno == EINTR);
+ if (ret < 0 && errno == EINTR) {
+ if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) {
+ break;
+ } else {
+ continue;
+ }
+ } else {
+ break;
+ }
+ }
return ret;
}
connect.minor = htobe32(LTTNG_LIVE_MINOR);
connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &connect, sizeof(connect));
+ ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect));
if (ret_len < 0) {
BT_LOGE("Error sending version: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(connect));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &connect, sizeof(connect));
+ ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
cmd.data_size = htobe64((uint64_t) 0);
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
+ ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
for (i = 0; i < sessions_count; i++) {
struct lttng_viewer_session lsession;
- ret_len = lttng_live_recv(viewer_connection->control_sock,
+ ret_len = lttng_live_recv(viewer_connection,
&lsession, sizeof(lsession));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
cmd.data_size = htobe64((uint64_t) 0);
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
+ ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
sessions_count = be32toh(list.sessions_count);
for (i = 0; i < sessions_count; i++) {
- ret_len = lttng_live_recv(viewer_connection->control_sock,
+ ret_len = lttng_live_recv(viewer_connection,
&lsession, sizeof(lsession));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
cmd.data_size = htobe64((uint64_t) 0);
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &resp, sizeof(resp));
+ ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
uint64_t stream_id;
uint64_t ctf_trace_id;
- ret_len = lttng_live_recv(viewer_connection->control_sock, &stream, sizeof(stream));
+ ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
// rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
if (ret_len < 0) {
BT_LOGE("Error sending attach request: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(session_id);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
if (ret_len < 0) {
BT_LOGE("Error sending detach request: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
if (ret_len < 0) {
BT_LOGE("Error sending get_metadata request: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
BT_LOGE("relay data zmalloc: %s", strerror(errno));
goto error;
}
- ret_len = lttng_live_recv(viewer_connection->control_sock, data, len);
+ ret_len = lttng_live_recv(viewer_connection, data, len);
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error_free_data;
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(stream->viewer_stream_id);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
if (ret_len < 0) {
BT_LOGE("Error sending get_next_index request: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
return retstatus;
error:
- retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ if (bt_graph_is_canceled(lttng_live->graph)) {
+ retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } else {
+ retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ }
return retstatus;
}
rq.offset = htobe64(offset);
rq.len = htobe32(req_len);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
if (ret_len < 0) {
BT_LOGE("Error sending get_data request: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
goto error;
}
- ret_len = lttng_live_recv(viewer_connection->control_sock, buf, req_len);
+ ret_len = lttng_live_recv(viewer_connection, buf, req_len);
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
return retstatus;
error:
- retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+ if (bt_graph_is_canceled(lttng_live->graph)) {
+ retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ } else {
+ retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+ }
return retstatus;
}
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(session->id);
- ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+ ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len < 0) {
BT_LOGE("Error sending cmd: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(cmd));
- ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
if (ret_len < 0) {
BT_LOGE("Error sending get_new_streams request: %s", strerror(errno));
goto error;
}
assert(ret_len == sizeof(rq));
- ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+ ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
goto error;
return status;
error:
- status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ if (bt_graph_is_canceled(lttng_live->graph)) {
+ status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ } else {
+ status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+ }
return status;
}
BT_HIDDEN
struct bt_live_viewer_connection *
- bt_live_viewer_connection_create(const char *url, FILE *error_fp)
+ bt_live_viewer_connection_create(const char *url,
+ struct lttng_live_component *lttng_live)
{
struct bt_live_viewer_connection *viewer_connection;
bt_object_init(&viewer_connection->obj, connection_release);
viewer_connection->control_sock = -1;
viewer_connection->port = -1;
- viewer_connection->error_fp = error_fp;
+ viewer_connection->lttng_live = lttng_live;
viewer_connection->url = g_string_new(url);
if (!viewer_connection->url) {
goto error;