4 * Babeltrace Plugin Component Graph
6 * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29 #include <babeltrace/component/component-internal.h>
30 #include <babeltrace/component/graph-internal.h>
31 #include <babeltrace/component/connection-internal.h>
32 #include <babeltrace/component/component-sink-internal.h>
33 #include <babeltrace/component/component-source.h>
34 #include <babeltrace/component/component-filter.h>
35 #include <babeltrace/component/port.h>
36 #include <babeltrace/compiler.h>
40 void bt_graph_destroy(struct bt_object
*obj
)
42 struct bt_graph
*graph
= container_of(obj
,
43 struct bt_graph
, base
);
45 if (graph
->components
) {
46 g_ptr_array_free(graph
->components
, TRUE
);
48 if (graph
->connections
) {
49 g_ptr_array_free(graph
->connections
, TRUE
);
51 if (graph
->sinks_to_consume
) {
52 g_queue_free(graph
->sinks_to_consume
);
57 struct bt_graph
*bt_graph_create(void)
59 struct bt_graph
*graph
;
61 graph
= g_new0(struct bt_graph
, 1);
66 bt_object_init(graph
, bt_graph_destroy
);
68 graph
->connections
= g_ptr_array_new_with_free_func(bt_object_release
);
69 if (!graph
->connections
) {
72 graph
->components
= g_ptr_array_new_with_free_func(bt_object_release
);
73 if (!graph
->components
) {
76 graph
->sinks_to_consume
= g_queue_new();
77 if (!graph
->sinks_to_consume
) {
87 struct bt_connection
*bt_graph_connect(struct bt_graph
*graph
,
88 struct bt_port
*upstream_port
,
89 struct bt_port
*downstream_port
)
91 struct bt_connection
*connection
= NULL
;
92 struct bt_graph
*upstream_graph
= NULL
;
93 struct bt_graph
*downstream_graph
= NULL
;
94 struct bt_component
*upstream_component
= NULL
;
95 struct bt_component
*downstream_component
= NULL
;
96 enum bt_component_status component_status
;
97 bool upstream_was_already_in_graph
;
98 bool downstream_was_already_in_graph
;
100 if (!graph
|| !upstream_port
|| !downstream_port
) {
104 if (bt_port_get_type(upstream_port
) != BT_PORT_TYPE_OUTPUT
) {
107 if (bt_port_get_type(downstream_port
) != BT_PORT_TYPE_INPUT
) {
111 /* Ensure the components are not already part of another graph. */
112 upstream_component
= bt_port_get_component(upstream_port
);
113 assert(upstream_component
);
114 upstream_graph
= bt_component_get_graph(upstream_component
);
115 if (upstream_graph
&& (graph
!= upstream_graph
)) {
116 fprintf(stderr
, "Upstream component is already part of another graph\n");
119 upstream_was_already_in_graph
= (graph
== upstream_graph
);
121 downstream_component
= bt_port_get_component(downstream_port
);
122 assert(downstream_component
);
123 downstream_graph
= bt_component_get_graph(downstream_component
);
124 if (downstream_graph
&& (graph
!= downstream_graph
)) {
125 fprintf(stderr
, "Downstream component is already part of another graph\n");
128 downstream_was_already_in_graph
= (graph
== downstream_graph
);
130 connection
= bt_connection_create(graph
, upstream_port
,
137 * Ownership of up/downstream_component and of the connection object is
138 * transferred to the graph.
140 g_ptr_array_add(graph
->connections
, connection
);
142 if (!upstream_was_already_in_graph
) {
143 g_ptr_array_add(graph
->components
, upstream_component
);
144 bt_component_set_graph(upstream_component
, graph
);
146 if (!downstream_was_already_in_graph
) {
147 g_ptr_array_add(graph
->components
, downstream_component
);
148 bt_component_set_graph(downstream_component
, graph
);
149 if (bt_component_get_class_type(downstream_component
) ==
150 BT_COMPONENT_CLASS_TYPE_SINK
) {
151 g_queue_push_tail(graph
->sinks_to_consume
,
152 downstream_component
);
157 * The graph is now the parent of these components which garantees their
158 * existence for the duration of the graph's lifetime.
162 * The components and connection are added to the graph before invoking
163 * the new_connection method in order to make them visible to the
164 * components during the method's invocation.
166 component_status
= bt_component_new_connection(upstream_component
,
167 upstream_port
, connection
);
168 if (component_status
!= BT_COMPONENT_STATUS_OK
) {
171 component_status
= bt_component_new_connection(downstream_component
,
172 downstream_port
, connection
);
173 if (component_status
!= BT_COMPONENT_STATUS_OK
) {
177 bt_put(upstream_graph
);
178 bt_put(downstream_graph
);
181 if (components_added
) {
182 if (bt_component_get_class_type(downstream_component
) ==
183 BT_COMPONENT_CLASS_TYPE_SINK
) {
184 g_queue_pop_tail(graph
->sinks_to_consume
);
186 g_ptr_array_set_size(graph
->connections
,
187 graph
->connections
->len
- 1);
188 g_ptr_array_set_size(graph
->components
,
189 graph
->components
->len
- 2);
195 enum bt_component_status
get_component_port_counts(
196 struct bt_component
*component
, uint64_t *input_count
,
197 uint64_t *output_count
)
199 enum bt_component_status ret
;
201 switch (bt_component_get_class_type(component
)) {
202 case BT_COMPONENT_CLASS_TYPE_SOURCE
:
203 ret
= bt_component_source_get_output_port_count(component
,
205 if (ret
!= BT_COMPONENT_STATUS_OK
) {
209 case BT_COMPONENT_CLASS_TYPE_FILTER
:
210 ret
= bt_component_filter_get_output_port_count(component
,
212 if (ret
!= BT_COMPONENT_STATUS_OK
) {
215 ret
= bt_component_filter_get_input_port_count(component
,
217 if (ret
!= BT_COMPONENT_STATUS_OK
) {
221 case BT_COMPONENT_CLASS_TYPE_SINK
:
222 ret
= bt_component_sink_get_input_port_count(component
,
224 if (ret
!= BT_COMPONENT_STATUS_OK
) {
232 ret
= BT_COMPONENT_STATUS_OK
;
238 struct bt_port
*get_input_port(struct bt_component
*component
, int index
)
240 struct bt_port
*port
= NULL
;
242 switch (bt_component_get_class_type(component
)) {
243 case BT_COMPONENT_CLASS_TYPE_FILTER
:
244 port
= bt_component_filter_get_input_port_at_index(component
,
247 case BT_COMPONENT_CLASS_TYPE_SINK
:
248 port
= bt_component_sink_get_input_port_at_index(component
,
258 struct bt_port
*get_output_port(struct bt_component
*component
, int index
)
260 struct bt_port
*port
= NULL
;
262 switch (bt_component_get_class_type(component
)) {
263 case BT_COMPONENT_CLASS_TYPE_SOURCE
:
264 port
= bt_component_source_get_output_port_at_index(component
,
267 case BT_COMPONENT_CLASS_TYPE_FILTER
:
268 port
= bt_component_filter_get_output_port_at_index(component
,
277 enum bt_graph_status
bt_graph_add_component_as_sibling(struct bt_graph
*graph
,
278 struct bt_component
*origin
,
279 struct bt_component
*new_component
)
281 uint64_t origin_input_port_count
= 0;
282 uint64_t origin_output_port_count
= 0;
283 uint64_t new_input_port_count
= 0;
284 uint64_t new_output_port_count
= 0;
285 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
286 struct bt_graph
*origin_graph
= NULL
;
287 struct bt_graph
*new_graph
= NULL
;
288 struct bt_port
*origin_port
= NULL
;
289 struct bt_port
*new_port
= NULL
;
290 struct bt_port
*upstream_port
= NULL
;
291 struct bt_port
*downstream_port
= NULL
;
292 struct bt_connection
*origin_connection
= NULL
;
293 struct bt_connection
*new_connection
= NULL
;
296 if (!graph
|| !origin
|| !new_component
) {
297 status
= BT_GRAPH_STATUS_INVALID
;
301 if (bt_component_get_class_type(origin
) !=
302 bt_component_get_class_type(new_component
)) {
303 status
= BT_GRAPH_STATUS_INVALID
;
307 origin_graph
= bt_component_get_graph(origin
);
308 if (!origin_graph
|| (origin_graph
!= graph
)) {
309 status
= BT_GRAPH_STATUS_INVALID
;
313 new_graph
= bt_component_get_graph(new_component
);
315 status
= BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH
;
319 if (get_component_port_counts(origin
, &origin_input_port_count
,
320 &origin_output_port_count
) != BT_COMPONENT_STATUS_OK
) {
321 status
= BT_GRAPH_STATUS_INVALID
;
324 if (get_component_port_counts(new_component
, &new_input_port_count
,
325 &new_output_port_count
) != BT_COMPONENT_STATUS_OK
) {
326 status
= BT_GRAPH_STATUS_INVALID
;
330 if (origin_input_port_count
!= new_input_port_count
||
331 origin_output_port_count
!= new_output_port_count
) {
332 status
= BT_GRAPH_STATUS_INVALID
;
336 /* Replicate input connections. */
337 for (port_index
= 0; port_index
< origin_input_port_count
; port_index
++) {
338 uint64_t connection_count
, connection_index
;
340 origin_port
= get_input_port(origin
, port_index
);
342 status
= BT_GRAPH_STATUS_ERROR
;
343 goto error_disconnect
;
345 new_port
= get_input_port(new_component
, port_index
);
347 status
= BT_GRAPH_STATUS_ERROR
;
348 goto error_disconnect
;
351 if (bt_port_get_connection_count(origin_port
, &connection_count
) !=
353 status
= BT_GRAPH_STATUS_ERROR
;
354 goto error_disconnect
;
357 for (connection_index
= 0; connection_index
< connection_count
;
358 connection_index
++) {
359 origin_connection
= bt_port_get_connection(origin_port
,
361 if (!origin_connection
) {
362 goto error_disconnect
;
365 upstream_port
= bt_connection_get_output_port(
367 if (!upstream_port
) {
368 goto error_disconnect
;
371 new_connection
= bt_graph_connect(graph
, upstream_port
,
373 if (!new_connection
) {
374 goto error_disconnect
;
377 BT_PUT(upstream_port
);
378 BT_PUT(origin_connection
);
379 BT_PUT(new_connection
);
385 /* Replicate output connections. */
386 for (port_index
= 0; port_index
< origin_output_port_count
; port_index
++) {
387 uint64_t connection_count
, connection_index
;
389 origin_port
= get_output_port(origin
, port_index
);
391 status
= BT_GRAPH_STATUS_ERROR
;
392 goto error_disconnect
;
394 new_port
= get_output_port(new_component
, port_index
);
396 status
= BT_GRAPH_STATUS_ERROR
;
397 goto error_disconnect
;
400 if (bt_port_get_connection_count(origin_port
, &connection_count
) !=
402 status
= BT_GRAPH_STATUS_ERROR
;
403 goto error_disconnect
;
406 for (connection_index
= 0; connection_index
< connection_count
;
407 connection_index
++) {
408 origin_connection
= bt_port_get_connection(origin_port
,
410 if (!origin_connection
) {
411 goto error_disconnect
;
414 downstream_port
= bt_connection_get_input_port(
416 if (!downstream_port
) {
417 goto error_disconnect
;
420 new_connection
= bt_graph_connect(graph
, new_port
,
422 if (!new_connection
) {
423 goto error_disconnect
;
426 BT_PUT(downstream_port
);
427 BT_PUT(origin_connection
);
428 BT_PUT(new_connection
);
434 bt_put(origin_graph
);
438 bt_put(upstream_port
);
439 bt_put(downstream_port
);
440 bt_put(origin_connection
);
441 bt_put(new_connection
);
444 /* Destroy all connections of the new component. */
449 enum bt_component_status
bt_graph_consume(struct bt_graph
*graph
)
451 struct bt_component
*sink
;
452 enum bt_component_status status
;
456 status
= BT_COMPONENT_STATUS_INVALID
;
460 if (g_queue_is_empty(graph
->sinks_to_consume
)) {
461 status
= BT_COMPONENT_STATUS_END
;
465 current_node
= g_queue_pop_head_link(graph
->sinks_to_consume
);
466 sink
= current_node
->data
;
467 status
= bt_component_sink_consume(sink
);
468 if (status
!= BT_COMPONENT_STATUS_END
) {
469 g_queue_push_tail_link(graph
->sinks_to_consume
, current_node
);
473 /* End reached, the node is not added back to the queue and free'd. */
474 g_queue_delete_link(graph
->sinks_to_consume
, current_node
);
476 /* Don't forward an END status if there are sinks left to consume. */
477 if (!g_queue_is_empty(graph
->sinks_to_consume
)) {
478 status
= BT_GRAPH_STATUS_OK
;
485 enum bt_graph_status
bt_graph_run(struct bt_graph
*graph
,
486 enum bt_component_status
*_component_status
)
488 enum bt_component_status component_status
;
489 enum bt_graph_status graph_status
= BT_GRAPH_STATUS_OK
;
492 graph_status
= BT_GRAPH_STATUS_INVALID
;
497 component_status
= bt_graph_consume(graph
);
498 if (component_status
== BT_COMPONENT_STATUS_AGAIN
) {
500 * If AGAIN is received and there are multiple sinks,
501 * go ahead and consume from the next sink.
503 * However, in the case where a single sink is left,
504 * the caller can decide to busy-wait and call
505 * bt_graph_run continuously until the source is ready
506 * or it can decide to sleep for an arbitrary amount of
509 if (graph
->sinks_to_consume
->length
> 1) {
510 component_status
= BT_COMPONENT_STATUS_OK
;
513 } while (component_status
== BT_COMPONENT_STATUS_OK
);
515 if (_component_status
) {
516 *_component_status
= component_status
;
519 if (g_queue_is_empty(graph
->sinks_to_consume
)) {
520 graph_status
= BT_GRAPH_STATUS_END
;
521 } else if (component_status
== BT_COMPONENT_STATUS_AGAIN
) {
522 graph_status
= BT_GRAPH_STATUS_AGAIN
;
524 graph_status
= BT_GRAPH_STATUS_ERROR
;