4 * Babeltrace CTF LTTng-live Client Component
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
33 #include <babeltrace/babeltrace.h>
34 #include <babeltrace/compiler-internal.h>
35 #include <babeltrace/types.h>
38 #include <babeltrace/assert-internal.h>
40 #include <plugins-common.h>
42 #include "data-stream.h"
44 #include "lttng-live-internal.h"
46 #define MAX_QUERY_SIZE (256*1024)
48 #define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
50 static const char *print_state(struct lttng_live_stream_iterator
*s
)
53 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
:
54 return "ACTIVE_NO_DATA";
55 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
:
56 return "QUIESCENT_NO_DATA";
57 case LTTNG_LIVE_STREAM_QUIESCENT
:
59 case LTTNG_LIVE_STREAM_ACTIVE_DATA
:
61 case LTTNG_LIVE_STREAM_EOF
:
69 void print_stream_state(struct lttng_live_stream_iterator
*stream
)
73 port
= bt_port_from_private(stream
->port
);
74 print_dbg("stream %s state %s last_inact_ts %" PRId64
" cur_inact_ts %" PRId64
,
75 bt_port_get_name(port
),
77 stream
->last_returned_inactivity_timestamp
,
78 stream
->current_inactivity_timestamp
);
79 bt_port_put_ref(port
);
83 bt_bool
lttng_live_is_canceled(struct lttng_live_component
*lttng_live
)
85 bt_component
*component
;
86 const bt_graph
*graph
;
93 component
= bt_component_from_private(lttng_live
->private_component
);
94 graph
= bt_component_get_graph(component
);
95 ret
= bt_graph_is_canceled(graph
);
96 bt_graph_put_ref(graph
);
97 bt_component_put_ref(component
);
102 int lttng_live_add_port(struct lttng_live_component
*lttng_live
,
103 struct lttng_live_stream_iterator
*stream_iter
)
106 struct bt_private_port
*private_port
;
107 char name
[STREAM_NAME_MAX_LEN
];
108 bt_component_status status
;
110 ret
= sprintf(name
, STREAM_NAME_PREFIX
"%" PRIu64
, stream_iter
->viewer_stream_id
);
112 strcpy(stream_iter
->name
, name
);
113 if (lttng_live_is_canceled(lttng_live
)) {
116 status
= bt_self_component_source_add_output_port(
117 lttng_live
->private_component
, name
, stream_iter
,
120 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED
:
122 case BT_COMPONENT_STATUS_OK
:
127 bt_object_put_ref(private_port
); /* weak */
128 BT_LOGI("Added port %s", name
);
130 if (lttng_live
->no_stream_port
) {
131 bt_object_get_ref(lttng_live
->no_stream_port
);
132 ret
= bt_private_port_remove_from_component(lttng_live
->no_stream_port
);
133 bt_object_put_ref(lttng_live
->no_stream_port
);
137 lttng_live
->no_stream_port
= NULL
;
138 lttng_live
->no_stream_iter
->port
= NULL
;
140 stream_iter
->port
= private_port
;
145 int lttng_live_remove_port(struct lttng_live_component
*lttng_live
,
146 struct bt_private_port
*port
)
148 bt_component
*component
;
152 component
= bt_component_from_private(lttng_live
->private_component
);
153 nr_ports
= bt_component_source_get_output_port_count(component
);
157 BT_COMPONENT_PUT_REF_AND_RESET(component
);
159 bt_component_status status
;
161 BT_ASSERT(!lttng_live
->no_stream_port
);
163 if (lttng_live_is_canceled(lttng_live
)) {
166 status
= bt_self_component_source_add_output_port(lttng_live
->private_component
,
167 "no-stream", lttng_live
->no_stream_iter
,
168 <tng_live
->no_stream_port
);
170 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED
:
172 case BT_COMPONENT_STATUS_OK
:
177 bt_object_put_ref(lttng_live
->no_stream_port
); /* weak */
178 lttng_live
->no_stream_iter
->port
= lttng_live
->no_stream_port
;
180 bt_object_get_ref(port
);
181 ret
= bt_private_port_remove_from_component(port
);
182 bt_object_put_ref(port
);
190 struct lttng_live_trace
*lttng_live_find_trace(struct lttng_live_session
*session
,
193 struct lttng_live_trace
*trace
;
195 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
196 if (trace
->id
== trace_id
) {
204 void lttng_live_destroy_trace(bt_object
*obj
)
206 struct lttng_live_trace
*trace
= container_of(obj
, struct lttng_live_trace
, obj
);
208 BT_LOGI("Destroy trace");
209 BT_ASSERT(bt_list_empty(&trace
->streams
));
210 bt_list_del(&trace
->node
);
215 retval
= bt_trace_set_is_static(trace
->trace
);
217 BT_TRACE_PUT_REF_AND_RESET(trace
->trace
);
219 lttng_live_metadata_fini(trace
);
220 BT_OBJECT_PUT_REF_AND_RESET(trace
->cc_prio_map
);
225 struct lttng_live_trace
*lttng_live_create_trace(struct lttng_live_session
*session
,
228 struct lttng_live_trace
*trace
= NULL
;
230 trace
= g_new0(struct lttng_live_trace
, 1);
234 trace
->session
= session
;
235 trace
->id
= trace_id
;
236 BT_INIT_LIST_HEAD(&trace
->streams
);
237 trace
->new_metadata_needed
= true;
238 bt_list_add(&trace
->node
, &session
->traces
);
239 bt_object_init(&trace
->obj
, lttng_live_destroy_trace
);
240 BT_LOGI("Create trace");
250 struct lttng_live_trace
*lttng_live_ref_trace(struct lttng_live_session
*session
,
253 struct lttng_live_trace
*trace
;
255 trace
= lttng_live_find_trace(session
, trace_id
);
257 bt_object_get_ref(trace
);
260 return lttng_live_create_trace(session
, trace_id
);
/*
 * Drop a reference on `trace`.
 *
 * Simply forwards `trace` to bt_object_put_ref(); nothing else is done
 * here.
 * NOTE(review): lttng_live_create_trace() registers
 * lttng_live_destroy_trace() through bt_object_init(), so that is
 * presumably what runs once the last reference is gone -- confirm
 * against the bt_object implementation.
 */
void lttng_live_unref_trace(struct lttng_live_trace *trace)
{
	bt_object_put_ref(trace);
}
270 void lttng_live_close_trace_streams(struct lttng_live_trace
*trace
)
272 struct lttng_live_stream_iterator
*stream
, *s
;
274 bt_list_for_each_entry_safe(stream
, s
, &trace
->streams
, node
) {
275 lttng_live_stream_iterator_destroy(stream
);
277 lttng_live_metadata_fini(trace
);
281 int lttng_live_add_session(struct lttng_live_component
*lttng_live
,
282 uint64_t session_id
, const char *hostname
,
283 const char *session_name
)
286 struct lttng_live_session
*s
;
288 s
= g_new0(struct lttng_live_session
, 1);
294 BT_INIT_LIST_HEAD(&s
->traces
);
295 s
->lttng_live
= lttng_live
;
296 s
->new_streams_needed
= true;
297 s
->hostname
= g_string_new(hostname
);
298 s
->session_name
= g_string_new(session_name
);
300 BT_LOGI("Reading from session: %" PRIu64
" hostname: %s session_name: %s",
301 s
->id
, hostname
, session_name
);
302 bt_list_add(&s
->node
, <tng_live
->sessions
);
305 BT_LOGE("Error adding session");
313 void lttng_live_destroy_session(struct lttng_live_session
*session
)
315 struct lttng_live_trace
*trace
, *t
;
317 BT_LOGI("Destroy session");
318 if (session
->id
!= -1ULL) {
319 if (lttng_live_detach_session(session
)) {
320 if (!lttng_live_is_canceled(session
->lttng_live
)) {
321 /* Old relayd cannot detach sessions. */
322 BT_LOGD("Unable to detach session %" PRIu64
,
328 bt_list_for_each_entry_safe(trace
, t
, &session
->traces
, node
) {
329 lttng_live_close_trace_streams(trace
);
331 bt_list_del(&session
->node
);
332 if (session
->hostname
) {
333 g_string_free(session
->hostname
, TRUE
);
335 if (session
->session_name
) {
336 g_string_free(session
->session_name
, TRUE
);
342 void lttng_live_iterator_finalize(bt_self_message_iterator
*it
)
344 struct lttng_live_stream_iterator_generic
*s
=
345 bt_self_message_iterator_get_user_data(it
);
348 case LIVE_STREAM_TYPE_NO_STREAM
:
350 /* Leave no_stream_iter in place when port is removed. */
353 case LIVE_STREAM_TYPE_STREAM
:
355 struct lttng_live_stream_iterator
*stream_iter
=
356 container_of(s
, struct lttng_live_stream_iterator
, p
);
358 lttng_live_stream_iterator_destroy(stream_iter
);
365 bt_lttng_live_iterator_status
lttng_live_iterator_next_check_stream_state(
366 struct lttng_live_component
*lttng_live
,
367 struct lttng_live_stream_iterator
*lttng_live_stream
)
369 switch (lttng_live_stream
->state
) {
370 case LTTNG_LIVE_STREAM_QUIESCENT
:
371 case LTTNG_LIVE_STREAM_ACTIVE_DATA
:
373 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
:
375 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
377 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
:
379 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
381 case LTTNG_LIVE_STREAM_EOF
:
384 return BT_LTTNG_LIVE_ITERATOR_STATUS_OK
;
388 * For active no data stream, fetch next data. It can be either:
389 * - quiescent: need to put it in the prio heap at quiescent end
391 * - have data: need to wire up first event into the prio heap,
392 * - have no data on this stream at this point: need to retry (AGAIN) or
396 bt_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_no_data_stream(
397 struct lttng_live_component
*lttng_live
,
398 struct lttng_live_stream_iterator
*lttng_live_stream
)
400 bt_lttng_live_iterator_status ret
=
401 BT_LTTNG_LIVE_ITERATOR_STATUS_OK
;
402 struct packet_index index
;
403 enum lttng_live_stream_state orig_state
= lttng_live_stream
->state
;
405 if (lttng_live_stream
->trace
->new_metadata_needed
) {
406 ret
= BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
409 if (lttng_live_stream
->trace
->session
->new_streams_needed
) {
410 ret
= BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
413 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
414 && lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
) {
417 ret
= lttng_live_get_next_index(lttng_live
, lttng_live_stream
, &index
);
418 if (ret
!= BT_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
421 BT_ASSERT(lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_EOF
);
422 if (lttng_live_stream
->state
== LTTNG_LIVE_STREAM_QUIESCENT
) {
423 if (orig_state
== LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
424 && lttng_live_stream
->last_returned_inactivity_timestamp
==
425 lttng_live_stream
->current_inactivity_timestamp
) {
426 ret
= BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
427 print_stream_state(lttng_live_stream
);
429 ret
= BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
433 lttng_live_stream
->base_offset
= index
.offset
;
434 lttng_live_stream
->offset
= index
.offset
;
435 lttng_live_stream
->len
= index
.packet_size
/ CHAR_BIT
;
437 if (ret
== BT_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
438 ret
= lttng_live_iterator_next_check_stream_state(
439 lttng_live
, lttng_live_stream
);
/*
 * Creation of the message requires the CTF trace to be created
 * beforehand, but the live protocol gives us all streams (including
 * metadata) at once. So we split it into three steps: getting streams,
 * getting metadata (which creates the CTF trace), and then creating the
 * per-stream messages.
 */
452 bt_lttng_live_iterator_status
lttng_live_get_session(
453 struct lttng_live_component
*lttng_live
,
454 struct lttng_live_session
*session
)
456 bt_lttng_live_iterator_status status
;
457 struct lttng_live_trace
*trace
, *t
;
459 if (lttng_live_attach_session(session
)) {
460 if (lttng_live_is_canceled(lttng_live
)) {
461 return BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
463 return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
466 status
= lttng_live_get_new_streams(session
);
467 if (status
!= BT_LTTNG_LIVE_ITERATOR_STATUS_OK
&&
468 status
!= BT_LTTNG_LIVE_ITERATOR_STATUS_END
) {
471 bt_list_for_each_entry_safe(trace
, t
, &session
->traces
, node
) {
472 status
= lttng_live_metadata_update(trace
);
473 if (status
!= BT_LTTNG_LIVE_ITERATOR_STATUS_OK
&&
474 status
!= BT_LTTNG_LIVE_ITERATOR_STATUS_END
) {
478 return lttng_live_lazy_msg_init(session
);
482 void lttng_live_need_new_streams(struct lttng_live_component
*lttng_live
)
484 struct lttng_live_session
*session
;
486 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
487 session
->new_streams_needed
= true;
492 void lttng_live_force_new_streams_and_metadata(struct lttng_live_component
*lttng_live
)
494 struct lttng_live_session
*session
;
496 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
497 struct lttng_live_trace
*trace
;
499 session
->new_streams_needed
= true;
500 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
501 trace
->new_metadata_needed
= true;
507 bt_lttng_live_iterator_status
lttng_live_iterator_next_handle_new_streams_and_metadata(
508 struct lttng_live_component
*lttng_live
)
510 bt_lttng_live_iterator_status ret
=
511 BT_LTTNG_LIVE_ITERATOR_STATUS_OK
;
512 unsigned int nr_sessions_opened
= 0;
513 struct lttng_live_session
*session
, *s
;
515 bt_list_for_each_entry_safe(session
, s
, <tng_live
->sessions
, node
) {
516 if (session
->closed
&& bt_list_empty(&session
->traces
)) {
517 lttng_live_destroy_session(session
);
521 * Currently, when there are no sessions, we quit immediately.
522 * We may want to add a component parameter to keep trying until
523 * we get data in the future.
524 * Also, in a remotely distant future, we could add a "new
525 * session" flag to the protocol, which would tell us that we
526 * need to query for new sessions even though we have sessions
529 if (bt_list_empty(<tng_live
->sessions
)) {
530 ret
= BT_LTTNG_LIVE_ITERATOR_STATUS_END
;
533 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
534 ret
= lttng_live_get_session(lttng_live
, session
);
536 case BT_LTTNG_LIVE_ITERATOR_STATUS_OK
:
538 case BT_LTTNG_LIVE_ITERATOR_STATUS_END
:
539 ret
= BT_LTTNG_LIVE_ITERATOR_STATUS_OK
;
544 if (!session
->closed
) {
545 nr_sessions_opened
++;
549 if (ret
== BT_LTTNG_LIVE_ITERATOR_STATUS_OK
&& !nr_sessions_opened
) {
550 ret
= BT_LTTNG_LIVE_ITERATOR_STATUS_END
;
556 bt_lttng_live_iterator_status
emit_inactivity_message(
557 struct lttng_live_component
*lttng_live
,
558 struct lttng_live_stream_iterator
*lttng_live_stream
,
559 const bt_message
**message
,
562 bt_lttng_live_iterator_status ret
=
563 BT_LTTNG_LIVE_ITERATOR_STATUS_OK
;
564 struct lttng_live_trace
*trace
;
565 const bt_clock_class
*clock_class
= NULL
;
566 bt_clock_snapshot
*clock_snapshot
= NULL
;
567 const bt_message
*msg
= NULL
;
570 trace
= lttng_live_stream
->trace
;
574 clock_class
= bt_clock_class_priority_map_get_clock_class_by_index(trace
->cc_prio_map
, 0);
578 clock_snapshot
= bt_clock_snapshot_create(clock_class
, timestamp
);
579 if (!clock_snapshot
) {
582 msg
= bt_message_inactivity_create(trace
->cc_prio_map
);
586 retval
= bt_message_inactivity_set_clock_snapshot(msg
, clock_snapshot
);
592 bt_object_put_ref(clock_snapshot
);
593 bt_clock_class_put_ref(clock_class
);
597 ret
= BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
598 bt_message_put_ref(msg
);
603 bt_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_quiescent_stream(
604 struct lttng_live_component
*lttng_live
,
605 struct lttng_live_stream_iterator
*lttng_live_stream
,
606 const bt_message
**message
)
608 bt_lttng_live_iterator_status ret
=
609 BT_LTTNG_LIVE_ITERATOR_STATUS_OK
;
610 const bt_clock_class
*clock_class
= NULL
;
611 bt_clock_snapshot
*clock_snapshot
= NULL
;
613 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_QUIESCENT
) {
614 return BT_LTTNG_LIVE_ITERATOR_STATUS_OK
;
617 if (lttng_live_stream
->current_inactivity_timestamp
==
618 lttng_live_stream
->last_returned_inactivity_timestamp
) {
619 lttng_live_stream
->state
= LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
;
620 ret
= BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
624 ret
= emit_inactivity_message(lttng_live
, lttng_live_stream
, message
,
625 (uint64_t) lttng_live_stream
->current_inactivity_timestamp
);
627 lttng_live_stream
->last_returned_inactivity_timestamp
=
628 lttng_live_stream
->current_inactivity_timestamp
;
630 bt_object_put_ref(clock_snapshot
);
631 bt_clock_class_put_ref(clock_class
);
636 bt_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_active_data_stream(
637 struct lttng_live_component
*lttng_live
,
638 struct lttng_live_stream_iterator
*lttng_live_stream
,
639 const bt_message
**message
)
641 bt_lttng_live_iterator_status ret
=
642 BT_LTTNG_LIVE_ITERATOR_STATUS_OK
;
643 enum bt_msg_iter_status status
;
644 struct lttng_live_session
*session
;
646 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
647 struct lttng_live_trace
*trace
;
649 if (session
->new_streams_needed
) {
650 return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
652 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
653 if (trace
->new_metadata_needed
) {
654 return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
659 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_ACTIVE_DATA
) {
660 return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
662 if (lttng_live_stream
->packet_end_msg_queue
) {
663 *message
= lttng_live_stream
->packet_end_msg_queue
;
664 lttng_live_stream
->packet_end_msg_queue
= NULL
;
665 status
= BT_MSG_ITER_STATUS_OK
;
667 status
= bt_msg_iter_get_next_message(
668 lttng_live_stream
->msg_iter
,
669 lttng_live_stream
->trace
->cc_prio_map
,
671 if (status
== BT_MSG_ITER_STATUS_OK
) {
673 * Consider empty packets as inactivity.
675 if (bt_message_get_type(*message
) == BT_MESSAGE_TYPE_PACKET_END
) {
676 lttng_live_stream
->packet_end_msg_queue
= *message
;
678 return emit_inactivity_message(lttng_live
,
679 lttng_live_stream
, message
,
680 lttng_live_stream
->current_packet_end_timestamp
);
685 case BT_MSG_ITER_STATUS_EOF
:
686 ret
= BT_LTTNG_LIVE_ITERATOR_STATUS_END
;
688 case BT_MSG_ITER_STATUS_OK
:
689 ret
= BT_LTTNG_LIVE_ITERATOR_STATUS_OK
;
691 case BT_MSG_ITER_STATUS_AGAIN
:
693 * Continue immediately (end of packet). The next
694 * get_index may return AGAIN to delay the following
697 ret
= BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
699 case BT_MSG_ITER_STATUS_INVAL
:
700 /* No argument provided by the user, so don't return INVAL. */
701 case BT_MSG_ITER_STATUS_ERROR
:
703 ret
= BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
711 * handle_no_data_streams()
713 * - for each ACTIVE_NO_DATA stream:
714 * - query relayd for stream data, or quiescence info.
715 * - if need metadata, get metadata, goto retry.
716 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
717 * - if quiescent, move to QUIESCENT streams
718 * - if fetched data, move to ACTIVE_DATA streams
719 * (at this point each stream either has data, or is quiescent)
723 * handle_new_streams_and_metadata()
724 * - query relayd for known streams, add them as ACTIVE_NO_DATA
725 * - query relayd for metadata
727 * call handle_active_no_data_streams()
729 * handle_quiescent_streams()
730 * - if at least one stream is ACTIVE_DATA:
731 * - peek stream event with lowest timestamp -> next_ts
732 * - for each quiescent stream
733 * - if next_ts >= quiescent end
734 * - set state to ACTIVE_NO_DATA
736 * - for each quiescent stream
737 * - set state to ACTIVE_NO_DATA
739 * call handle_active_no_data_streams()
741 * handle_active_data_streams()
742 * - if at least one stream is ACTIVE_DATA:
743 * - get stream event with lowest timestamp from heap
744 * - make that stream event the current message.
745 * - move this stream heap position to its next event
746 * - if we need to fetch data from relayd, move
747 * stream to ACTIVE_NO_DATA.
751 * end criterion: ctrl-c on client. If relayd exits or the session
752 * closes on the relay daemon side, we keep on waiting for streams.
753 * Eventually handle --end timestamp (also an end criterion).
755 * When disconnected from relayd: try to re-connect endlessly.
758 bt_message_iterator_next_method_return
lttng_live_iterator_next_stream(
759 bt_self_message_iterator
*iterator
,
760 struct lttng_live_stream_iterator
*stream_iter
)
762 bt_lttng_live_iterator_status status
;
763 bt_message_iterator_next_method_return next_return
;
764 struct lttng_live_component
*lttng_live
;
766 lttng_live
= stream_iter
->trace
->session
->lttng_live
;
768 print_stream_state(stream_iter
);
769 next_return
.message
= NULL
;
770 status
= lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live
);
771 if (status
!= BT_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
774 status
= lttng_live_iterator_next_handle_one_no_data_stream(
775 lttng_live
, stream_iter
);
776 if (status
!= BT_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
779 status
= lttng_live_iterator_next_handle_one_quiescent_stream(
780 lttng_live
, stream_iter
, &next_return
.message
);
781 if (status
!= BT_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
782 BT_ASSERT(next_return
.message
== NULL
);
785 if (next_return
.message
) {
788 status
= lttng_live_iterator_next_handle_one_active_data_stream(lttng_live
,
789 stream_iter
, &next_return
.message
);
790 if (status
!= BT_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
791 BT_ASSERT(next_return
.message
== NULL
);
796 case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
:
797 print_dbg("continue");
799 case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
:
800 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_AGAIN
;
803 case BT_LTTNG_LIVE_ITERATOR_STATUS_END
:
804 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_END
;
807 case BT_LTTNG_LIVE_ITERATOR_STATUS_OK
:
808 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_OK
;
811 case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL
:
812 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_INVALID
;
814 case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM
:
815 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_NOMEM
;
817 case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED
:
818 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED
;
820 case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR
:
821 default: /* fall-through */
822 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_ERROR
;
829 bt_message_iterator_next_method_return
lttng_live_iterator_next_no_stream(
830 bt_self_message_iterator
*iterator
,
831 struct lttng_live_no_stream_iterator
*no_stream_iter
)
833 bt_lttng_live_iterator_status status
;
834 bt_message_iterator_next_method_return next_return
;
835 struct lttng_live_component
*lttng_live
;
837 lttng_live
= no_stream_iter
->lttng_live
;
839 lttng_live_force_new_streams_and_metadata(lttng_live
);
840 next_return
.message
= NULL
;
841 status
= lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live
);
842 if (status
!= BT_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
845 if (no_stream_iter
->port
) {
846 status
= BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
848 status
= BT_LTTNG_LIVE_ITERATOR_STATUS_END
;
852 case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
:
854 case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
:
855 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_AGAIN
;
857 case BT_LTTNG_LIVE_ITERATOR_STATUS_END
:
858 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_END
;
860 case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL
:
861 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_INVALID
;
863 case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM
:
864 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_NOMEM
;
866 case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED
:
867 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED
;
869 case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR
:
870 default: /* fall-through */
871 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_ERROR
;
878 bt_message_iterator_next_method_return
lttng_live_iterator_next(
879 bt_self_message_iterator
*iterator
)
881 struct lttng_live_stream_iterator_generic
*s
=
882 bt_self_message_iterator_get_user_data(iterator
);
883 bt_message_iterator_next_method_return next_return
;
886 case LIVE_STREAM_TYPE_NO_STREAM
:
887 next_return
= lttng_live_iterator_next_no_stream(iterator
,
888 container_of(s
, struct lttng_live_no_stream_iterator
, p
));
890 case LIVE_STREAM_TYPE_STREAM
:
891 next_return
= lttng_live_iterator_next_stream(iterator
,
892 container_of(s
, struct lttng_live_stream_iterator
, p
));
895 next_return
.status
= BT_MESSAGE_ITERATOR_STATUS_ERROR
;
902 bt_message_iterator_status
lttng_live_iterator_init(
903 bt_self_message_iterator
*it
,
904 struct bt_private_port
*port
)
906 bt_message_iterator_status ret
=
907 BT_MESSAGE_ITERATOR_STATUS_OK
;
908 struct lttng_live_stream_iterator_generic
*s
;
912 s
= bt_private_port_get_user_data(port
);
915 case LIVE_STREAM_TYPE_NO_STREAM
:
917 struct lttng_live_no_stream_iterator
*no_stream_iter
=
918 container_of(s
, struct lttng_live_no_stream_iterator
, p
);
919 ret
= bt_self_message_iterator_set_user_data(it
, no_stream_iter
);
925 case LIVE_STREAM_TYPE_STREAM
:
927 struct lttng_live_stream_iterator
*stream_iter
=
928 container_of(s
, struct lttng_live_stream_iterator
, p
);
929 ret
= bt_self_message_iterator_set_user_data(it
, stream_iter
);
936 ret
= BT_MESSAGE_ITERATOR_STATUS_ERROR
;
943 if (bt_self_message_iterator_set_user_data(it
, NULL
)
944 != BT_MESSAGE_ITERATOR_STATUS_OK
) {
945 BT_LOGE("Error setting private data to NULL");
951 bt_component_class_query_method_return
lttng_live_query_list_sessions(
952 const bt_component_class
*comp_class
,
953 const bt_query_executor
*query_exec
,
956 bt_component_class_query_method_return query_ret
= {
958 .status
= BT_QUERY_STATUS_OK
,
961 bt_value
*url_value
= NULL
;
963 struct bt_live_viewer_connection
*viewer_connection
= NULL
;
965 url_value
= bt_value_map_get(params
, "url");
966 if (!url_value
|| bt_value_is_null(url_value
) || !bt_value_is_string(url_value
)) {
967 BT_LOGW("Mandatory \"url\" parameter missing");
968 query_ret
.status
= BT_QUERY_STATUS_INVALID_PARAMS
;
972 if (bt_value_string_get(url_value
, &url
) != BT_VALUE_STATUS_OK
) {
973 BT_LOGW("\"url\" parameter is required to be a string value");
974 query_ret
.status
= BT_QUERY_STATUS_INVALID_PARAMS
;
978 viewer_connection
= bt_live_viewer_connection_create(url
, NULL
);
979 if (!viewer_connection
) {
984 bt_live_viewer_connection_list_sessions(viewer_connection
);
985 if (!query_ret
.result
) {
992 BT_OBJECT_PUT_REF_AND_RESET(query_ret
.result
);
994 if (query_ret
.status
>= 0) {
995 query_ret
.status
= BT_QUERY_STATUS_ERROR
;
999 if (viewer_connection
) {
1000 bt_live_viewer_connection_destroy(viewer_connection
);
1002 BT_VALUE_PUT_REF_AND_RESET(url_value
);
1007 bt_component_class_query_method_return
lttng_live_query(
1008 const bt_component_class
*comp_class
,
1009 const bt_query_executor
*query_exec
,
1010 const char *object
, bt_value
*params
)
1012 bt_component_class_query_method_return ret
= {
1014 .status
= BT_QUERY_STATUS_OK
,
1017 if (strcmp(object
, "sessions") == 0) {
1018 return lttng_live_query_list_sessions(comp_class
,
1019 query_exec
, params
);
1021 BT_LOGW("Unknown query object `%s`", object
);
1022 ret
.status
= BT_QUERY_STATUS_INVALID_OBJECT
;
1027 void lttng_live_component_destroy_data(struct lttng_live_component
*lttng_live
)
1030 struct lttng_live_session
*session
, *s
;
1032 bt_list_for_each_entry_safe(session
, s
, <tng_live
->sessions
, node
) {
1033 lttng_live_destroy_session(session
);
1035 BT_OBJECT_PUT_REF_AND_RESET(lttng_live
->viewer_connection
);
1036 if (lttng_live
->url
) {
1037 g_string_free(lttng_live
->url
, TRUE
);
1039 if (lttng_live
->no_stream_port
) {
1040 bt_object_get_ref(lttng_live
->no_stream_port
);
1041 ret
= bt_private_port_remove_from_component(lttng_live
->no_stream_port
);
1042 bt_object_put_ref(lttng_live
->no_stream_port
);
1045 if (lttng_live
->no_stream_iter
) {
1046 g_free(lttng_live
->no_stream_iter
);
1052 void lttng_live_component_finalize(bt_self_component
*component
)
1054 void *data
= bt_self_component_get_user_data(component
);
1059 lttng_live_component_destroy_data(data
);
1063 struct lttng_live_component
*lttng_live_component_create(bt_value
*params
,
1064 bt_self_component
*private_component
)
1066 struct lttng_live_component
*lttng_live
;
1067 bt_value
*value
= NULL
;
1069 bt_value_status ret
;
1071 lttng_live
= g_new0(struct lttng_live_component
, 1);
1075 /* TODO: make this an overridable parameter. */
1076 lttng_live
->max_query_size
= MAX_QUERY_SIZE
;
1077 BT_INIT_LIST_HEAD(<tng_live
->sessions
);
1078 value
= bt_value_map_get(params
, "url");
1079 if (!value
|| bt_value_is_null(value
) || !bt_value_is_string(value
)) {
1080 BT_LOGW("Mandatory \"url\" parameter missing");
1083 url
= bt_value_string_get(value
);
1084 lttng_live
->url
= g_string_new(url
);
1085 if (!lttng_live
->url
) {
1088 BT_VALUE_PUT_REF_AND_RESET(value
);
1089 lttng_live
->viewer_connection
=
1090 bt_live_viewer_connection_create(lttng_live
->url
->str
, lttng_live
);
1091 if (!lttng_live
->viewer_connection
) {
1094 if (lttng_live_create_viewer_session(lttng_live
)) {
1097 lttng_live
->private_component
= private_component
;
1102 lttng_live_component_destroy_data(lttng_live
);
1109 bt_component_status
lttng_live_component_init(
1110 bt_self_component
*private_component
,
1111 bt_value
*params
, void *init_method_data
)
1113 struct lttng_live_component
*lttng_live
;
1114 bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
1116 /* Passes ownership of iter ref to lttng_live_component_create. */
1117 lttng_live
= lttng_live_component_create(params
, private_component
);
1119 //TODO : we need access to the application cancel state
1120 //because we are not part of a graph yet.
1121 ret
= BT_COMPONENT_STATUS_NOMEM
;
1125 lttng_live
->no_stream_iter
= g_new0(struct lttng_live_no_stream_iterator
, 1);
1126 lttng_live
->no_stream_iter
->p
.type
= LIVE_STREAM_TYPE_NO_STREAM
;
1127 lttng_live
->no_stream_iter
->lttng_live
= lttng_live
;
1128 if (lttng_live_is_canceled(lttng_live
)) {
1131 ret
= bt_self_component_source_add_output_port(
1132 lttng_live
->private_component
, "no-stream",
1133 lttng_live
->no_stream_iter
,
1134 <tng_live
->no_stream_port
);
1135 if (ret
!= BT_COMPONENT_STATUS_OK
) {
1138 bt_object_put_ref(lttng_live
->no_stream_port
); /* weak */
1139 lttng_live
->no_stream_iter
->port
= lttng_live
->no_stream_port
;
1141 ret
= bt_self_component_set_user_data(private_component
, lttng_live
);
1142 if (ret
!= BT_COMPONENT_STATUS_OK
) {
1149 (void) bt_self_component_set_user_data(private_component
, NULL
);
1150 lttng_live_component_destroy_data(lttng_live
);
1155 bt_component_status
lttng_live_accept_port_connection(
1156 bt_self_component
*private_component
,
1157 struct bt_private_port
*self_private_port
,
1158 const bt_port
*other_port
)
1160 struct lttng_live_component
*lttng_live
=
1161 bt_self_component_get_user_data(private_component
);
1162 bt_component
*other_component
;
1163 bt_component_status status
= BT_COMPONENT_STATUS_OK
;
1164 const bt_port
*self_port
= bt_port_from_private(self_private_port
);
1166 other_component
= bt_port_get_component(other_port
);
1167 bt_component_put_ref(other_component
); /* weak */
1169 if (!lttng_live
->downstream_component
) {
1170 lttng_live
->downstream_component
= other_component
;
1175 * Compare prior component to ensure we are connected to the
1176 * same downstream component as prior ports.
1178 if (lttng_live
->downstream_component
!= other_component
) {
1179 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1180 bt_port_get_name(self_port
),
1181 bt_component_get_name(other_component
),
1182 bt_component_get_name(lttng_live
->downstream_component
));
1183 status
= BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION
;
1187 bt_port_put_ref(self_port
);