* Object copying (except fields and values).
* Object freezing (whatever the type, as freezing only occurs in
developer mode).
-* Object cancellation.
+* Object interruption.
* Calling user methods and logging the result.
* Setting object properties (except fields and values).
|
#include <babeltrace2/plugin/plugin-dev.h>
/* Cancel private definitions */
-#undef __BT_FUNC_STATUS_OVERFLOW
-#undef __BT_FUNC_STATUS_INVALID_PARAMS
+#undef __BT_FUNC_STATUS_AGAIN
+#undef __BT_FUNC_STATUS_END
+#undef __BT_FUNC_STATUS_END
+#undef __BT_FUNC_STATUS_ERROR
+#undef __BT_FUNC_STATUS_ERROR
+#undef __BT_FUNC_STATUS_INTERRUPTED
#undef __BT_FUNC_STATUS_INVALID_OBJECT
#undef __BT_FUNC_STATUS_MEMORY_ERROR
-#undef __BT_FUNC_STATUS_ERROR
-#undef __BT_FUNC_STATUS_OK
-#undef __BT_FUNC_STATUS_END
#undef __BT_FUNC_STATUS_NOT_FOUND
-#undef __BT_FUNC_STATUS_AGAIN
-#undef __BT_FUNC_STATUS_CANCELED
+#undef __BT_FUNC_STATUS_OK
#undef __BT_IN_BABELTRACE_H
#undef __BT_UPCAST
#undef __BT_UPCAST_CONST
# define __BT_FUNC_STATUS_NOT_FOUND 2
#endif
+/* Object is interrupted */
+#ifndef __BT_FUNC_STATUS_INTERRUPTED
+# define __BT_FUNC_STATUS_INTERRUPTED 4
+#endif
+
/* Try operation again later */
#ifndef __BT_FUNC_STATUS_AGAIN
# define __BT_FUNC_STATUS_AGAIN 11
#endif
-
-/* Object is canceled */
-#ifndef __BT_FUNC_STATUS_CANCELED
-# define __BT_FUNC_STATUS_CANCELED 125
-#endif
BT_COMPONENT_CLASS_TYPE_SINK;
}
-extern bt_bool bt_component_graph_is_canceled(
- const bt_component *component);
-
extern void bt_component_get_ref(const bt_component *component);
extern void bt_component_put_ref(const bt_component *component);
extern "C" {
#endif
-extern bt_bool bt_graph_is_canceled(const bt_graph *graph);
-
extern void bt_graph_get_ref(const bt_graph *graph);
extern void bt_graph_put_ref(const bt_graph *graph);
typedef enum bt_graph_connect_ports_status {
BT_GRAPH_CONNECT_PORTS_STATUS_OK = __BT_FUNC_STATUS_OK,
BT_GRAPH_CONNECT_PORTS_STATUS_ERROR = __BT_FUNC_STATUS_ERROR,
- BT_GRAPH_CONNECT_PORTS_STATUS_CANCELED = __BT_FUNC_STATUS_CANCELED,
BT_GRAPH_CONNECT_PORTS_STATUS_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR,
} bt_graph_connect_ports_status;
BT_GRAPH_RUN_STATUS_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR,
BT_GRAPH_RUN_STATUS_AGAIN = __BT_FUNC_STATUS_AGAIN,
BT_GRAPH_RUN_STATUS_END = __BT_FUNC_STATUS_END,
- BT_GRAPH_RUN_STATUS_CANCELED = __BT_FUNC_STATUS_CANCELED,
} bt_graph_run_status;
extern bt_graph_run_status bt_graph_run(bt_graph *graph);
BT_GRAPH_CONSUME_STATUS_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR,
BT_GRAPH_CONSUME_STATUS_AGAIN = __BT_FUNC_STATUS_AGAIN,
BT_GRAPH_CONSUME_STATUS_END = __BT_FUNC_STATUS_END,
- BT_GRAPH_CONSUME_STATUS_CANCELED = __BT_FUNC_STATUS_CANCELED,
} bt_graph_consume_status;
extern bt_graph_consume_status bt_graph_consume(bt_graph *graph);
bt_graph_listener_removed_func listener_removed, void *data,
int *listener_id);
-typedef enum bt_graph_cancel_status {
- BT_GRAPH_CANCEL_STATUS_OK = __BT_FUNC_STATUS_OK,
-} bt_graph_cancel_status;
+typedef enum bt_graph_add_interrupter_status {
+ BT_GRAPH_ADD_INTERRUPTER_STATUS_OK = __BT_FUNC_STATUS_OK,
+ BT_GRAPH_ADD_INTERRUPTER_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR,
+} bt_graph_add_interrupter_status;
-extern bt_graph_cancel_status bt_graph_cancel(bt_graph *graph);
+extern bt_graph_add_interrupter_status bt_graph_add_interrupter(bt_graph *graph,
+ const bt_interrupter *interrupter);
+
+extern void bt_graph_interrupt(bt_graph *graph);
#ifdef __cplusplus
}
extern "C" {
#endif
-extern
-bt_bool bt_query_executor_is_canceled(
+extern bt_bool bt_query_executor_is_interrupted(
const bt_query_executor *query_executor);
-extern void bt_query_executor_get_ref(
- const bt_query_executor *query_executor);
+extern void bt_query_executor_get_ref(const bt_query_executor *query_executor);
-extern void bt_query_executor_put_ref(
- const bt_query_executor *query_executor);
+extern void bt_query_executor_put_ref(const bt_query_executor *query_executor);
#define BT_QUERY_EXECUTOR_PUT_REF_AND_RESET(_var) \
do { \
typedef enum bt_query_executor_query_status {
BT_QUERY_EXECUTOR_QUERY_STATUS_OK = __BT_FUNC_STATUS_OK,
BT_QUERY_EXECUTOR_QUERY_STATUS_AGAIN = __BT_FUNC_STATUS_AGAIN,
- BT_QUERY_EXECUTOR_QUERY_STATUS_CANCELED = __BT_FUNC_STATUS_CANCELED,
BT_QUERY_EXECUTOR_QUERY_STATUS_ERROR = __BT_FUNC_STATUS_ERROR,
BT_QUERY_EXECUTOR_QUERY_STATUS_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR,
BT_QUERY_EXECUTOR_QUERY_STATUS_INVALID_OBJECT = __BT_FUNC_STATUS_INVALID_OBJECT,
const char *object, const bt_value *params,
bt_logging_level logging_level, const bt_value **result);
-typedef enum bt_query_executor_cancel_status {
- BT_QUERY_EXECUTOR_CANCEL_STATUS_OK = __BT_FUNC_STATUS_OK,
-} bt_query_executor_cancel_status;
+typedef enum bt_query_executor_add_interrupter_status {
+ BT_QUERY_EXECUTOR_ADD_INTERRUPTER_STATUS_OK = __BT_FUNC_STATUS_OK,
+ BT_QUERY_EXECUTOR_ADD_INTERRUPTER_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR,
+} bt_query_executor_add_interrupter_status;
-extern
-bt_query_executor_cancel_status bt_query_executor_cancel(
- bt_query_executor *query_executor);
+extern bt_query_executor_add_interrupter_status
+bt_query_executor_add_interrupter(bt_query_executor *query_executor,
+ const bt_interrupter *interrupter);
+
+extern void bt_query_executor_interrupt(bt_query_executor *query_executor);
#ifdef __cplusplus
}
const char *name, void *user_data,
bt_self_component_port_input **self_component_port);
+extern bt_bool bt_self_component_sink_is_interrupted(
+ const bt_self_component_sink *self_component);
+
#ifdef __cplusplus
}
#endif
extern "C" {
#endif
+extern bt_bool bt_self_message_iterator_is_interrupted(
+ const bt_self_message_iterator *message_iterator);
+
extern bt_self_component *
bt_self_message_iterator_borrow_component(
bt_self_message_iterator *message_iterator);
typedef enum bt_value_map_foreach_entry_const_status {
BT_VALUE_MAP_FOREACH_ENTRY_CONST_STATUS_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR,
BT_VALUE_MAP_FOREACH_ENTRY_CONST_STATUS_OK = __BT_FUNC_STATUS_OK,
- BT_VALUE_MAP_FOREACH_ENTRY_CONST_STATUS_CANCELED = __BT_FUNC_STATUS_CANCELED,
+ BT_VALUE_MAP_FOREACH_ENTRY_CONST_STATUS_INTERRUPTED = __BT_FUNC_STATUS_INTERRUPTED,
} bt_value_map_foreach_entry_const_status;
extern bt_value_map_foreach_entry_const_status bt_value_map_foreach_entry_const(
typedef enum bt_value_map_foreach_entry_status {
BT_VALUE_MAP_FOREACH_ENTRY_STATUS_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR,
BT_VALUE_MAP_FOREACH_ENTRY_STATUS_OK = __BT_FUNC_STATUS_OK,
- BT_VALUE_MAP_FOREACH_ENTRY_STATUS_CANCELED = __BT_FUNC_STATUS_CANCELED,
+ BT_VALUE_MAP_FOREACH_ENTRY_STATUS_INTERRUPTED = __BT_FUNC_STATUS_INTERRUPTED,
} bt_value_map_foreach_entry_status;
extern bt_value_map_foreach_entry_status bt_value_map_foreach_entry(
pass
-class Canceled(Exception):
- pass
-
-
class _ListenerHandle:
def __init__(self, listener_id, obj):
self._listener_id = listener_id
raise bt2.CreationError('cannot create message iterator object')
return bt2.message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr)
+
+ @property
+ def _is_interrupted(self):
+ return bool(native_bt.self_component_sink_is_interrupted(self._bt_ptr))
# THE SOFTWARE.
from bt2 import native_bt, object, utils
+import bt2.interrupter
import bt2.connection
import bt2.component
import functools
status = native_bt.graph_run(self._ptr)
try:
- utils._handle_func_status(
- status, 'graph object stopped running because of an unexpected error'
- )
+ utils._handle_func_status(status, 'graph object stopped running')
except bt2.Stop:
# done
return
except Exception:
raise
- def cancel(self):
- status = native_bt.graph_cancel(self._ptr)
- utils._handle_func_status(status, 'cannot cancel graph object')
+ def add_interrupter(self, interrupter):
+ utils._check_type(interrupter, bt2.interrupter.Interrupter)
+ native_bt.graph_add_interrupter(self._ptr, interrupter._ptr)
- @property
- def is_canceled(self):
- is_canceled = native_bt.graph_is_canceled(self._ptr)
- assert is_canceled >= 0
- return is_canceled > 0
+ def interrupt(self):
+ native_bt.graph_interrupt(self._ptr)
def create_output_port_message_iterator(self, output_port):
utils._check_type(output_port, bt2.port._OutputPort)
def addr(self):
return int(self._bt_ptr)
+ @property
+ def _is_interrupted(self):
+ return bool(native_bt.self_message_iterator_is_interrupted(self._bt_ptr))
+
def _finalize(self):
pass
static PyObject *py_mod_bt2_exc_memory_error = NULL;
static PyObject *py_mod_bt2_exc_try_again_type = NULL;
static PyObject *py_mod_bt2_exc_stop_type = NULL;
-static PyObject *py_mod_bt2_exc_msg_iter_canceled_type = NULL;
static PyObject *py_mod_bt2_exc_invalid_object_type = NULL;
static PyObject *py_mod_bt2_exc_invalid_params_type = NULL;
Py_XDECREF(py_mod_bt2_exc_error_type);
Py_XDECREF(py_mod_bt2_exc_try_again_type);
Py_XDECREF(py_mod_bt2_exc_stop_type);
- Py_XDECREF(py_mod_bt2_exc_msg_iter_canceled_type);
Py_XDECREF(py_mod_bt2_exc_invalid_object_type);
Py_XDECREF(py_mod_bt2_exc_invalid_params_type);
}
# THE SOFTWARE.
from bt2 import native_bt, object, utils
+import bt2.interrupter
import bt2.component
import bt2.logging
import bt2
super().__init__(ptr)
- def cancel(self):
- status = native_bt.query_executor_cancel(self._ptr)
- utils._handle_func_status(status, 'cannot cancel query executor object')
+ def add_interrupter(self, interrupter):
+ utils._check_type(interrupter, bt2.interrupter.Interrupter)
+ native_bt.query_executor_add_interrupter(self._ptr, interrupter._ptr)
+
+ def interrupt(self):
+ native_bt.query_executor_interrupt(self._ptr)
@property
- def is_canceled(self):
- is_canceled = native_bt.query_executor_is_canceled(self._ptr)
- assert is_canceled >= 0
- return is_canceled > 0
+ def is_interrupted(self):
+ is_interrupted = native_bt.query_executor_is_interrupted(self._ptr)
+ return bool(is_interrupted)
def query(
self,
params=None,
logging_level=bt2.logging.LoggingLevel.NONE,
):
- if self.is_canceled:
- raise bt2.Canceled
-
if not isinstance(component_class, bt2.component._GenericComponentClass):
err = False
raise bt2.TryAgain
else:
raise bt2.TryAgain(msg)
- elif status == native_bt.__BT_FUNC_STATUS_CANCELED:
- if msg is None:
- raise bt2.Canceled
- else:
- raise bt2.Canceled(msg)
elif status == native_bt.__BT_FUNC_STATUS_OVERFLOW:
if msg is None:
raise bt2.OverflowError
NULL,
};
-/* Application's processing graph (weak) */
-static bt_graph *the_graph;
-static bt_query_executor *the_query_executor;
-static bool canceled = false;
+/* Application's interrupter (owned by this) */
+static bt_interrupter *the_interrupter;
+static volatile bool interrupted = false;
#ifdef __MINGW32__
static
BOOL WINAPI signal_handler(DWORD signal) {
- if (the_graph) {
- bt_graph_cancel(the_graph);
+ if (the_interrupter) {
+ bt_interrupter_set(the_interrupter);
}
- canceled = true;
-
+ interrupted = true;
return TRUE;
}
return;
}
- if (the_graph) {
- bt_graph_cancel(the_graph);
- }
-
- if (the_query_executor) {
- bt_query_executor_cancel(the_query_executor);
+ if (the_interrupter) {
+ bt_interrupter_set(the_interrupter);
}
- canceled = true;
+ interrupted = true;
}
static
#endif /* __MINGW32__ */
-static
-int create_the_query_executor(void)
-{
- int ret = 0;
-
- the_query_executor = bt_query_executor_create();
- if (!the_query_executor) {
- BT_CLI_LOGE_APPEND_CAUSE("Cannot create a query executor.");
- ret = -1;
- }
-
- return ret;
-}
-
-static
-void destroy_the_query_executor(void)
-{
- BT_QUERY_EXECUTOR_PUT_REF_AND_RESET(the_query_executor);
-}
-
static
int query(struct bt_config *cfg, const bt_component_class *comp_cls,
const char *obj, const bt_value *params,
{
const bt_value *result = NULL;
bt_query_executor_query_status query_status;
+ bt_query_executor *query_exec;
*fail_reason = "unknown error";
int ret = 0;
BT_ASSERT(fail_reason);
BT_ASSERT(user_result);
- ret = create_the_query_executor();
- if (ret) {
- /* create_the_query_executor() logs errors */
- goto end;
+ query_exec = bt_query_executor_create();
+ if (!query_exec) {
+ BT_CLI_LOGE_APPEND_CAUSE("Cannot create a query executor.");
+ goto error;
}
- if (canceled) {
+ bt_query_executor_add_interrupter(query_exec, the_interrupter);
+
+ if (interrupted) {
BT_CLI_LOGW_APPEND_CAUSE(
- "Canceled by user before executing the query: "
+ "Interrupted by user before executing the query: "
"comp-cls-addr=%p, comp-cls-name=\"%s\", "
"query-obj=\"%s\"", comp_cls,
bt_component_class_get_name(comp_cls), obj);
- *fail_reason = "canceled by user";
+ *fail_reason = "interrupted by user";
goto error;
}
while (true) {
query_status = bt_query_executor_query(
- the_query_executor, comp_cls, obj, params,
+ query_exec, comp_cls, obj, params,
cfg->log_level, &result);
switch (query_status) {
case BT_QUERY_EXECUTOR_QUERY_STATUS_OK:
{
const uint64_t sleep_time_us = 100000;
+ if (bt_interrupter_is_set(the_interrupter)) {
+ *fail_reason = "interrupted by user";
+ goto error;
+ }
+
/* Wait 100 ms and retry */
BT_LOGD("Got BT_QUERY_EXECUTOR_QUERY_STATUS_AGAIN: sleeping: "
"time-us=%" PRIu64, sleep_time_us);
if (usleep(sleep_time_us)) {
- if (bt_query_executor_is_canceled(the_query_executor)) {
+ if (bt_interrupter_is_set(the_interrupter)) {
BT_CLI_LOGW_APPEND_CAUSE(
- "Query was canceled by user: "
+ "Query was interrupted by user: "
"comp-cls-addr=%p, comp-cls-name=\"%s\", "
"query-obj=\"%s\"", comp_cls,
bt_component_class_get_name(comp_cls),
obj);
- *fail_reason = "canceled by user";
+ *fail_reason = "interrupted by user";
goto error;
}
}
continue;
}
- case BT_QUERY_EXECUTOR_QUERY_STATUS_CANCELED:
- *fail_reason = "canceled by user";
- goto error;
case BT_QUERY_EXECUTOR_QUERY_STATUS_ERROR:
+ if (bt_interrupter_is_set(the_interrupter)) {
+ *fail_reason = "interrupted by user";
+ goto error;
+ }
+
goto error;
case BT_QUERY_EXECUTOR_QUERY_STATUS_INVALID_OBJECT:
*fail_reason = "invalid or unknown query object";
ret = -1;
end:
- destroy_the_query_executor();
+ bt_query_executor_put_ref(query_exec);
bt_value_put_ref(result);
return ret;
}
switch (connect_ports_status) {
case BT_GRAPH_CONNECT_PORTS_STATUS_OK:
break;
- case BT_GRAPH_CONNECT_PORTS_STATUS_CANCELED:
- BT_CLI_LOGW_APPEND_CAUSE("Graph was canceled by user.");
- break;
default:
BT_CLI_LOGE_APPEND_CAUSE(
"Cannot create connection: graph refuses to connect ports: "
}
BT_GRAPH_PUT_REF_AND_RESET(ctx->graph);
- the_graph = NULL;
ctx->cfg = NULL;
}
goto error;
}
- the_graph = ctx->graph;
+ bt_graph_add_interrupter(ctx->graph, the_interrupter);
add_listener_status = bt_graph_add_source_component_output_port_added_listener(
ctx->graph, graph_source_output_port_added_listener, NULL, ctx,
NULL);
goto error;
}
- if (canceled) {
+ if (interrupted) {
BT_CLI_LOGW_APPEND_CAUSE(
- "Canceled by user before creating components.");
+ "Interrupted by user before creating components.");
goto error;
}
goto error;
}
- if (canceled) {
+ if (interrupted) {
BT_CLI_LOGW_APPEND_CAUSE(
- "Canceled by user before connecting components.");
+ "Interrupted by user before connecting components.");
goto error;
}
goto error;
}
- if (canceled) {
+ if (interrupted) {
BT_CLI_LOGW_APPEND_CAUSE(
- "Canceled by user before running the graph.");
+ "Interrupted by user before running the graph.");
goto error;
}
switch (run_status) {
case BT_GRAPH_RUN_STATUS_OK:
break;
- case BT_GRAPH_RUN_STATUS_CANCELED:
- BT_CLI_LOGW_APPEND_CAUSE("Graph was canceled by user.");
- goto error;
case BT_GRAPH_RUN_STATUS_AGAIN:
- if (bt_graph_is_canceled(ctx.graph)) {
+ if (bt_interrupter_is_set(the_interrupter)) {
BT_CLI_LOGW_APPEND_CAUSE(
- "Graph was canceled by user.");
+ "Graph was interrupted by user.");
goto error;
}
cfg->cmd_data.run.retry_duration_us);
if (usleep(cfg->cmd_data.run.retry_duration_us)) {
- if (bt_graph_is_canceled(ctx.graph)) {
+ if (bt_interrupter_is_set(the_interrupter)) {
BT_CLI_LOGW_APPEND_CAUSE(
- "Graph was canceled by user.");
+ "Graph was interrupted by user.");
goto error;
}
}
case BT_GRAPH_RUN_STATUS_END:
goto end;
default:
+ if (bt_interrupter_is_set(the_interrupter)) {
+ BT_CLI_LOGW_APPEND_CAUSE(
+ "Graph was interrupted by user and failed: "
+ "status=%s",
+ bt_common_func_status_string(run_status));
+ goto error;
+ }
+
BT_CLI_LOGE_APPEND_CAUSE(
"Graph failed to complete successfully");
goto error;
}
}
+ BT_ASSERT(!the_interrupter);
+ the_interrupter = bt_interrupter_create();
+ if (!the_interrupter) {
+ BT_CLI_LOGE_APPEND_CAUSE("Failed to create an interrupter object.");
+ retcode = 1;
+ goto end;
+ }
+
BT_LOGI("Executing command: cmd=%d, command-name=\"%s\"",
cfg->command, cfg->command_name);
end:
BT_OBJECT_PUT_REF_AND_RESET(cfg);
fini_loaded_plugins();
+ bt_interrupter_put_ref(the_interrupter);
if (retcode != 0) {
print_error_causes();
return "NOT_FOUND";
case __BT_FUNC_STATUS_AGAIN:
return "AGAIN";
- case __BT_FUNC_STATUS_CANCELED:
- return "CANCELED";
+ case __BT_FUNC_STATUS_INTERRUPTED:
+ return "INTERRUPTED";
default:
return "(unknown)";
}
* Aliases without a `__` prefix for internal code: this is just easier
* to read.
*/
-#define BT_FUNC_STATUS_OVERFLOW __BT_FUNC_STATUS_OVERFLOW
-#define BT_FUNC_STATUS_INVALID_PARAMS __BT_FUNC_STATUS_INVALID_PARAMS
+#define BT_FUNC_STATUS_AGAIN __BT_FUNC_STATUS_AGAIN
+#define BT_FUNC_STATUS_END __BT_FUNC_STATUS_END
+#define BT_FUNC_STATUS_ERROR __BT_FUNC_STATUS_ERROR
+#define BT_FUNC_STATUS_INTERRUPTED __BT_FUNC_STATUS_INTERRUPTED
#define BT_FUNC_STATUS_INVALID_OBJECT __BT_FUNC_STATUS_INVALID_OBJECT
+#define BT_FUNC_STATUS_INVALID_PARAMS __BT_FUNC_STATUS_INVALID_PARAMS
#define BT_FUNC_STATUS_MEMORY_ERROR __BT_FUNC_STATUS_MEMORY_ERROR
-#define BT_FUNC_STATUS_ERROR __BT_FUNC_STATUS_ERROR
-#define BT_FUNC_STATUS_OK __BT_FUNC_STATUS_OK
-#define BT_FUNC_STATUS_END __BT_FUNC_STATUS_END
#define BT_FUNC_STATUS_NOT_FOUND __BT_FUNC_STATUS_NOT_FOUND
-#define BT_FUNC_STATUS_AGAIN __BT_FUNC_STATUS_AGAIN
-#define BT_FUNC_STATUS_CANCELED __BT_FUNC_STATUS_CANCELED
+#define BT_FUNC_STATUS_OK __BT_FUNC_STATUS_OK
+#define BT_FUNC_STATUS_OVERFLOW __BT_FUNC_STATUS_OVERFLOW
#endif /* BABELTRACE_FUNC_STATUS_INTERNAL_H */
#include "common/assert.h"
#include "lib/assert-pre.h"
+#include "lib/assert-post.h"
#include "compat/compiler.h"
#include <babeltrace2/value.h>
#include <babeltrace2/graph/self-component-sink.h>
#include "component-sink.h"
#include "component.h"
+#include "graph.h"
#include "lib/func-status.h"
BT_HIDDEN
return status;
}
+bt_bool bt_self_component_sink_is_interrupted(
+ const struct bt_self_component_sink *self_comp)
+{
+ struct bt_component *comp = (void *) self_comp;
+
+ BT_ASSERT_PRE_NON_NULL(comp, "Component");
+ return (bt_bool) bt_graph_is_interrupted(
+ bt_component_borrow_graph(comp));
+}
+
void bt_component_sink_get_ref(
const struct bt_component_sink *component_sink)
{
BT_ASSERT_PRE_NON_NULL(name, "Name");
BT_ASSERT_PRE(strlen(name) > 0, "Name is empty");
graph = bt_component_borrow_graph(component);
- BT_ASSERT_PRE(graph && !bt_graph_is_canceled(graph),
- "Component's graph is canceled: %![comp-]+c, %![graph-]+g",
- component, graph);
BT_ASSERT_PRE(
graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
"Component's graph is already configured: "
graph ? &graph->base : NULL);
}
-bt_bool bt_component_graph_is_canceled(const struct bt_component *component)
-{
- return bt_graph_is_canceled(
- (void *) bt_object_borrow_parent(&component->base));
-}
-
static
struct bt_port *borrow_port_by_name(GPtrArray *ports,
const char *name)
/*
* Make sure that each message iterator which was created for
* this connection is finalized before we destroy it. Once a
- * message iterator is finalized, all its method return NULL or
- * the BT_MESSAGE_ITERATOR_STATUS_CANCELED status.
+ * message iterator is finalized, you cannot use it.
*
* Because connections are destroyed before components within a
* graph, this ensures that message iterators are always
#include "component-sink.h"
#include "connection.h"
#include "graph.h"
+#include "interrupter.h"
#include "message/event.h"
#include "message/packet.h"
*/
BT_LIB_LOGI("Destroying graph: %!+g", graph);
obj->ref_count++;
-
- /*
- * Cancel the graph to disallow some operations, like creating
- * message iterators and adding ports to components.
- */
- (void) bt_graph_cancel((void *) graph);
+ graph->config_state = BT_GRAPH_CONFIGURATION_STATE_DESTROYING;
/* Call all remove listeners */
CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
graph->components = NULL;
}
+ if (graph->interrupters) {
+ BT_LOGD_STR("Putting interrupters.");
+ g_ptr_array_free(graph->interrupters, TRUE);
+ graph->interrupters = NULL;
+ }
+
+ BT_OBJECT_PUT_REF_AND_RESET(graph->default_interrupter);
+
if (graph->sinks_to_consume) {
g_queue_free(graph->sinks_to_consume);
graph->sinks_to_consume = NULL;
goto error;
}
+ graph->interrupters = g_ptr_array_new_with_free_func(
+ (GDestroyNotify) bt_object_put_no_null_check);
+ if (!graph->interrupters) {
+ BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate one GPtrArray.");
+ goto error;
+ }
+
+ graph->default_interrupter = bt_interrupter_create();
+ if (!graph->default_interrupter) {
+ BT_LIB_LOGE_APPEND_CAUSE(
+ "Failed to create one interrupter object.");
+ goto error;
+ }
+
+ bt_graph_add_interrupter(graph, graph->default_interrupter);
ret = bt_object_pool_initialize(&graph->event_msg_pool,
(bt_object_pool_new_object_func) bt_message_event_new,
(bt_object_pool_destroy_object_func) destroy_message_event,
BT_ASSERT_PRE_NON_NULL(graph, "Graph");
BT_ASSERT_PRE_NON_NULL(upstream_port, "Upstream port");
BT_ASSERT_PRE_NON_NULL(downstream_port, "Downstream port port");
- BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
BT_ASSERT_PRE(
graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
"Graph is not in the \"configuring\" state: %!+g", graph);
enum bt_graph_consume_status status;
BT_ASSERT_PRE_DEV_NON_NULL(graph, "Graph");
- BT_ASSERT_PRE_DEV(!graph->canceled, "Graph is canceled: %!+g", graph);
BT_ASSERT_PRE_DEV(graph->can_consume,
"Cannot consume graph in its current state: %!+g", graph);
BT_ASSERT_PRE_DEV(graph->config_state !=
enum bt_graph_run_status status;
BT_ASSERT_PRE_NON_NULL(graph, "Graph");
- BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
BT_ASSERT_PRE(graph->can_consume,
"Cannot consume graph in its current state: %!+g", graph);
BT_ASSERT_PRE(graph->config_state != BT_GRAPH_CONFIGURATION_STATE_FAULTY,
do {
/*
- * Check if the graph is canceled at each iteration. If
- * the graph was canceled by another thread or by a
- * signal handler, this is not a warning nor an error,
- * it was intentional: log with a DEBUG level only.
+ * Check if the graph is interrupted at each iteration.
+ * If the graph was interrupted by another thread or by
+ * a signal handler, this is NOT a warning nor an error;
+ * it was intentional: log with an INFO level only.
*/
- if (G_UNLIKELY(graph->canceled)) {
- BT_LIB_LOGI("Stopping the graph: graph is canceled: "
- "%!+g", graph);
- status = BT_FUNC_STATUS_CANCELED;
+ if (G_UNLIKELY(bt_graph_is_interrupted(graph))) {
+ BT_LIB_LOGI("Stopping the graph: "
+ "graph was interrupted: %!+g", graph);
+ status = BT_FUNC_STATUS_AGAIN;
goto end;
}
return status;
}
-enum bt_graph_cancel_status bt_graph_cancel(struct bt_graph *graph)
-{
- BT_ASSERT_PRE_NON_NULL(graph, "Graph");
- graph->canceled = true;
- BT_LIB_LOGI("Canceled graph: %!+i", graph);
- return BT_FUNC_STATUS_OK;
-}
-
-bt_bool bt_graph_is_canceled(const struct bt_graph *graph)
-{
- BT_ASSERT_PRE_DEV_NON_NULL(graph, "Graph");
- return graph->canceled ? BT_TRUE : BT_FALSE;
-}
-
BT_HIDDEN
void bt_graph_remove_connection(struct bt_graph *graph,
struct bt_connection *connection)
BT_ASSERT(comp_cls);
BT_ASSERT_PRE_NON_NULL(graph, "Graph");
BT_ASSERT_PRE_NON_NULL(name, "Name");
- BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
BT_ASSERT_PRE(
graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
"Graph is not in the \"configuring\" state: %!+g", graph);
g_ptr_array_add(graph->messages, msg);
}
+BT_HIDDEN
+bool bt_graph_is_interrupted(const struct bt_graph *graph)
+{
+ BT_ASSERT(graph);
+ return bt_interrupter_array_any_is_set(graph->interrupters);
+}
+
+enum bt_graph_add_interrupter_status bt_graph_add_interrupter(
+ struct bt_graph *graph, const struct bt_interrupter *intr)
+{
+ BT_ASSERT_PRE_NON_NULL(graph, "Graph");
+ BT_ASSERT_PRE_NON_NULL(intr, "Interrupter");
+ g_ptr_array_add(graph->interrupters, (void *) intr);
+ bt_object_get_no_null_check(intr);
+ BT_LIB_LOGD("Added interrupter to graph: %![graph-]+g, %![intr-]+z",
+ graph, intr);
+ return BT_FUNC_STATUS_OK;
+}
+
+void bt_graph_interrupt(struct bt_graph *graph)
+{
+ BT_ASSERT_PRE_NON_NULL(graph, "Graph");
+ bt_interrupter_set(graph->default_interrupter);
+ BT_LIB_LOGI("Interrupted graph: %!+g", graph);
+}
+
void bt_graph_get_ref(const struct bt_graph *graph)
{
bt_object_get_ref(graph);
BT_GRAPH_CONFIGURATION_STATE_PARTIALLY_CONFIGURED,
BT_GRAPH_CONFIGURATION_STATE_CONFIGURED,
BT_GRAPH_CONFIGURATION_STATE_FAULTY,
+ BT_GRAPH_CONFIGURATION_STATE_DESTROYING,
};
struct bt_graph {
/* Queue of pointers (weak references) to sink bt_components. */
GQueue *sinks_to_consume;
- bool canceled;
+ /*
+ * Array of `struct bt_interrupter *`, each one owned by this.
+ * If any interrupter is set, then this graph is deemed
+ * interrupted.
+ */
+ GPtrArray *interrupters;
+
+ /*
+ * Default interrupter to support bt_graph_interrupt(); owned
+ * by this.
+ */
+ struct bt_interrupter *default_interrupter;
+
bool in_remove_listener;
bool has_sink;
void bt_graph_add_message(struct bt_graph *graph,
struct bt_message *msg);
+BT_HIDDEN
+bool bt_graph_is_interrupted(const struct bt_graph *graph);
+
static inline
const char *bt_graph_configuration_state_string(
enum bt_graph_configuration_state state)
bool is_set;
};
+static inline
+bool bt_interrupter_array_any_is_set(const GPtrArray *interrupters)
+{
+ bool is_set = false;
+ uint64_t i;
+
+ BT_ASSERT(interrupters);
+
+ for (i = 0; i < interrupters->len; i++) {
+ const struct bt_interrupter *intr = interrupters->pdata[i];
+
+ if (intr->is_set) {
+ is_set = true;
+ goto end;
+ }
+ }
+
+end:
+ return is_set;
+}
+
#endif /* BABELTRACE_GRAPH_INTERRUPTER_INTERNAL_H */
"Input port is not connected: %![port-]+p", port);
BT_ASSERT_PRE(comp, "Input port is not part of a component: %![port-]+p",
port);
- BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp),
- "Input port's component's graph is canceled: "
- "%![port-]+p, %![comp-]+c", port, comp);
BT_ASSERT(port->connection);
upstream_port = port->connection->upstream_port;
BT_ASSERT(upstream_port);
upstream_comp = bt_port_borrow_component_inline(upstream_port);
BT_ASSERT(upstream_comp);
BT_ASSERT_PRE(
- bt_component_borrow_graph(upstream_comp)->config_state !=
- BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
+ bt_component_borrow_graph(upstream_comp)->config_state ==
+ BT_GRAPH_CONFIGURATION_STATE_PARTIALLY_CONFIGURED ||
+ bt_component_borrow_graph(upstream_comp)->config_state ==
+ BT_GRAPH_CONFIGURATION_STATE_CONFIGURED,
"Graph is not configured: %!+g",
bt_component_borrow_graph(upstream_comp));
upstream_comp_cls = upstream_comp->class;
graph_status = bt_graph_consume_sink_no_check(iterator->graph,
iterator->colander);
switch (graph_status) {
- case BT_FUNC_STATUS_CANCELED:
case BT_FUNC_STATUS_AGAIN:
case BT_FUNC_STATUS_END:
case BT_FUNC_STATUS_MEMORY_ERROR:
iterator));
}
+bt_bool bt_self_message_iterator_is_interrupted(
+ const struct bt_self_message_iterator *self_msg_iter)
+{
+ const struct bt_self_component_port_input_message_iterator *iterator =
+ (const void *) self_msg_iter;
+
+ BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
+ return (bt_bool) bt_graph_is_interrupted(iterator->graph);
+}
+
void bt_port_output_message_iterator_get_ref(
const struct bt_port_output_message_iterator *iterator)
{
#include "component-class.h"
#include "query-executor.h"
+#include "interrupter.h"
#include "lib/func-status.h"
static
container_of(obj, struct bt_query_executor, base);
BT_LOGD("Destroying query executor: addr=%p", query_exec);
+
+ if (query_exec->interrupters) {
+ BT_LOGD_STR("Putting interrupters.");
+ g_ptr_array_free(query_exec->interrupters, TRUE);
+ query_exec->interrupters = NULL;
+ }
+
+ BT_OBJECT_PUT_REF_AND_RESET(query_exec->default_interrupter);
+
g_free(query_exec);
}
goto end;
}
+ query_exec->interrupters = g_ptr_array_new_with_free_func(
+ (GDestroyNotify) bt_object_put_no_null_check);
+ if (!query_exec->interrupters) {
+ BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate one GPtrArray.");
+ BT_OBJECT_PUT_REF_AND_RESET(query_exec);
+ goto end;
+ }
+
+ query_exec->default_interrupter = bt_interrupter_create();
+ if (!query_exec->default_interrupter) {
+ BT_LIB_LOGE_APPEND_CAUSE(
+ "Failed to create one interrupter object.");
+ BT_OBJECT_PUT_REF_AND_RESET(query_exec);
+ goto end;
+ }
+
+ bt_query_executor_add_interrupter(query_exec,
+ query_exec->default_interrupter);
bt_object_init_shared(&query_exec->base,
bt_query_executor_destroy);
BT_LOGD("Created query executor: addr=%p", query_exec);
BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class");
BT_ASSERT_PRE_NON_NULL(object, "Object");
BT_ASSERT_PRE_NON_NULL(user_result, "Result (output)");
- BT_ASSERT_PRE(!query_exec->canceled, "Query executor is canceled.");
if (!params) {
params = bt_value_null;
BT_ASSERT_POST(query_status != BT_FUNC_STATUS_OK || *user_result,
"User method returned `BT_FUNC_STATUS_OK` without a result.");
status = (int) query_status;
- if (query_exec->canceled) {
- BT_OBJECT_PUT_REF_AND_RESET(*user_result);
- status = BT_FUNC_STATUS_CANCELED;
- goto end;
- }
end:
return status;
}
-enum bt_query_executor_cancel_status bt_query_executor_cancel(
- struct bt_query_executor *query_exec)
+enum bt_query_executor_add_interrupter_status bt_query_executor_add_interrupter(
+ struct bt_query_executor *query_exec,
+ const struct bt_interrupter *intr)
{
BT_ASSERT_PRE_NON_NULL(query_exec, "Query executor");
- query_exec->canceled = BT_TRUE;
- BT_LOGI("Canceled query executor: addr=%p", query_exec);
+ BT_ASSERT_PRE_NON_NULL(intr, "Interrupter");
+ g_ptr_array_add(query_exec->interrupters, (void *) intr);
+ bt_object_get_no_null_check(intr);
+ BT_LIB_LOGD("Added interrupter to query executor: "
+ "query-exec-addr=%p, %![intr-]+z",
+ query_exec, intr);
return BT_FUNC_STATUS_OK;
}
-bt_bool bt_query_executor_is_canceled(const struct bt_query_executor *query_exec)
+bt_bool bt_query_executor_is_interrupted(const struct bt_query_executor *query_exec)
{
- BT_ASSERT_PRE_DEV_NON_NULL(query_exec, "Query executor");
- return query_exec->canceled;
+ BT_ASSERT_PRE_NON_NULL(query_exec, "Query executor");
+ return (bt_bool) bt_interrupter_array_any_is_set(
+ query_exec->interrupters);
+}
+
+void bt_query_executor_interrupt(struct bt_query_executor *query_exec)
+{
+ BT_ASSERT_PRE_NON_NULL(query_exec, "Query executor");
+ bt_interrupter_set(query_exec->default_interrupter);
+ BT_LIB_LOGI("Interrupted query executor: query-exec-addr=%p",
+ query_exec);
}
void bt_query_executor_get_ref(const struct bt_query_executor *query_executor)
* SOFTWARE.
*/
+#include <glib.h>
+
#include <babeltrace2/types.h>
-#include "lib/object.h"
#include <babeltrace2/graph/query-executor.h>
#include <babeltrace2/graph/component-class.h>
+#include "lib/object.h"
+
struct bt_query_executor {
struct bt_object base;
- bool canceled;
+
+ /*
+ * Array of `struct bt_interrupter *`, each one owned by this.
+ * If any interrupter is set, then this query executor is deemed
+ * interrupted.
+ */
+ GPtrArray *interrupters;
+
+ /*
+ * Default interrupter to support bt_query_executor_interrupt();
+ * owned by this.
+ */
+ struct bt_interrupter *default_interrupter;
};
#endif /* BABELTRACE_GRAPH_QUERY_EXECUTOR_INTERNAL_H */
{
char tmp_prefix[TMP_PREFIX_LEN];
- BUF_APPEND(", %sis-canceled=%d, %scan-consume=%d, "
- "%sconfig-state=%s",
- PRFIELD(graph->canceled),
+ BUF_APPEND(", %scan-consume=%d, %sconfig-state=%s",
PRFIELD(graph->can_consume),
PRFIELD(bt_graph_configuration_state_string(graph->config_state)));
const char *key_str = g_quark_to_string(GPOINTER_TO_UINT(key));
if (!func(key_str, element_obj, data)) {
- BT_LOGT("User canceled the loop: key=\"%s\", "
+ BT_LOGT("User interrupted the loop: key=\"%s\", "
"value-addr=%p, data=%p",
key_str, element_obj, data);
- ret = BT_FUNC_STATUS_CANCELED;
+ ret = BT_FUNC_STATUS_INTERRUPTED;
break;
}
}
} while (0);
BT_HIDDEN
-bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live)
+bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
{
- const bt_component *component;
bool ret;
- if (!lttng_live) {
+ if (!msg_iter) {
ret = false;
goto end;
}
- component = bt_self_component_as_component(lttng_live->self_comp);
- ret = bt_component_graph_is_canceled(component);
+ ret = bt_self_message_iterator_is_interrupted(
+ msg_iter->self_msg_iter);
end:
return ret;
static
void lttng_live_destroy_session(struct lttng_live_session *session)
{
- struct lttng_live_component *live_comp;
bt_logging_level log_level;
bt_self_component *self_comp;
BT_COMP_LOGD("Destroy lttng live session");
if (session->id != -1ULL) {
if (lttng_live_detach_session(session)) {
- live_comp = session->lttng_live_msg_iter->lttng_live_comp;
if (session->lttng_live_msg_iter &&
- !lttng_live_graph_is_canceled(live_comp)) {
+ !lttng_live_graph_is_canceled(
+ session->lttng_live_msg_iter)) {
/* Old relayd cannot detach sessions. */
BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64,
session->id);
ret = lttng_live_attach_session(session);
if (ret) {
if (lttng_live_msg_iter && lttng_live_graph_is_canceled(
- lttng_live_msg_iter->lttng_live_comp)) {
+ lttng_live_msg_iter)) {
status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
goto error;
}
- if (lttng_live_graph_is_canceled(lttng_live)) {
- ret = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
- goto error;
- }
-
add_port_status = bt_self_component_source_add_output_port(
self_comp_src, "out", NULL, NULL);
switch (add_port_status) {
struct lttng_live_session *session, uint64_t trace_id);
void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter);
-bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live);
+bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter);
#endif /* BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_H */
{
struct lttng_live_session *session = trace->session;
struct lttng_live_metadata *metadata = trace->metadata;
- struct lttng_live_component *lttng_live =
- session->lttng_live_msg_iter->lttng_live_comp;
ssize_t ret = 0;
size_t size, len_read = 0;
char *metadata_buf = NULL;
metadata->trace = NULL;
}
if (errno == EINTR) {
- if (lttng_live_graph_is_canceled(lttng_live)) {
+ if (lttng_live_graph_is_canceled(
+ session->lttng_live_msg_iter)) {
status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
goto end;
}
}
if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
if (!viewer_connection->in_query &&
- lttng_live_graph_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
+ lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
break;
} else {
continue;
ret = bt_socket_send_nosigpipe(sock, buf, len);
if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
if (!viewer_connection->in_query &&
- lttng_live_graph_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
+ lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
break;
} else {
continue;
struct lttng_live_trace *trace = stream->trace;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
- struct lttng_live_component *lttng_live =
- lttng_live_msg_iter->lttng_live_comp;
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
return retstatus;
error:
- if (lttng_live_graph_is_canceled(lttng_live)) {
+ if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
struct lttng_live_trace *trace = stream->trace;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
- struct lttng_live_component *lttng_live =
- lttng_live_msg_iter->lttng_live_comp;
BT_COMP_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
offset, req_len);
return retstatus;
error:
- if (lttng_live_graph_is_canceled(lttng_live)) {
+ if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
} else {
retstatus = BT_MSG_ITER_MEDIUM_STATUS_ERROR;
session->lttng_live_msg_iter;
struct live_viewer_connection *viewer_connection =
lttng_live_msg_iter->viewer_connection;
- struct lttng_live_component *lttng_live =
- lttng_live_msg_iter->lttng_live_comp;
uint32_t streams_count;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
return status;
error:
- if (lttng_live_graph_is_canceled(lttng_live)) {
+ if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
sink.input_ports['in'], src.output_ports['out']
)
- def test_cancel(self):
- self.assertFalse(self._graph.is_canceled)
- self._graph.cancel()
- self.assertTrue(self._graph.is_canceled)
-
- # Test that Graph.run() raises bt2.Canceled if the graph gets canceled
- # during execution.
- def test_cancel_while_running(self):
+ def test_add_interrupter(self):
+ class MyIter(bt2._UserMessageIterator):
+ def __next__(self):
+ raise TypeError
+
+ class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, params):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params):
+ self._add_input_port('in')
+
+ def _consume(self):
+ next(self._msg_iter)
+
+ def _graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ # add two interrupters, set one of them
+ interrupter1 = bt2.Interrupter()
+ interrupter2 = bt2.Interrupter()
+ self._graph.add_interrupter(interrupter1)
+ src = self._graph.add_component(MySource, 'src')
+ sink = self._graph.add_component(MySink, 'sink')
+ self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
+ self._graph.add_interrupter(interrupter2)
+
+ with self.assertRaises(bt2._Error):
+ self._graph.run()
+
+ interrupter2.set()
+
+ with self.assertRaises(bt2.TryAgain):
+ self._graph.run()
+
+ interrupter2.reset()
+
+ with self.assertRaises(bt2._Error):
+ self._graph.run()
+
+ # Test that Graph.run() raises bt2.Interrupted if the graph gets
+ # interrupted during execution.
+ def test_interrupt_while_running(self):
class MyIter(_MyIter):
def __next__(self):
return self._create_stream_beginning_message(self._stream)
self._add_input_port('in')
def _consume(self):
- # Pretend that somebody asynchronously cancelled the graph.
+ # Pretend that somebody asynchronously interrupted the graph.
nonlocal graph
- graph.cancel()
-
+ graph.interrupt()
return next(self._msg_iter)
def _graph_is_configured(self):
self._input_ports['in']
)
- graph = bt2.Graph()
- up = graph.add_component(MySource, 'down')
- down = graph.add_component(MySink, 'up')
- graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
- with self.assertRaises(bt2.Canceled):
- graph.run()
+ graph = self._graph
+ up = self._graph.add_component(MySource, 'down')
+ down = self._graph.add_component(MySink, 'up')
+ self._graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
+
+ with self.assertRaises(bt2.TryAgain):
+ self._graph.run()
def test_run(self):
class MyIter(_MyIter):
with self.assertRaises(bt2.TryAgain):
res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
- def test_cancel_no_query(self):
+ def test_query_add_interrupter(self):
+ class MySink(bt2._UserSinkComponent):
+ def _consume(self):
+ pass
+
+ def _graph_is_configured(self):
+ pass
+
+ @classmethod
+ def _query(cls, query_exec, obj, params, log_level):
+ nonlocal interrupter2
+ test_self.assertFalse(query_exec.is_interrupted)
+ interrupter2.set()
+ test_self.assertTrue(query_exec.is_interrupted)
+ interrupter2.reset()
+ test_self.assertFalse(query_exec.is_interrupted)
+
+ interrupter1 = bt2.Interrupter()
+ interrupter2 = bt2.Interrupter()
+ test_self = self
query_exec = bt2.QueryExecutor()
- self.assertFalse(query_exec.is_canceled)
- query_exec.cancel()
- self.assertTrue(query_exec.is_canceled)
+ query_exec.add_interrupter(interrupter1)
+ query_exec.add_interrupter(interrupter2)
+ query_exec.query(MySink, 'obj', [17, 23])
- def test_query_canceled(self):
+ def test_query_interrupt(self):
class MySink(bt2._UserSinkComponent):
def _consume(self):
pass
@classmethod
def _query(cls, query_exec, obj, params, log_level):
- raise bt2.TryAgain
+ test_self.assertFalse(query_exec.is_interrupted)
+ query_exec.interrupt()
+ test_self.assertTrue(query_exec.is_interrupted)
+ test_self = self
query_exec = bt2.QueryExecutor()
- query_exec.cancel()
-
- with self.assertRaises(bt2.Canceled):
- res = query_exec.query(MySink, 'obj', [17, 23])
+ query_exec.query(MySink, 'obj', [17, 23])
ret = bt_value_map_foreach_entry(map_obj, test_map_foreach_cb_count,
&count);
- ok(ret == BT_VALUE_MAP_FOREACH_ENTRY_STATUS_CANCELED && count == 3,
+ ok(ret == BT_VALUE_MAP_FOREACH_ENTRY_STATUS_INTERRUPTED && count == 3,
"bt_value_map_foreach_entry() breaks the loop when the user function returns BT_FALSE");
memset(&checklist, 0, sizeof(checklist));