struct bt_component *source = NULL, *sink = NULL;
struct bt_value *source_params = NULL, *sink_params = NULL;
struct bt_notification_iterator *it = NULL;
+ enum bt_component_status sink_status;
ret = parse_options(argc, argv);
if (ret < 0) {
goto end;
}
- do {
- enum bt_component_status sink_status;
- struct bt_notification *notification =
- bt_notification_iterator_get_notification(it);
-
- if (!notification) {
- /*
- * Should never happen in final code except after next
- * has returned BT_NOTIFICATION_ITERATOR_STATUS_END.
- *
- * Right now it happens at the first event since the
- * iterator is not completely initialized and we don't
- * have a notification "heap" in place.
- */
- continue;
- }
+ sink_status = bt_component_sink_add_iterator(sink, it);
+ if (sink_status != BT_COMPONENT_STATUS_OK) {
+ ret = -1;
+ goto end;
+ }
- sink_status = bt_component_sink_handle_notification(sink,
- notification);
- BT_PUT(notification);
- if (sink_status != BT_COMPONENT_STATUS_OK) {
- fprintf(stderr, "Sink component returned an error, aborting...\n");
+ while (true) {
+ sink_status = bt_component_sink_consume(sink);
+
+ switch (sink_status) {
+ case BT_COMPONENT_STATUS_AGAIN:
+ /* Wait for an arbitraty 500 ms. */
+ usleep(500000);
break;
+ case BT_COMPONENT_STATUS_OK:
+ break;
+ case BT_COMPONENT_STATUS_END:
+ goto end;
+ default:
+ fprintf(stderr, "Sink component returned an error, aborting...\n");
+ ret = -1;
+ goto end;
}
- } while (bt_notification_iterator_next(it) ==
- BT_NOTIFICATION_ITERATOR_STATUS_OK);
+ }
/* teardown and exit */
end:
BT_PUT(component_factory);
};
BT_HIDDEN
-int bt_component_class_init(
- struct bt_component_class *class, enum bt_component_type type,
+int bt_component_class_init(struct bt_component_class *class, enum bt_component_type type,
const char *name);
BT_HIDDEN
/** User-defined data and its destruction callback */
void *user_data;
bt_component_destroy_cb user_destroy;
+
+ /**
+ * Used to protect operations which may only be used during
+ * a component's initialization.
+ */
+ bool initializing;
};
BT_HIDDEN
*/
enum bt_component_status {
/** No error, okay. */
- BT_COMPONENT_STATUS_OK = 0,
+ BT_COMPONENT_STATUS_OK = 0,
+ /** No more work to be done by this component. **/
+ BT_COMPONENT_STATUS_END = 1,
+ /**
+ * Component can't process a notification at this time
+ * (e.g. would block), try again later.
+ */
+ BT_COMPONENT_STATUS_AGAIN = 2,
/** General error. */
BT_COMPONENT_STATUS_ERROR = -1,
/** Unsupported component feature. */
*/
#include <babeltrace/plugin/notification/notification.h>
+#include <stdint.h>
#ifdef __cplusplus
extern "C" {
/** bt_component_sink */
/**
- * Notification handling function type.
+ * Notification consumption function type.
*
- * A reference must be taken on the notification if the component has to
- * keep ownership of the notification beyond the invocation of the callback.
+ * @param sink Sink component instance
+ * @returns One of #bt_component_status values
+ */
+typedef enum bt_component_status (*bt_component_sink_consume_cb)(
+ struct bt_component *);
+
+/**
+ * Iterator addition function type.
+ *
+ * A sink component may choose to refuse the addition of an iterator
+ * by not returning BT_COMPONENT_STATUS_OK.
*
* @param sink Sink component instance
- * @param notification Notification to handle
* @returns One of #bt_component_status values
*/
-typedef enum bt_component_status (*bt_component_sink_handle_notification_cb)(
- struct bt_component *, struct bt_notification *);
+typedef enum bt_component_status (*bt_component_sink_add_iterator_cb)(
+ struct bt_component *, struct bt_notification_iterator *);
/**
- * Set a sink component's notification handling callback.
+ * Set a sink component's consumption callback.
*
- * @param sink Sink component instance
- * @param handle_notification Notification handling callback
- * @returns One of #bt_component_status values
+ * @param sink Sink component instance
+ * @param consume Consumption callback
+ * @returns One of #bt_component_status values
*/
extern enum bt_component_status
-bt_component_sink_set_handle_notification_cb(struct bt_component *sink,
- bt_component_sink_handle_notification_cb handle_notification);
+bt_component_sink_set_consume_cb(struct bt_component *sink,
+ bt_component_sink_consume_cb consume);
/**
- * Register a sink to a given notification type.
- *
- * A sink is always registered to notifications of type
- * BT_NOTIFICATION_TYPE_EVENT. However, it may opt to receive any (or all)
- * other notification type(s).
+ * Set a sink component's iterator addition callback.
*
- * @param sink Sink component instance.
- * @param type One of #bt_notification_type
- * @returns One of #bt_component_status
+ * @param sink Sink component instance
+ * @param add_iterator Iterator addition callback
+ * @returns One of #bt_component_status values
*/
extern enum bt_component_status
-bt_component_sink_register_notification_type(struct bt_component *sink,
- enum bt_notification_type type);
+bt_component_sink_set_add_iterator_cb(struct bt_component *sink,
+ bt_component_sink_add_iterator_cb add_iterator);
+
+/* Defaults to 1. */
+extern enum bt_component_status
+bt_component_sink_set_minimum_input_count(struct bt_component *sink,
+ unsigned int minimum);
+
+/* Defaults to 1. */
+extern enum bt_component_status
+bt_component_sink_set_maximum_input_count(struct bt_component *sink,
+ unsigned int maximum);
+
+extern enum bt_component_status
+bt_component_sink_get_input_count(struct bt_component *sink,
+ unsigned int *count);
+
+/* May return NULL after an interator has reached its end. */
+extern enum bt_component_status
+bt_component_sink_get_input_iterator(struct bt_component *sink,
+ unsigned int input, struct bt_notification_iterator **iterator);
-/** bt_component_notification_iterator */
+/** bt_notification_iterator */
/**
* Function returning an iterator's current notification.
*
struct bt_component_sink {
struct bt_component parent;
- /* Component implementation callbacks */
- bt_component_sink_handle_notification_cb handle_notification;
- notification_mask_t registered_notifications_mask;
+ bt_component_sink_consume_cb consume;
+ bt_component_sink_add_iterator_cb add_iterator;
+ GPtrArray *inputs;
+ unsigned int min_input_count;
+ unsigned int max_input_count;
+ bool validated_inputs;
+/* notification_mask_t registered_notifications_mask;*/
};
/**
struct bt_notification;
/**
- * Hand-off a notification to a sink component.
+ * Add a notification iterator to a sink component.
*
* @param component Component instance
- * @param notification Notification instance to handle
+ * @param iterator Notification iterator to add
* @returns One of #bt_component_status values
*/
-enum bt_component_status bt_component_sink_handle_notification(
+enum bt_component_status bt_component_sink_add_iterator(
struct bt_component *component,
- struct bt_notification *notification);
+ struct bt_notification_iterator *iterator);
+
+/**
+ * Process one event, consuming from sources as needed.
+ *
+ * @param component Component instance
+ * @returns One of #bt_component_status values
+ */
+enum bt_component_status bt_component_sink_consume(
+ struct bt_component *component);
#ifdef __cplusplus
}
}
component_class = bt_component_class_create(type, name, description,
- init, factory->current_plugin);
+ init, factory->current_plugin);
g_ptr_array_add(factory->component_classes, component_class);
end:
return ret;
goto end;
}
+ component->initializing = true;
component_class->init(component, params);
+ component->initializing = false;
ret = component_validation_funcs[type](component);
if (ret != BT_COMPONENT_STATUS_OK) {
BT_PUT(component);
{
enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
- if (!component) {
+ if (!component || !component->initializing) {
ret = BT_COMPONENT_STATUS_INVALID;
goto end;
}
{
enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
- if (!component) {
+ if (!component || !component->initializing) {
ret = BT_COMPONENT_STATUS_INVALID;
goto end;
}
*/
#include <babeltrace/compiler.h>
+#include <babeltrace/values.h>
#include <babeltrace/plugin/sink-internal.h>
#include <babeltrace/plugin/component-internal.h>
#include <babeltrace/plugin/notification/notification.h>
struct bt_component_sink *sink;
sink = container_of(component, struct bt_component_sink, parent);
- if (sink->registered_notifications_mask == 0) {
- /*
- * A sink must be registered to at least one notification type.
- */
- printf_error("Invalid sink component; not registered to any notification");
+ if (!sink->consume) {
+ printf_error("Invalid sink component; no notification consumption callback defined.");
ret = BT_COMPONENT_STATUS_INVALID;
goto end;
}
- if (!sink->handle_notification) {
- printf_error("Invalid sink component; no notification handling callback defined.");
+ if (sink->min_input_count > sink->max_input_count) {
+ printf_error("Invalid sink component; minimum input count > maximum input count.");
ret = BT_COMPONENT_STATUS_INVALID;
goto end;
}
return ret;
}
+static
+void bt_component_sink_destroy(struct bt_component *component)
+{
+ struct bt_component_sink *sink = container_of(component,
+ struct bt_component_sink, parent);
+
+ g_ptr_array_free(sink->inputs, TRUE);
+}
+
BT_HIDDEN
struct bt_component *bt_component_sink_create(
struct bt_component_class *class, struct bt_value *params)
}
sink->parent.class = bt_get(class);
- ret = bt_component_init(&sink->parent, NULL);
+ ret = bt_component_init(&sink->parent, bt_component_sink_destroy);
if (ret != BT_COMPONENT_STATUS_OK) {
goto error;
}
+ sink->min_input_count = 1;
+ sink->max_input_count = 1;
+/*
ret = bt_component_sink_register_notification_type(&sink->parent,
BT_NOTIFICATION_TYPE_EVENT);
if (ret != BT_COMPONENT_STATUS_OK) {
goto error;
}
+*/
+ sink->inputs = g_ptr_array_new_with_free_func(bt_put);
+ if (!sink->inputs) {
+ goto error;
+ }
end:
return sink ? &sink->parent : NULL;
error:
return NULL;
}
-enum bt_component_status bt_component_sink_handle_notification(
- struct bt_component *component,
- struct bt_notification *notification)
+static
+enum bt_component_status validate_inputs(struct bt_component_sink *sink)
+{
+ size_t array_size = sink->inputs->len;
+
+ if (array_size < sink->min_input_count ||
+ array_size > sink->max_input_count) {
+ return BT_COMPONENT_STATUS_INVALID;
+ }
+
+ return BT_COMPONENT_STATUS_OK;
+}
+
+enum bt_component_status bt_component_sink_consume(
+ struct bt_component *component)
{
enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
struct bt_component_sink *sink = NULL;
- if (!component || !notification) {
+ if (!component) {
ret = BT_COMPONENT_STATUS_INVALID;
goto end;
}
}
sink = container_of(component, struct bt_component_sink, parent);
- assert(sink->handle_notification);
- ret = sink->handle_notification(component, notification);
+ if (!sink->validated_inputs) {
+ ret = validate_inputs(sink);
+ if (ret != BT_COMPONENT_STATUS_OK) {
+ goto end;
+ }
+ sink->validated_inputs = true;
+ }
+
+ assert(sink->consume);
+ ret = sink->consume(component);
end:
return ret;
}
-
+/*
+static
enum bt_component_status bt_component_sink_register_notification_type(
struct bt_component *component, enum bt_notification_type type)
{
end:
return ret;
}
+*/
+enum bt_component_status bt_component_sink_set_consume_cb(
+ struct bt_component *component,
+ bt_component_sink_consume_cb consume)
+{
+ struct bt_component_sink *sink;
+ enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
-enum bt_component_status bt_component_sink_set_handle_notification_cb(
+ if (!component) {
+ ret = BT_COMPONENT_STATUS_INVALID;
+ goto end;
+ }
+
+ if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) {
+ ret = BT_COMPONENT_STATUS_UNSUPPORTED;
+ goto end;
+ }
+
+ if (!component->initializing) {
+ ret = BT_COMPONENT_STATUS_INVALID;
+ goto end;
+ }
+
+ sink = container_of(component, struct bt_component_sink, parent);
+ sink->consume = consume;
+end:
+ return ret;
+}
+
+enum bt_component_status bt_component_sink_set_minimum_input_count(
struct bt_component *component,
- bt_component_sink_handle_notification_cb handle_notification)
+ unsigned int minimum)
{
+ struct bt_component_sink *sink;
enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
- struct bt_component_sink *sink = NULL;
if (!component) {
ret = BT_COMPONENT_STATUS_INVALID;
goto end;
}
+ if (!component->initializing) {
+ ret = BT_COMPONENT_STATUS_INVALID;
+ goto end;
+ }
+
sink = container_of(component, struct bt_component_sink, parent);
- sink->handle_notification = handle_notification;
+ sink->min_input_count = minimum;
+end:
+ return ret;
+}
+
+enum bt_component_status bt_component_sink_set_maximum_input_count(
+ struct bt_component *component,
+ unsigned int maximum)
+{
+ struct bt_component_sink *sink;
+ enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+
+ if (!component) {
+ ret = BT_COMPONENT_STATUS_INVALID;
+ goto end;
+ }
+
+ if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) {
+ ret = BT_COMPONENT_STATUS_UNSUPPORTED;
+ goto end;
+ }
+
+ if (!component->initializing) {
+ ret = BT_COMPONENT_STATUS_INVALID;
+ goto end;
+ }
+
+ sink = container_of(component, struct bt_component_sink, parent);
+ sink->max_input_count = maximum;
+end:
+ return ret;
+}
+
+enum bt_component_status
+bt_component_sink_get_input_count(struct bt_component *component,
+ unsigned int *count)
+{
+ struct bt_component_sink *sink;
+ enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+
+ if (!component || !count) {
+ ret = BT_COMPONENT_STATUS_INVALID;
+ goto end;
+ }
+
+ if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) {
+ ret = BT_COMPONENT_STATUS_UNSUPPORTED;
+ goto end;
+ }
+
+ sink = container_of(component, struct bt_component_sink, parent);
+ *count = (unsigned int) sink->inputs->len;
+end:
+ return ret;
+}
+
+enum bt_component_status
+bt_component_sink_get_input_iterator(struct bt_component *component,
+ unsigned int input, struct bt_notification_iterator **iterator)
+{
+ struct bt_component_sink *sink;
+ enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+
+ if (!component || !iterator) {
+ ret = BT_COMPONENT_STATUS_INVALID;
+ goto end;
+ }
+
+ if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) {
+ ret = BT_COMPONENT_STATUS_UNSUPPORTED;
+ goto end;
+ }
+
+ sink = container_of(component, struct bt_component_sink, parent);
+ if (input >= (unsigned int) sink->inputs->len) {
+ ret = BT_COMPONENT_STATUS_INVALID;
+ goto end;
+ }
+
+ *iterator = bt_get(g_ptr_array_index(sink->inputs, input));
+end:
+ return ret;
+}
+
+enum bt_component_status
+bt_component_sink_add_iterator(struct bt_component *component,
+ struct bt_notification_iterator *iterator)
+{
+ struct bt_component_sink *sink;
+ enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+
+ if (!component || !iterator) {
+ ret = BT_COMPONENT_STATUS_INVALID;
+ goto end;
+ }
+
+ if (bt_component_get_type(component) != BT_COMPONENT_TYPE_SINK) {
+ ret = BT_COMPONENT_STATUS_UNSUPPORTED;
+ goto end;
+ }
+
+ sink = container_of(component, struct bt_component_sink, parent);
+ if (sink->inputs->len == sink->max_input_count) {
+ ret = BT_COMPONENT_STATUS_UNSUPPORTED;
+ goto end;
+ }
+
+ if (sink->add_iterator) {
+ ret = sink->add_iterator(component, iterator);
+ if (ret != BT_COMPONENT_STATUS_OK) {
+ goto end;
+ }
+ }
+
+ g_ptr_array_add(sink->inputs, bt_get(iterator));
end:
return ret;
}
struct bt_component_source *source;
enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
- if (component->class->type != BT_COMPONENT_TYPE_SOURCE) {
+ if (component->class->type != BT_COMPONENT_TYPE_SOURCE ||
+ !component->initializing) {
ret = BT_COMPONENT_STATUS_INVALID;
goto end;
}
#include <babeltrace/plugin/component.h>
#include <babeltrace/plugin/sink.h>
#include <babeltrace/plugin/notification/notification.h>
+#include <babeltrace/plugin/notification/iterator.h>
#include <babeltrace/plugin/notification/event.h>
#include <stdio.h>
#include <stdbool.h>
return text;
}
-static void destroy_text(struct bt_component *component)
+static
+void destroy_text(struct bt_component *component)
{
void *data = bt_component_get_private_data(component);
}
static
-enum bt_component_status handle_notification(struct bt_component *component,
+enum bt_component_status handle_notification(struct text_component *text,
struct bt_notification *notification)
{
enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
- struct text_component *text = bt_component_get_private_data(component);
if (!text) {
ret = BT_COMPONENT_STATUS_ERROR;
goto end;
}
ret = text_print_event(text, event);
+ bt_put(event);
if (ret != BT_COMPONENT_STATUS_OK) {
goto end;
}
return ret;
}
+static
+enum bt_component_status run(struct bt_component *component)
+{
+ enum bt_component_status ret;
+ struct bt_notification *notification = NULL;
+ struct bt_notification_iterator *it;
+ struct text_component *text = bt_component_get_private_data(component);
+
+ ret = bt_component_sink_get_input_iterator(component, 0, &it);
+ if (ret != BT_COMPONENT_STATUS_OK) {
+ goto end;
+ }
+
+ if (!text->processed_first_event) {
+ ret = bt_notification_iterator_next(it);
+ if (ret != BT_COMPONENT_STATUS_OK) {
+ goto end;
+ }
+ } else {
+ text->processed_first_event = true;
+ }
+
+ notification = bt_notification_iterator_get_notification(it);
+ if (!notification) {
+ ret = BT_COMPONENT_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = handle_notification(text, notification);
+end:
+ bt_put(it);
+ bt_put(notification);
+ return ret;
+}
+
static
enum bt_component_status text_component_init(
struct bt_component *component, struct bt_value *params)
goto error;
}
- ret = bt_component_sink_set_handle_notification_cb(component,
- handle_notification);
+ ret = bt_component_sink_set_consume_cb(component,
+ run);
if (ret != BT_COMPONENT_STATUS_OK) {
goto error;
}