#include <babeltrace/graph/component.h>
#include <babeltrace/graph/notification-iterator.h>
#include <babeltrace/graph/clock-class-priority-map.h>
+#include <babeltrace/types.h>
#include "viewer-connection.h"
//TODO: this should not be used by plugins. Should copy code into plugin
struct bt_ctf_stream *stream;
struct lttng_live_trace *trace;
- struct bt_private_port *port;
+ struct bt_private_port *port; /* weak ref. */
/* Node of stream list within the trace. */
struct bt_list_head node;
struct lttng_live_stream_iterator_generic p;
struct lttng_live_component *lttng_live;
- struct bt_private_port *port;
+ struct bt_private_port *port; /* weak ref. */
};
struct lttng_live_component_options {
size_t max_query_size;
struct lttng_live_component_options options;
- struct bt_private_port *no_stream_port;
+ struct bt_private_port *no_stream_port; /* weak */
struct lttng_live_no_stream_iterator *no_stream_iter;
struct bt_component *downstream_component;
- struct bt_graph *graph; /* weak */
};
enum bt_ctf_lttng_live_iterator_status {
void lttng_live_unref_trace(struct lttng_live_trace *trace);
void lttng_live_need_new_streams(struct lttng_live_component *lttng_live);
+bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live);
+
#endif /* BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_INTERNAL_H */
#include <babeltrace/graph/notification-inactivity.h>
#include <babeltrace/graph/graph.h>
#include <babeltrace/compiler-internal.h>
+#include <babeltrace/types.h>
#include <inttypes.h>
#include <glib.h>
#include <assert.h>
}
}
-#define print_stream_state(stream) \
- print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
- bt_port_get_name(bt_port_from_private_port(stream->port)), \
- print_state(stream), stream->last_returned_inactivity_timestamp, \
- stream->current_inactivity_timestamp)
+static
+void print_stream_state(struct lttng_live_stream_iterator *stream)
+{
+ struct bt_port *port;
+
+ port = bt_port_from_private_port(stream->port);
+ print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
+ bt_port_get_name(port),
+ print_state(stream),
+ stream->last_returned_inactivity_timestamp,
+ stream->current_inactivity_timestamp);
+ bt_put(port);
+}
+
+BT_HIDDEN
+bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
+{
+ struct bt_component *component;
+ struct bt_graph *graph;
+ bt_bool ret;
+
+ if (!lttng_live) {
+ return BT_FALSE;
+ }
+
+ component = bt_component_from_private_component(lttng_live->private_component);
+ graph = bt_component_get_graph(component);
+ ret = bt_graph_is_canceled(graph);
+ bt_put(graph);
+ bt_put(component);
+ return ret;
+}
BT_HIDDEN
int lttng_live_add_port(struct lttng_live_component *lttng_live,
int ret;
struct bt_private_port *private_port;
char name[STREAM_NAME_MAX_LEN];
+ enum bt_component_status status;
ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
assert(ret > 0);
strcpy(stream_iter->name, name);
- ret = bt_private_component_source_add_output_private_port(
+ status = bt_private_component_source_add_output_private_port(
lttng_live->private_component, name, stream_iter,
&private_port);
- if (ret) {
+ switch (status) {
+ case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+ return 0;
+ case BT_COMPONENT_STATUS_OK:
+ break;
+ default:
return -1;
}
+ bt_put(private_port); /* weak */
BT_LOGI("Added port %s", name);
if (lttng_live->no_stream_port) {
+ bt_get(lttng_live->no_stream_port);
ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+ bt_put(lttng_live->no_stream_port);
if (ret) {
return -1;
}
- BT_PUT(lttng_live->no_stream_port);
+ lttng_live->no_stream_port = NULL;
lttng_live->no_stream_iter->port = NULL;
}
stream_iter->port = private_port;
}
BT_PUT(component);
if (nr_ports == 1) {
+ enum bt_component_status status;
+
assert(!lttng_live->no_stream_port);
- ret = bt_private_component_source_add_output_private_port(lttng_live->private_component,
+ status = bt_private_component_source_add_output_private_port(lttng_live->private_component,
"no-stream", lttng_live->no_stream_iter,
<tng_live->no_stream_port);
- if (ret) {
+ switch (status) {
+ case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+ return 0;
+ case BT_COMPONENT_STATUS_OK:
+ break;
+ default:
return -1;
}
+ bt_put(lttng_live->no_stream_port); /* weak */
lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
}
+ bt_get(port);
ret = bt_private_port_remove_from_component(port);
+ bt_put(port);
if (ret) {
return -1;
}
retval = bt_ctf_trace_set_is_static(trace->trace);
assert(!retval);
+ BT_PUT(trace->trace);
lttng_live_metadata_fini(trace);
BT_PUT(trace->cc_prio_map);
g_free(trace);
BT_LOGI("Destroy session");
if (session->id != -1ULL) {
if (lttng_live_detach_session(session)) {
- if (!bt_graph_is_canceled(session->lttng_live->graph)) {
+ if (!lttng_live_is_canceled(session->lttng_live)) {
/* Old relayd cannot detach sessions. */
BT_LOGD("Unable to detach session %" PRIu64,
session->id);
struct lttng_live_trace *trace, *t;
if (lttng_live_attach_session(session)) {
- if (bt_graph_is_canceled(lttng_live->graph)) {
+ if (lttng_live_is_canceled(lttng_live)) {
return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
g_string_free(lttng_live->url, TRUE);
}
if (lttng_live->no_stream_port) {
+ bt_get(lttng_live->no_stream_port);
ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+ bt_put(lttng_live->no_stream_port);
assert(!ret);
- BT_PUT(lttng_live->no_stream_port);
}
if (lttng_live->no_stream_iter) {
g_free(lttng_live->no_stream_iter);
static
struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
- struct bt_private_component *private_component,
- struct bt_graph *graph)
+ struct bt_private_component *private_component)
{
struct lttng_live_component *lttng_live;
struct bt_value *value = NULL;
if (!lttng_live->url) {
goto error;
}
+ BT_PUT(value);
lttng_live->viewer_connection =
bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
if (!lttng_live->viewer_connection) {
goto error;
}
lttng_live->private_component = private_component;
- lttng_live->graph = graph;
goto end;
{
struct lttng_live_component *lttng_live;
enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
- struct bt_component *component;
- struct bt_graph *graph;
-
- component = bt_component_from_private_component(private_component);
- graph = bt_component_get_graph(component);
- bt_put(graph); /* weak */
- bt_put(component);
/* Passes ownership of iter ref to lttng_live_component_create. */
- lttng_live = lttng_live_component_create(params, private_component,
- graph);
+ lttng_live = lttng_live_component_create(params, private_component);
if (!lttng_live) {
- if (bt_graph_is_canceled(graph)) {
- ret = BT_COMPONENT_STATUS_AGAIN;
- } else {
- ret = BT_COMPONENT_STATUS_NOMEM;
- }
+ //TODO : we need access to the application cancel state
+ //because we are not part of a graph yet.
+ ret = BT_COMPONENT_STATUS_NOMEM;
goto end;
}
if (ret != BT_COMPONENT_STATUS_OK) {
goto end;
}
-
+ bt_put(lttng_live->no_stream_port); /* weak */
lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
ret = bt_private_component_set_user_data(private_component, lttng_live);