4 * Babeltrace Plugin Component Graph
6 * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
29 #include <babeltrace/component/component-internal.h>
30 #include <babeltrace/component/graph-internal.h>
31 #include <babeltrace/component/connection-internal.h>
32 #include <babeltrace/component/component-sink-internal.h>
33 #include <babeltrace/component/component-source.h>
34 #include <babeltrace/component/component-filter.h>
35 #include <babeltrace/component/port.h>
36 #include <babeltrace/compiler.h>
/*
 * bt_graph_destroy: callback that bt_graph_create() passes to
 * bt_object_init() for every graph it builds.
 *
 * obj is the base member embedded in a struct bt_graph; container_of()
 * recovers the enclosing graph. The components array, the connections
 * array and the sinks_to_consume queue are then freed, each one only
 * when its pointer is non-NULL (presumably so that a partially built
 * graph can be handed to this function as well).
 *
 * NOTE(review): this listing lost lines during extraction (brace-only
 * lines and some statements). Whether the graph structure itself is
 * freed after its containers is not visible here -- confirm against the
 * pristine file.
 */
40 void bt_graph_destroy(struct bt_object
*obj
)
42 struct bt_graph
*graph
= container_of(obj
,
43 struct bt_graph
, base
);
/*
 * Both arrays are created with bt_object_release as element free
 * function, so freeing them with TRUE also hands every stored element to
 * that function.
 */
45 if (graph
->components
) {
46 g_ptr_array_free(graph
->components
, TRUE
);
48 if (graph
->connections
) {
49 g_ptr_array_free(graph
->connections
, TRUE
);
/* g_queue_free() releases the queue and its links, not the queued sinks. */
51 if (graph
->sinks_to_consume
) {
52 g_queue_free(graph
->sinks_to_consume
);
/*
 * bt_graph_create: allocate and initialise an empty graph.
 *
 * The structure is zero-allocated with g_new0(), its base object is
 * initialised with bt_graph_destroy as callback, and three containers
 * are created in this order:
 *   - connections:      pointer array, elements freed by bt_object_release;
 *   - components:       pointer array, elements freed by bt_object_release;
 *   - sinks_to_consume: plain GQueue.
 * Each container pointer is tested for NULL right after its creation.
 *
 * NOTE(review): this listing lost lines during extraction. The bodies of
 * the NULL checks, any check on the g_new0() result, the cleanup path
 * and the return statement are not visible -- confirm against the
 * pristine file.
 */
57 struct bt_graph
*bt_graph_create(void)
59 struct bt_graph
*graph
;
61 graph
= g_new0(struct bt_graph
, 1);
/* Hooks bt_graph_destroy onto the base object of the new graph. */
66 bt_object_init(graph
, bt_graph_destroy
);
68 graph
->connections
= g_ptr_array_new_with_free_func(bt_object_release
);
69 if (!graph
->connections
) {
72 graph
->components
= g_ptr_array_new_with_free_func(bt_object_release
);
73 if (!graph
->components
) {
76 graph
->sinks_to_consume
= g_queue_new();
77 if (!graph
->sinks_to_consume
) {
/*
 * bt_graph_connect: connect upstream_port (expected to be an output
 * port) to downstream_port (expected to be an input port) inside graph.
 *
 * Visible steps, in order:
 *   1. test the three arguments for NULL and the two ports for the
 *      expected direction;
 *   2. look up any existing connection on each port; the stderr
 *      messages indicate that an already connected port is rejected;
 *   3. fetch the component owning each port, then the graph (if any)
 *      each component already belongs to; a component attached to a
 *      different graph is reported on stderr;
 *   4. create the connection with bt_connection_create() and append it,
 *      plus every component not yet in this graph, to the graph arrays;
 *      a newly added sink is also pushed on sinks_to_consume;
 *   5. call bt_component_accept_port_connection() on the upstream and
 *      then the downstream component, checking each status;
 *   6. drop the local references obtained in steps 2 and 3.
 * The tail of the function undoes step 4; it is presumably reached when
 * a component refuses the connection.
 *
 * NOTE(review): this listing lost lines during extraction (branch
 * bodies, jumps, labels, the return statement, the declaration of the
 * loop index i and the remaining arguments of bt_connection_create), so
 * the exact error paths and the returned value are not visible here --
 * confirm against the pristine file.
 */
87 struct bt_connection
*bt_graph_connect(struct bt_graph
*graph
,
88 struct bt_port
*upstream_port
,
89 struct bt_port
*downstream_port
)
91 struct bt_connection
*connection
= NULL
;
92 struct bt_graph
*upstream_graph
= NULL
;
93 struct bt_graph
*downstream_graph
= NULL
;
94 struct bt_component
*upstream_component
= NULL
;
95 struct bt_component
*downstream_component
= NULL
;
96 struct bt_connection
*existing_conn
= NULL
;
97 enum bt_component_status component_status
;
98 bool upstream_was_already_in_graph
;
99 bool downstream_was_already_in_graph
;
/* Number of components appended by this call; used by the rollback. */
100 int components_to_remove
= 0;
103 if (!graph
|| !upstream_port
|| !downstream_port
) {
107 /* Ensure appropriate types for upstream and downstream ports. */
108 if (bt_port_get_type(upstream_port
) != BT_PORT_TYPE_OUTPUT
) {
111 if (bt_port_get_type(downstream_port
) != BT_PORT_TYPE_INPUT
) {
115 /* Ensure that both ports are currently unconnected. */
/*
 * The reference returned by the lookup is dropped immediately; only the
 * pointer value appears to be needed afterwards.
 */
116 existing_conn
= bt_port_get_connection(upstream_port
);
117 bt_put(existing_conn
);
119 fprintf(stderr
, "Upstream port is already connected\n");
123 existing_conn
= bt_port_get_connection(downstream_port
);
124 bt_put(existing_conn
);
126 fprintf(stderr
, "Downstream port is already connected\n");
131 * Ensure that both ports are still attached to their creating
134 upstream_component
= bt_port_get_component(upstream_port
);
135 if (!upstream_component
) {
136 fprintf(stderr
, "Upstream port does not belong to a component\n");
140 downstream_component
= bt_port_get_component(downstream_port
);
141 if (!downstream_component
) {
142 fprintf(stderr
, "Downstream port does not belong to a component\n");
146 /* Ensure the components are not already part of another graph. */
147 upstream_graph
= bt_component_get_graph(upstream_component
);
148 if (upstream_graph
&& (graph
!= upstream_graph
)) {
149 fprintf(stderr
, "Upstream component is already part of another graph\n");
/* Remember membership now so that the rollback only removes new entries. */
152 upstream_was_already_in_graph
= (graph
== upstream_graph
);
153 downstream_graph
= bt_component_get_graph(downstream_component
);
154 if (downstream_graph
&& (graph
!= downstream_graph
)) {
155 fprintf(stderr
, "Downstream component is already part of another graph\n");
158 downstream_was_already_in_graph
= (graph
== downstream_graph
);
160 connection
= bt_connection_create(graph
, upstream_port
,
167 * Ownership of upstream_component/downstream_component and of
168 * the connection object is transferred to the graph.
170 g_ptr_array_add(graph
->connections
, connection
);
/* The upstream component is appended first, then the downstream one. */
172 if (!upstream_was_already_in_graph
) {
173 g_ptr_array_add(graph
->components
, upstream_component
);
174 bt_component_set_graph(upstream_component
, graph
);
176 if (!downstream_was_already_in_graph
) {
177 g_ptr_array_add(graph
->components
, downstream_component
);
178 bt_component_set_graph(downstream_component
, graph
);
/* Only sinks are queued for consumption, at the tail of the queue. */
179 if (bt_component_get_class_type(downstream_component
) ==
180 BT_COMPONENT_CLASS_TYPE_SINK
) {
181 g_queue_push_tail(graph
->sinks_to_consume
,
182 downstream_component
);
187 * The graph is now the parent of these components which garantees their
188 * existence for the duration of the graph's lifetime.
192 * The components and connection are added to the graph before
193 * invoking the `accept_port_connection` method in order to make
194 * them visible to the components during the method's
197 component_status
= bt_component_accept_port_connection(
198 upstream_component
, upstream_port
);
199 if (component_status
!= BT_COMPONENT_STATUS_OK
) {
202 component_status
= bt_component_accept_port_connection(
203 downstream_component
, downstream_port
);
204 if (component_status
!= BT_COMPONENT_STATUS_OK
) {
/* Release the references obtained by the lookups above. */
208 bt_put(upstream_graph
);
209 bt_put(downstream_graph
);
210 bt_put(upstream_component
);
211 bt_put(downstream_component
);
/*
 * Rollback section. Everything below undoes the insertions in reverse
 * order: sink queue entry, connection, then components.
 */
215 * Remove newly-added components from the graph, being careful
216 * not to remove a component that was already present in the graph
217 * and is connected to other components.
219 components_to_remove
+= upstream_was_already_in_graph
? 0 : 1;
220 components_to_remove
+= downstream_was_already_in_graph
? 0 : 1;
/* The sink was pushed at the tail above, so popping the tail removes it. */
222 if (!downstream_was_already_in_graph
) {
223 if (bt_component_get_class_type(downstream_component
) ==
224 BT_COMPONENT_CLASS_TYPE_SINK
) {
225 g_queue_pop_tail(graph
->sinks_to_consume
);
228 /* Remove newly created connection. */
229 g_ptr_array_set_size(graph
->connections
,
230 graph
->connections
->len
- 1);
233 * Remove newly added components.
235 * Note that this is a tricky situation. The graph, being the parent
236 * of the components, does not hold a reference to them. Normally,
237 * components are destroyed right away when the graph is released since
238 * the graph, being their parent, bounds their lifetime
239 * (see doc/ref-counting.md).
241 * In this particular case, we must take a number of steps:
242 * 1) unset the components' parent to rollback the initial state of
243 * the components being connected.
244 * Note that the reference taken by the component on its graph is
245 * released by the set_parent call.
246 * 2) set the pointer in the components array to NULL so that the
247 * destruction function called on the array's resize in invoked on
250 * NOTE: Point #1 assumes that *something* holds a reference to both
251 * components being connected. The fact that a reference is being
252 * held to a component means that it must hold a reference to its
253 * parent to prevent the parent from being destroyed (again, refer
254 * to doc/red-counting.md). This reference to a component is
255 * most likely being held *transitively* by the caller which holds
256 * a reference to both ports (a port has its component as a
259 * This assumes that a graph is not connecting components by
260 * itself while not holding a reference to the ports/components
261 * being connected (i.e. "cheating" by using internal APIs).
/*
 * NOTE(review): the original commentary above contains typos
 * (garantees; red-counting.md where ref-counting.md is meant; in invoked
 * where is invoked is meant). They are left untouched here because the
 * comment delimiters were lost in this listing.
 *
 * Each iteration detaches the last array entry from the graph, clears
 * its slot and only then shrinks the array by one, so the free function
 * of the array sees NULL instead of the component.
 */
263 for (i
= 0; i
< components_to_remove
; i
++) {
264 struct bt_component
*component
= g_ptr_array_index(
265 graph
->components
, graph
->components
->len
- 1);
267 bt_component_set_graph(component
, NULL
);
268 g_ptr_array_index(graph
->components
,
269 graph
->components
->len
- 1) = NULL
;
270 g_ptr_array_set_size(graph
->components
,
271 graph
->components
->len
- 1);
273 /* NOTE: Resizing the ptr_arrays invokes the destruction of the elements. */
276 BT_PUT(upstream_component
);
277 BT_PUT(downstream_component
);
/*
 * get_component_port_counts: query how many input and output ports
 * component exposes, dispatching on its class type:
 *   - source: output port count only;
 *   - filter: output port count, then input port count;
 *   - sink:   input port count only.
 * The status returned by every getter is compared against
 * BT_COMPONENT_STATUS_OK right after the call.
 *
 * NOTE(review): the continuation line holding the second argument of
 * each getter was lost during extraction, so which of input_count /
 * output_count receives each value is presumed from the getter names --
 * confirm. The failure branches, the handling of other class types and
 * the return statement are likewise not visible.
 */
282 enum bt_component_status
get_component_port_counts(
283 struct bt_component
*component
, uint64_t *input_count
,
284 uint64_t *output_count
)
286 enum bt_component_status ret
;
288 switch (bt_component_get_class_type(component
)) {
289 case BT_COMPONENT_CLASS_TYPE_SOURCE
:
290 ret
= bt_component_source_get_output_port_count(component
,
292 if (ret
!= BT_COMPONENT_STATUS_OK
) {
/* A filter has both directions; outputs are queried before inputs. */
296 case BT_COMPONENT_CLASS_TYPE_FILTER
:
297 ret
= bt_component_filter_get_output_port_count(component
,
299 if (ret
!= BT_COMPONENT_STATUS_OK
) {
302 ret
= bt_component_filter_get_input_port_count(component
,
304 if (ret
!= BT_COMPONENT_STATUS_OK
) {
308 case BT_COMPONENT_CLASS_TYPE_SINK
:
309 ret
= bt_component_sink_get_input_port_count(component
,
311 if (ret
!= BT_COMPONENT_STATUS_OK
) {
/* Success value; where this sits relative to the switch is not visible. */
319 ret
= BT_COMPONENT_STATUS_OK
;
/*
 * bt_graph_add_component_as_sibling: add new_component to graph by
 * replicating, port by port, the connections that origin already has.
 *
 * Visible preconditions (each failure sets a status):
 *   - no NULL argument                              -> INVALID;
 *   - origin and new_component share a class type   -> INVALID;
 *   - origin belongs to graph                       -> INVALID;
 *   - a check involving the graph of new_component  -> ALREADY_IN_A_GRAPH
 *     (the condition itself is not visible);
 *   - port counts can be read for both components   -> INVALID;
 *   - input and output port counts are identical    -> INVALID.
 *
 * Then, for every input port index, the upstream port feeding the port
 * of origin is connected with bt_graph_connect(); for every output port
 * index, the port of new_component is connected towards the downstream
 * port fed by origin. Ports of origin without a connection are skipped.
 *
 * NOTE(review): this listing lost lines during extraction: the
 * declaration of port_index, the index argument of the port getters,
 * the NULL checks on origin_port / new_port, the remaining arguments of
 * bt_graph_connect(), every label and jump target, and the return
 * statement. The error_disconnect target is referenced but its body is
 * not visible; only a comment announcing the removal of the new
 * connections remains at the end -- confirm against the pristine file.
 */
324 enum bt_graph_status
bt_graph_add_component_as_sibling(struct bt_graph
*graph
,
325 struct bt_component
*origin
,
326 struct bt_component
*new_component
)
328 uint64_t origin_input_port_count
= 0;
329 uint64_t origin_output_port_count
= 0;
330 uint64_t new_input_port_count
= 0;
331 uint64_t new_output_port_count
= 0;
332 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
333 struct bt_graph
*origin_graph
= NULL
;
334 struct bt_graph
*new_graph
= NULL
;
335 struct bt_port
*origin_port
= NULL
;
336 struct bt_port
*new_port
= NULL
;
337 struct bt_port
*upstream_port
= NULL
;
338 struct bt_port
*downstream_port
= NULL
;
339 struct bt_connection
*origin_connection
= NULL
;
340 struct bt_connection
*new_connection
= NULL
;
343 if (!graph
|| !origin
|| !new_component
) {
344 status
= BT_GRAPH_STATUS_INVALID
;
/* Siblings must be of the same kind (source, filter or sink). */
348 if (bt_component_get_class_type(origin
) !=
349 bt_component_get_class_type(new_component
)) {
350 status
= BT_GRAPH_STATUS_INVALID
;
/* origin has to be a member of the graph being modified. */
354 origin_graph
= bt_component_get_graph(origin
);
355 if (!origin_graph
|| (origin_graph
!= graph
)) {
356 status
= BT_GRAPH_STATUS_INVALID
;
360 new_graph
= bt_component_get_graph(new_component
);
362 status
= BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH
;
366 if (get_component_port_counts(origin
, &origin_input_port_count
,
367 &origin_output_port_count
) != BT_COMPONENT_STATUS_OK
) {
368 status
= BT_GRAPH_STATUS_INVALID
;
371 if (get_component_port_counts(new_component
, &new_input_port_count
,
372 &new_output_port_count
) != BT_COMPONENT_STATUS_OK
) {
373 status
= BT_GRAPH_STATUS_INVALID
;
/* Port i of origin is paired with port i of new_component below. */
377 if (origin_input_port_count
!= new_input_port_count
||
378 origin_output_port_count
!= new_output_port_count
) {
379 status
= BT_GRAPH_STATUS_INVALID
;
383 /* Replicate input connections. */
384 for (port_index
= 0; port_index
< origin_input_port_count
; port_index
++) {
385 origin_port
= bt_component_get_input_port_at_index(origin
,
388 status
= BT_GRAPH_STATUS_ERROR
;
389 goto error_disconnect
;
392 new_port
= bt_component_get_input_port_at_index(new_component
,
395 status
= BT_GRAPH_STATUS_ERROR
;
396 goto error_disconnect
;
399 origin_connection
= bt_port_get_connection(origin_port
);
400 if (origin_connection
) {
401 upstream_port
= bt_connection_get_upstream_port(
/*
 * NOTE(review): the embedded numbering shows the jump directly after
 * this check (403 -> 404) and after the next one (409 -> 410), with no
 * status assignment in between, unlike the port lookups above. status
 * looks like it is still BT_GRAPH_STATUS_OK on these paths -- confirm.
 */
403 if (!upstream_port
) {
404 goto error_disconnect
;
407 new_connection
= bt_graph_connect(graph
, upstream_port
,
409 if (!new_connection
) {
410 goto error_disconnect
;
/* Per-iteration references are dropped before the next port pair. */
414 BT_PUT(upstream_port
);
415 BT_PUT(origin_connection
);
416 BT_PUT(new_connection
);
421 /* Replicate output connections. */
422 for (port_index
= 0; port_index
< origin_output_port_count
; port_index
++) {
423 origin_port
= bt_component_get_output_port_at_index(origin
,
426 status
= BT_GRAPH_STATUS_ERROR
;
427 goto error_disconnect
;
429 new_port
= bt_component_get_output_port_at_index(new_component
,
432 status
= BT_GRAPH_STATUS_ERROR
;
433 goto error_disconnect
;
436 origin_connection
= bt_port_get_connection(origin_port
);
437 if (origin_connection
) {
438 downstream_port
= bt_connection_get_downstream_port(
/*
 * NOTE(review): same pattern as in the input loop: the jumps at 441 and
 * 447 directly follow their checks (440 -> 441, 446 -> 447) with no
 * visible status assignment -- confirm.
 */
440 if (!downstream_port
) {
441 goto error_disconnect
;
444 new_connection
= bt_graph_connect(graph
, new_port
,
446 if (!new_connection
) {
447 goto error_disconnect
;
451 BT_PUT(downstream_port
);
452 BT_PUT(origin_connection
);
453 BT_PUT(new_connection
);
/*
 * Common cleanup of the local references.
 * NOTE(review): no release of new_graph, origin_port or new_port is
 * visible in this listing (embedded lines 459-461 are missing) --
 * confirm that they are released as well.
 */
458 bt_put(origin_graph
);
462 bt_put(upstream_port
);
463 bt_put(downstream_port
);
464 bt_put(origin_connection
);
465 bt_put(new_connection
);
468 /* Destroy all connections of the new component. */
/*
 * bt_graph_consume: let the sink at the head of sinks_to_consume
 * consume once.
 *
 * An empty queue yields BT_GRAPH_STATUS_END. Otherwise the head link is
 * taken off the queue, bt_component_sink_consume() is called on the sink
 * it stores, and the component status is translated:
 *   BT_COMPONENT_STATUS_END     -> BT_GRAPH_STATUS_END
 *   BT_COMPONENT_STATUS_AGAIN   -> BT_GRAPH_STATUS_AGAIN
 *   BT_COMPONENT_STATUS_INVALID -> BT_GRAPH_STATUS_INVALID
 *   BT_GRAPH_STATUS_ERROR is assigned on a branch whose label is not
 *   visible (presumably the default one).
 * No assignment is visible for BT_COMPONENT_STATUS_OK; status starts as
 * BT_GRAPH_STATUS_OK.
 *
 * A sink that did not end is put back at the tail of the queue, which
 * rotates the sinks. A sink that ended is dropped, and END is replaced
 * by OK while other sinks remain queued.
 *
 * NOTE(review): this listing lost lines during extraction: the
 * declaration of current_node, the condition guarding the first INVALID
 * assignment, the break / jump statements and the return statement --
 * confirm against the pristine file.
 */
473 enum bt_graph_status
bt_graph_consume(struct bt_graph
*graph
)
475 struct bt_component
*sink
;
476 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
477 enum bt_component_status comp_status
;
481 status
= BT_GRAPH_STATUS_INVALID
;
485 if (g_queue_is_empty(graph
->sinks_to_consume
)) {
486 status
= BT_GRAPH_STATUS_END
;
/* The link is unlinked from the queue here, not merely peeked at. */
490 current_node
= g_queue_pop_head_link(graph
->sinks_to_consume
);
491 sink
= current_node
->data
;
492 comp_status
= bt_component_sink_consume(sink
);
493 switch (comp_status
) {
494 case BT_COMPONENT_STATUS_OK
:
496 case BT_COMPONENT_STATUS_END
:
497 status
= BT_GRAPH_STATUS_END
;
499 case BT_COMPONENT_STATUS_AGAIN
:
500 status
= BT_GRAPH_STATUS_AGAIN
;
502 case BT_COMPONENT_STATUS_INVALID
:
503 status
= BT_GRAPH_STATUS_INVALID
;
506 status
= BT_GRAPH_STATUS_ERROR
;
/* Anything but END: re-queue the same link at the tail. */
510 if (status
!= BT_GRAPH_STATUS_END
) {
511 g_queue_push_tail_link(graph
->sinks_to_consume
, current_node
);
515 /* End reached, the node is not added back to the queue and free'd. */
/*
 * NOTE(review): current_node was already unlinked by
 * g_queue_pop_head_link() and no re-insertion is visible on this path,
 * while GLib documents that the link given to g_queue_delete_link() must
 * be part of the queue. This presumably skews the length field of the
 * queue; freeing the detached link alone (g_list_free_1) looks like what
 * is wanted -- confirm.
 */
516 g_queue_delete_link(graph
->sinks_to_consume
, current_node
);
518 /* Don't forward an END status if there are sinks left to consume. */
519 if (!g_queue_is_empty(graph
->sinks_to_consume
)) {
520 status
= BT_GRAPH_STATUS_OK
;
/*
 * bt_graph_run: keep calling bt_graph_consume() for as long as it
 * reports BT_GRAPH_STATUS_OK.
 *
 * BT_GRAPH_STATUS_AGAIN is turned into OK, so the loop moves on to the
 * next sink, when more than one sink is queued; with a single sink left
 * AGAIN ends the loop and is left for the caller to handle. Once the
 * loop is over, an empty sink queue sets the status to
 * BT_GRAPH_STATUS_END.
 *
 * NOTE(review): this listing lost lines during extraction: the condition
 * guarding the INVALID assignment, the opening of the do/while loop, any
 * label and the return statement -- confirm against the pristine file.
 */
527 enum bt_graph_status
bt_graph_run(struct bt_graph
*graph
)
529 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
532 status
= BT_GRAPH_STATUS_INVALID
;
537 status
= bt_graph_consume(graph
);
538 if (status
== BT_GRAPH_STATUS_AGAIN
) {
540 * If AGAIN is received and there are multiple sinks,
541 * go ahead and consume from the next sink.
543 * However, in the case where a single sink is left,
544 * the caller can decide to busy-wait and call
545 * bt_graph_run continuously until the source is ready
546 * or it can decide to sleep for an arbitrary amount of
/* Reads the length field of the queue directly; it must be accurate. */
549 if (graph
->sinks_to_consume
->length
> 1) {
550 status
= BT_GRAPH_STATUS_OK
;
553 } while (status
== BT_GRAPH_STATUS_OK
);
/* Emptiness is tested on the queue itself, not on the length field. */
555 if (g_queue_is_empty(graph
->sinks_to_consume
)) {
556 status
= BT_GRAPH_STATUS_END
;