4 * Babeltrace CTF LTTng-live Client Component
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 #include <babeltrace/ctf-ir/packet.h>
31 #include <babeltrace/graph/component-source.h>
32 #include <babeltrace/graph/private-port.h>
33 #include <babeltrace/graph/port.h>
34 #include <babeltrace/graph/private-component.h>
35 #include <babeltrace/graph/private-component-source.h>
36 #include <babeltrace/graph/private-notification-iterator.h>
37 #include <babeltrace/graph/notification-stream.h>
38 #include <babeltrace/graph/notification-packet.h>
39 #include <babeltrace/graph/notification-event.h>
40 #include <babeltrace/graph/notification-heap.h>
41 #include <babeltrace/graph/notification-iterator.h>
42 #include <babeltrace/graph/notification-inactivity.h>
43 #include <babeltrace/compiler-internal.h>
48 #include <plugins-common.h>
50 #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE"
52 #include "data-stream.h"
54 #include "lttng-live-internal.h"
56 #define MAX_QUERY_SIZE (256*1024)
58 #define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
60 static const char *print_state(struct lttng_live_stream_iterator
*s
)
63 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
:
64 return "ACTIVE_NO_DATA";
65 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
:
66 return "QUIESCENT_NO_DATA";
67 case LTTNG_LIVE_STREAM_QUIESCENT
:
69 case LTTNG_LIVE_STREAM_ACTIVE_DATA
:
71 case LTTNG_LIVE_STREAM_EOF
:
78 #define print_stream_state(stream) \
79 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
80 bt_port_get_name(bt_port_from_private_port(stream->port)), \
81 print_state(stream), stream->last_returned_inactivity_timestamp, \
82 stream->current_inactivity_timestamp)
85 int bt_lttng_live_log_level
= BT_LOG_NONE
;
88 int lttng_live_add_port(struct lttng_live_component
*lttng_live
,
89 struct lttng_live_stream_iterator
*stream_iter
)
92 struct bt_private_port
*private_port
;
93 char name
[STREAM_NAME_MAX_LEN
];
95 ret
= sprintf(name
, STREAM_NAME_PREFIX
"%" PRIu64
, stream_iter
->viewer_stream_id
);
97 strcpy(stream_iter
->name
, name
);
98 private_port
= bt_private_component_source_add_output_private_port(
99 lttng_live
->private_component
, name
, stream_iter
);
103 BT_LOGI("Added port %s", name
);
105 if (lttng_live
->no_stream_port
) {
106 ret
= bt_private_port_remove_from_component(lttng_live
->no_stream_port
);
110 BT_PUT(lttng_live
->no_stream_port
);
111 lttng_live
->no_stream_iter
->port
= NULL
;
113 stream_iter
->port
= private_port
;
118 int lttng_live_remove_port(struct lttng_live_component
*lttng_live
,
119 struct bt_private_port
*port
)
121 struct bt_component
*component
;
125 component
= bt_component_from_private_component(lttng_live
->private_component
);
126 nr_ports
= bt_component_source_get_output_port_count(component
);
132 assert(!lttng_live
->no_stream_port
);
133 lttng_live
->no_stream_port
=
134 bt_private_component_source_add_output_private_port(lttng_live
->private_component
,
135 "no-stream", lttng_live
->no_stream_iter
);
136 if (!lttng_live
->no_stream_port
) {
139 lttng_live
->no_stream_iter
->port
= lttng_live
->no_stream_port
;
141 ret
= bt_private_port_remove_from_component(port
);
149 struct lttng_live_trace
*lttng_live_find_trace(struct lttng_live_session
*session
,
152 struct lttng_live_trace
*trace
;
154 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
155 if (trace
->id
== trace_id
) {
163 void lttng_live_destroy_trace(struct bt_object
*obj
)
165 struct lttng_live_trace
*trace
= container_of(obj
, struct lttng_live_trace
, obj
);
167 BT_LOGI("Destroy trace");
168 assert(bt_list_empty(&trace
->streams
));
169 bt_list_del(&trace
->node
);
170 lttng_live_metadata_fini(trace
);
171 BT_PUT(trace
->cc_prio_map
);
176 struct lttng_live_trace
*lttng_live_create_trace(struct lttng_live_session
*session
,
179 struct lttng_live_trace
*trace
= NULL
;
181 trace
= g_new0(struct lttng_live_trace
, 1);
185 trace
->session
= session
;
186 trace
->id
= trace_id
;
187 BT_INIT_LIST_HEAD(&trace
->streams
);
188 trace
->new_metadata_needed
= true;
189 bt_list_add(&trace
->node
, &session
->traces
);
190 bt_object_init(&trace
->obj
, lttng_live_destroy_trace
);
191 BT_LOGI("Create trace");
201 struct lttng_live_trace
*lttng_live_ref_trace(struct lttng_live_session
*session
,
204 struct lttng_live_trace
*trace
;
206 trace
= lttng_live_find_trace(session
, trace_id
);
211 return lttng_live_create_trace(session
, trace_id
);
215 void lttng_live_unref_trace(struct lttng_live_trace
*trace
)
221 void lttng_live_close_trace_streams(struct lttng_live_trace
*trace
)
223 struct lttng_live_stream_iterator
*stream
, *s
;
225 bt_list_for_each_entry_safe(stream
, s
, &trace
->streams
, node
) {
226 lttng_live_stream_iterator_destroy(stream
);
228 lttng_live_metadata_fini(trace
);
232 int lttng_live_add_session(struct lttng_live_component
*lttng_live
, uint64_t session_id
)
235 struct lttng_live_session
*s
;
237 s
= g_new0(struct lttng_live_session
, 1);
243 BT_INIT_LIST_HEAD(&s
->traces
);
244 s
->lttng_live
= lttng_live
;
245 s
->new_streams_needed
= true;
247 BT_LOGI("Reading from session %" PRIu64
, s
->id
);
248 bt_list_add(&s
->node
, <tng_live
->sessions
);
251 BT_LOGE("Error adding session");
259 void lttng_live_destroy_session(struct lttng_live_session
*session
)
261 struct lttng_live_trace
*trace
, *t
;
263 BT_LOGI("Destroy session");
264 if (session
->id
!= -1ULL) {
265 if (lttng_live_detach_session(session
)) {
266 /* Old relayd cannot detach sessions. */
267 BT_LOGD("Unable to detach session %" PRIu64
,
272 bt_list_for_each_entry_safe(trace
, t
, &session
->traces
, node
) {
273 lttng_live_close_trace_streams(trace
);
275 bt_list_del(&session
->node
);
280 void lttng_live_iterator_finalize(struct bt_private_notification_iterator
*it
)
282 struct lttng_live_stream_iterator_generic
*s
=
283 bt_private_notification_iterator_get_user_data(it
);
286 case LIVE_STREAM_TYPE_NO_STREAM
:
288 /* Leave no_stream_iter in place when port is removed. */
291 case LIVE_STREAM_TYPE_STREAM
:
293 struct lttng_live_stream_iterator
*stream_iter
=
294 container_of(s
, struct lttng_live_stream_iterator
, p
);
296 lttng_live_stream_iterator_destroy(stream_iter
);
303 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_check_stream_state(
304 struct lttng_live_component
*lttng_live
,
305 struct lttng_live_stream_iterator
*lttng_live_stream
)
307 switch (lttng_live_stream
->state
) {
308 case LTTNG_LIVE_STREAM_QUIESCENT
:
309 case LTTNG_LIVE_STREAM_ACTIVE_DATA
:
311 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
:
313 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
315 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
:
317 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
319 case LTTNG_LIVE_STREAM_EOF
:
322 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
326 * For active no data stream, fetch next data. It can be either:
327 * - quiescent: need to put it in the prio heap at quiescent end
329 * - have data: need to wire up first event into the prio heap,
330 * - have no data on this stream at this point: need to retry (AGAIN) or
334 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_no_data_stream(
335 struct lttng_live_component
*lttng_live
,
336 struct lttng_live_stream_iterator
*lttng_live_stream
)
338 enum bt_ctf_lttng_live_iterator_status ret
=
339 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
340 struct packet_index index
;
341 enum lttng_live_stream_state orig_state
= lttng_live_stream
->state
;
343 if (lttng_live_stream
->trace
->new_metadata_needed
) {
344 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
347 if (lttng_live_stream
->trace
->session
->new_streams_needed
) {
348 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
351 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
352 && lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
) {
355 ret
= lttng_live_get_next_index(lttng_live
, lttng_live_stream
, &index
);
356 if (ret
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
359 assert(lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_EOF
);
360 if (lttng_live_stream
->state
== LTTNG_LIVE_STREAM_QUIESCENT
) {
361 if (orig_state
== LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
362 && lttng_live_stream
->last_returned_inactivity_timestamp
==
363 lttng_live_stream
->current_inactivity_timestamp
) {
364 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
365 print_stream_state(lttng_live_stream
);
367 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
371 lttng_live_stream
->base_offset
= index
.offset
;
372 lttng_live_stream
->offset
= index
.offset
;
373 lttng_live_stream
->len
= index
.packet_size
/ CHAR_BIT
;
375 if (ret
== BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
376 ret
= lttng_live_iterator_next_check_stream_state(
377 lttng_live
, lttng_live_stream
);
383 * Creation of the notification requires the ctf trace to be created
384 * beforehand, but the live protocol gives us all streams (including
385 * metadata) at once. So we split it in three steps: getting streams,
386 * getting metadata (which creates the ctf trace), and then creating the
387 * per-stream notifications.
390 enum bt_ctf_lttng_live_iterator_status
lttng_live_get_session(
391 struct lttng_live_component
*lttng_live
,
392 struct lttng_live_session
*session
)
394 enum bt_ctf_lttng_live_iterator_status status
;
395 struct lttng_live_trace
*trace
, *t
;
397 if (lttng_live_attach_session(session
)) {
398 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
400 status
= lttng_live_get_new_streams(session
);
401 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
&&
402 status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
) {
405 bt_list_for_each_entry_safe(trace
, t
, &session
->traces
, node
) {
406 status
= lttng_live_metadata_update(trace
);
407 if (status
== BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
) {
410 retval
= bt_ctf_trace_set_is_static(trace
->trace
);
412 } else if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
416 return lttng_live_lazy_notif_init(session
);
420 void lttng_live_need_new_streams(struct lttng_live_component
*lttng_live
)
422 struct lttng_live_session
*session
;
424 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
425 session
->new_streams_needed
= true;
430 void lttng_live_force_new_streams_and_metadata(struct lttng_live_component
*lttng_live
)
432 struct lttng_live_session
*session
;
434 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
435 struct lttng_live_trace
*trace
;
437 session
->new_streams_needed
= true;
438 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
439 trace
->new_metadata_needed
= true;
445 enum bt_notification_iterator_status
lttng_live_iterator_next_handle_new_streams_and_metadata(
446 struct lttng_live_component
*lttng_live
)
448 enum bt_ctf_lttng_live_iterator_status ret
=
449 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
450 unsigned int nr_sessions_opened
= 0;
451 struct lttng_live_session
*session
, *s
;
453 bt_list_for_each_entry_safe(session
, s
, <tng_live
->sessions
, node
) {
454 if (session
->closed
&& bt_list_empty(&session
->traces
)) {
455 lttng_live_destroy_session(session
);
459 * Currently, when there are no sessions, we quit immediately.
460 * We may want to add a component parameter to keep trying until
461 * we get data in the future.
462 * Also, in a remotely distant future, we could add a "new
463 * session" flag to the protocol, which would tell us that we
464 * need to query for new sessions even though we have sessions
467 if (bt_list_empty(<tng_live
->sessions
)) {
468 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
471 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
472 ret
= lttng_live_get_session(lttng_live
, session
);
474 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
476 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
477 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
482 if (!session
->closed
) {
483 nr_sessions_opened
++;
487 if (ret
== BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
&& !nr_sessions_opened
) {
488 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
494 enum bt_ctf_lttng_live_iterator_status
emit_inactivity_notification(
495 struct lttng_live_component
*lttng_live
,
496 struct lttng_live_stream_iterator
*lttng_live_stream
,
497 struct bt_notification
**notification
,
500 enum bt_ctf_lttng_live_iterator_status ret
=
501 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
502 struct lttng_live_trace
*trace
;
503 struct bt_ctf_clock_class
*clock_class
= NULL
;
504 struct bt_ctf_clock_value
*clock_value
= NULL
;
505 struct bt_notification
*notif
= NULL
;
508 trace
= lttng_live_stream
->trace
;
512 clock_class
= bt_clock_class_priority_map_get_clock_class_by_index(trace
->cc_prio_map
, 0);
516 clock_value
= bt_ctf_clock_value_create(clock_class
, timestamp
);
520 notif
= bt_notification_inactivity_create(trace
->cc_prio_map
);
524 retval
= bt_notification_inactivity_set_clock_value(notif
, clock_value
);
528 *notification
= notif
;
535 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
541 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_quiescent_stream(
542 struct lttng_live_component
*lttng_live
,
543 struct lttng_live_stream_iterator
*lttng_live_stream
,
544 struct bt_notification
**notification
)
546 enum bt_ctf_lttng_live_iterator_status ret
=
547 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
548 struct bt_ctf_clock_class
*clock_class
= NULL
;
549 struct bt_ctf_clock_value
*clock_value
= NULL
;
551 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_QUIESCENT
) {
552 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
555 if (lttng_live_stream
->current_inactivity_timestamp
==
556 lttng_live_stream
->last_returned_inactivity_timestamp
) {
557 lttng_live_stream
->state
= LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
;
558 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
562 ret
= emit_inactivity_notification(lttng_live
, lttng_live_stream
, notification
,
563 (uint64_t) lttng_live_stream
->current_inactivity_timestamp
);
565 lttng_live_stream
->last_returned_inactivity_timestamp
=
566 lttng_live_stream
->current_inactivity_timestamp
;
574 enum bt_ctf_lttng_live_iterator_status
lttng_live_iterator_next_handle_one_active_data_stream(
575 struct lttng_live_component
*lttng_live
,
576 struct lttng_live_stream_iterator
*lttng_live_stream
,
577 struct bt_notification
**notification
)
579 enum bt_ctf_lttng_live_iterator_status ret
=
580 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
581 enum bt_ctf_notif_iter_status status
;
582 struct lttng_live_session
*session
;
584 bt_list_for_each_entry(session
, <tng_live
->sessions
, node
) {
585 struct lttng_live_trace
*trace
;
587 if (session
->new_streams_needed
) {
588 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
590 bt_list_for_each_entry(trace
, &session
->traces
, node
) {
591 if (trace
->new_metadata_needed
) {
592 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
597 if (lttng_live_stream
->state
!= LTTNG_LIVE_STREAM_ACTIVE_DATA
) {
598 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
600 if (lttng_live_stream
->packet_end_notif_queue
) {
601 *notification
= lttng_live_stream
->packet_end_notif_queue
;
602 lttng_live_stream
->packet_end_notif_queue
= NULL
;
603 status
= BT_CTF_NOTIF_ITER_STATUS_OK
;
605 status
= bt_ctf_notif_iter_get_next_notification(
606 lttng_live_stream
->notif_iter
,
607 lttng_live_stream
->trace
->cc_prio_map
,
609 if (status
== BT_CTF_NOTIF_ITER_STATUS_OK
) {
611 * Consider empty packets as inactivity.
613 if (bt_notification_get_type(*notification
) == BT_NOTIFICATION_TYPE_PACKET_END
) {
614 lttng_live_stream
->packet_end_notif_queue
= *notification
;
615 *notification
= NULL
;
616 return emit_inactivity_notification(lttng_live
,
617 lttng_live_stream
, notification
,
618 lttng_live_stream
->current_packet_end_timestamp
);
623 case BT_CTF_NOTIF_ITER_STATUS_EOF
:
624 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
626 case BT_CTF_NOTIF_ITER_STATUS_OK
:
627 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
;
629 case BT_CTF_NOTIF_ITER_STATUS_AGAIN
:
631 * Continue immediately (end of packet). The next
632 * get_index may return AGAIN to delay the following
635 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
;
637 case BT_CTF_NOTIF_ITER_STATUS_INVAL
:
638 /* No argument provided by the user, so don't return INVAL. */
639 case BT_CTF_NOTIF_ITER_STATUS_ERROR
:
641 ret
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
;
649 * handle_no_data_streams()
651 * - for each ACTIVE_NO_DATA stream:
652 * - query relayd for stream data, or quiescence info.
653 * - if need metadata, get metadata, goto retry.
654 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
655 * - if quiescent, move to QUIESCENT streams
656 * - if fetched data, move to ACTIVE_DATA streams
657 * (at this point each stream either has data, or is quiescent)
661 * handle_new_streams_and_metadata()
662 * - query relayd for known streams, add them as ACTIVE_NO_DATA
663 * - query relayd for metadata
665 * call handle_active_no_data_streams()
667 * handle_quiescent_streams()
668 * - if at least one stream is ACTIVE_DATA:
669 * - peek stream event with lowest timestamp -> next_ts
670 * - for each quiescent stream
671 * - if next_ts >= quiescent end
672 * - set state to ACTIVE_NO_DATA
674 * - for each quiescent stream
675 * - set state to ACTIVE_NO_DATA
677 * call handle_active_no_data_streams()
679 * handle_active_data_streams()
680 * - if at least one stream is ACTIVE_DATA:
681 * - get stream event with lowest timestamp from heap
682 * - make that stream event the current notification.
683 * - move this stream heap position to its next event
684 * - if we need to fetch data from relayd, move
685 * stream to ACTIVE_NO_DATA.
689 * end criterion: ctrl-c on client. If relayd exits or the session
690 * closes on the relay daemon side, we keep on waiting for streams.
691 * Eventually handle --end timestamp (also an end criterion).
693 * When disconnected from relayd: try to re-connect endlessly.
696 struct bt_notification_iterator_next_return
lttng_live_iterator_next_stream(
697 struct bt_private_notification_iterator
*iterator
,
698 struct lttng_live_stream_iterator
*stream_iter
)
700 enum bt_ctf_lttng_live_iterator_status status
;
701 struct bt_notification_iterator_next_return next_return
;
702 struct lttng_live_component
*lttng_live
;
704 lttng_live
= stream_iter
->trace
->session
->lttng_live
;
706 print_stream_state(stream_iter
);
707 next_return
.notification
= NULL
;
708 status
= lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live
);
709 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
712 status
= lttng_live_iterator_next_handle_one_no_data_stream(
713 lttng_live
, stream_iter
);
714 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
717 status
= lttng_live_iterator_next_handle_one_quiescent_stream(
718 lttng_live
, stream_iter
, &next_return
.notification
);
719 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
720 assert(next_return
.notification
== NULL
);
723 if (next_return
.notification
) {
726 status
= lttng_live_iterator_next_handle_one_active_data_stream(lttng_live
,
727 stream_iter
, &next_return
.notification
);
728 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
729 assert(next_return
.notification
== NULL
);
734 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
:
735 print_dbg("continue");
737 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
:
738 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
;
741 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
742 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_END
;
745 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
746 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_OK
;
749 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL
:
750 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_INVALID
;
752 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM
:
753 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_NOMEM
;
755 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED
:
756 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED
;
758 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
:
759 default: /* fall-through */
760 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
767 struct bt_notification_iterator_next_return
lttng_live_iterator_next_no_stream(
768 struct bt_private_notification_iterator
*iterator
,
769 struct lttng_live_no_stream_iterator
*no_stream_iter
)
771 enum bt_ctf_lttng_live_iterator_status status
;
772 struct bt_notification_iterator_next_return next_return
;
773 struct lttng_live_component
*lttng_live
;
775 lttng_live
= no_stream_iter
->lttng_live
;
777 lttng_live_force_new_streams_and_metadata(lttng_live
);
778 status
= lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live
);
779 if (status
!= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
) {
782 if (no_stream_iter
->port
) {
783 status
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
;
785 status
= BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
;
789 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE
:
791 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN
:
792 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
;
794 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END
:
795 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_END
;
797 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK
:
798 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_OK
;
800 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL
:
801 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_INVALID
;
803 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM
:
804 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_NOMEM
;
806 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED
:
807 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED
;
809 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR
:
810 default: /* fall-through */
811 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
818 struct bt_notification_iterator_next_return
lttng_live_iterator_next(
819 struct bt_private_notification_iterator
*iterator
)
821 struct lttng_live_stream_iterator_generic
*s
=
822 bt_private_notification_iterator_get_user_data(iterator
);
823 struct bt_notification_iterator_next_return next_return
;
826 case LIVE_STREAM_TYPE_NO_STREAM
:
827 next_return
= lttng_live_iterator_next_no_stream(iterator
,
828 container_of(s
, struct lttng_live_no_stream_iterator
, p
));
830 case LIVE_STREAM_TYPE_STREAM
:
831 next_return
= lttng_live_iterator_next_stream(iterator
,
832 container_of(s
, struct lttng_live_stream_iterator
, p
));
835 next_return
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
842 enum bt_notification_iterator_status
lttng_live_iterator_init(
843 struct bt_private_notification_iterator
*it
,
844 struct bt_private_port
*port
)
846 enum bt_notification_iterator_status ret
=
847 BT_NOTIFICATION_ITERATOR_STATUS_OK
;
848 struct lttng_live_stream_iterator_generic
*s
;
852 s
= bt_private_port_get_user_data(port
);
855 case LIVE_STREAM_TYPE_NO_STREAM
:
857 struct lttng_live_no_stream_iterator
*no_stream_iter
=
858 container_of(s
, struct lttng_live_no_stream_iterator
, p
);
859 ret
= bt_private_notification_iterator_set_user_data(it
, no_stream_iter
);
865 case LIVE_STREAM_TYPE_STREAM
:
867 struct lttng_live_stream_iterator
*stream_iter
=
868 container_of(s
, struct lttng_live_stream_iterator
, p
);
869 ret
= bt_private_notification_iterator_set_user_data(it
, stream_iter
);
876 ret
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
883 if (bt_private_notification_iterator_set_user_data(it
, NULL
)
884 != BT_NOTIFICATION_ITERATOR_STATUS_OK
) {
885 BT_LOGE("Error setting private data to NULL");
891 struct bt_value
*lttng_live_query_list_sessions(struct bt_component_class
*comp_class
,
892 struct bt_value
*params
)
894 struct bt_value
*url_value
= NULL
;
895 struct bt_value
*results
= NULL
;
897 struct bt_live_viewer_connection
*viewer_connection
= NULL
;
898 enum bt_value_status ret
;
900 url_value
= bt_value_map_get(params
, "url");
901 if (!url_value
|| bt_value_is_null(url_value
) || !bt_value_is_string(url_value
)) {
902 BT_LOGW("Mandatory \"url\" parameter missing");
906 ret
= bt_value_string_get(url_value
, &url
);
907 if (ret
!= BT_VALUE_STATUS_OK
) {
908 BT_LOGW("\"url\" parameter is required to be a string value");
912 viewer_connection
= bt_live_viewer_connection_create(url
, stderr
);
913 if (!viewer_connection
) {
914 ret
= BT_COMPONENT_STATUS_NOMEM
;
918 results
= bt_live_viewer_connection_list_sessions(viewer_connection
);
923 if (viewer_connection
) {
924 bt_live_viewer_connection_destroy(viewer_connection
);
931 struct bt_value
*lttng_live_query(struct bt_component_class
*comp_class
,
932 const char *object
, struct bt_value
*params
)
934 if (strcmp(object
, "sessions") == 0) {
935 return lttng_live_query_list_sessions(comp_class
,
938 BT_LOGW("Unknown query object `%s`", object
);
943 void lttng_live_component_destroy_data(struct lttng_live_component
*lttng_live
)
946 struct lttng_live_session
*session
, *s
;
948 bt_list_for_each_entry_safe(session
, s
, <tng_live
->sessions
, node
) {
949 lttng_live_destroy_session(session
);
951 BT_PUT(lttng_live
->viewer_connection
);
952 if (lttng_live
->url
) {
953 g_string_free(lttng_live
->url
, TRUE
);
955 if (lttng_live
->no_stream_port
) {
956 ret
= bt_private_port_remove_from_component(lttng_live
->no_stream_port
);
958 BT_PUT(lttng_live
->no_stream_port
);
960 if (lttng_live
->no_stream_iter
) {
961 g_free(lttng_live
->no_stream_iter
);
967 void lttng_live_component_finalize(struct bt_private_component
*component
)
969 void *data
= bt_private_component_get_user_data(component
);
974 lttng_live_component_destroy_data(data
);
978 struct lttng_live_component
*lttng_live_component_create(struct bt_value
*params
,
979 struct bt_private_component
*private_component
)
981 struct lttng_live_component
*lttng_live
;
982 struct bt_value
*value
= NULL
;
984 enum bt_value_status ret
;
986 lttng_live
= g_new0(struct lttng_live_component
, 1);
990 /* TODO: make this an overridable parameter. */
991 lttng_live
->max_query_size
= MAX_QUERY_SIZE
;
992 BT_INIT_LIST_HEAD(<tng_live
->sessions
);
993 value
= bt_value_map_get(params
, "url");
994 if (!value
|| bt_value_is_null(value
) || !bt_value_is_string(value
)) {
995 BT_LOGW("Mandatory \"url\" parameter missing");
998 ret
= bt_value_string_get(value
, &url
);
999 if (ret
!= BT_VALUE_STATUS_OK
) {
1000 BT_LOGW("\"url\" parameter is required to be a string value");
1003 lttng_live
->url
= g_string_new(url
);
1004 if (!lttng_live
->url
) {
1007 lttng_live
->viewer_connection
=
1008 bt_live_viewer_connection_create(lttng_live
->url
->str
,
1010 if (!lttng_live
->viewer_connection
) {
1011 ret
= BT_COMPONENT_STATUS_NOMEM
;
1014 if (lttng_live_create_viewer_session(lttng_live
)) {
1015 ret
= BT_COMPONENT_STATUS_ERROR
;
1018 lttng_live
->private_component
= private_component
;
1023 lttng_live_component_destroy_data(lttng_live
);
1030 enum bt_component_status
lttng_live_component_init(struct bt_private_component
*component
,
1031 struct bt_value
*params
, void *init_method_data
)
1033 struct lttng_live_component
*lttng_live
;
1034 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
1036 /* Passes ownership of iter ref to lttng_live_component_create. */
1037 lttng_live
= lttng_live_component_create(params
, component
);
1039 ret
= BT_COMPONENT_STATUS_NOMEM
;
1043 lttng_live
->no_stream_iter
= g_new0(struct lttng_live_no_stream_iterator
, 1);
1044 lttng_live
->no_stream_iter
->p
.type
= LIVE_STREAM_TYPE_NO_STREAM
;
1045 lttng_live
->no_stream_iter
->lttng_live
= lttng_live
;
1047 lttng_live
->no_stream_port
=
1048 bt_private_component_source_add_output_private_port(
1049 lttng_live
->private_component
, "no-stream",
1050 lttng_live
->no_stream_iter
);
1051 lttng_live
->no_stream_iter
->port
= lttng_live
->no_stream_port
;
1053 ret
= bt_private_component_set_user_data(component
, lttng_live
);
1054 if (ret
!= BT_COMPONENT_STATUS_OK
) {
1061 (void) bt_private_component_set_user_data(component
, NULL
);
1062 lttng_live_component_destroy_data(lttng_live
);
1067 void __attribute__((constructor
)) bt_lttng_live_logging_ctor(void)
1069 enum bt_logging_level log_level
= BT_LOG_NONE
;
1070 const char *log_level_env
= getenv("BABELTRACE_PLUGIN_LTTNG_LIVE_LOG_LEVEL");
1072 if (!log_level_env
) {
1076 if (strcmp(log_level_env
, "VERBOSE") == 0) {
1077 log_level
= BT_LOGGING_LEVEL_VERBOSE
;
1078 } else if (strcmp(log_level_env
, "DEBUG") == 0) {
1079 log_level
= BT_LOGGING_LEVEL_DEBUG
;
1080 } else if (strcmp(log_level_env
, "INFO") == 0) {
1081 log_level
= BT_LOGGING_LEVEL_INFO
;
1082 } else if (strcmp(log_level_env
, "WARN") == 0) {
1083 log_level
= BT_LOGGING_LEVEL_WARN
;
1084 } else if (strcmp(log_level_env
, "ERROR") == 0) {
1085 log_level
= BT_LOGGING_LEVEL_ERROR
;
1086 } else if (strcmp(log_level_env
, "FATAL") == 0) {
1087 log_level
= BT_LOGGING_LEVEL_FATAL
;
1090 bt_lttng_live_log_level
= log_level
;