4 * Babeltrace CTF Writer Output Plugin Event Handling
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29 #define BT_LOG_TAG "PLUGIN-CTF-FS-SINK-WRITE"
32 #include <babeltrace/babeltrace.h>
33 #include <babeltrace/assert-internal.h>
36 #include <ctfcopytrace.h>
/*
 * Release one reference on a writer stream class by calling
 * bt_stream_class_put_ref(). insert_new_writer() installs this function
 * (cast to GDestroyNotify) as the value destructor of an fs_writer's
 * stream_class_map.
 */
41 void unref_stream_class(const bt_stream_class
*writer_stream_class
)
43 bt_stream_class_put_ref(writer_stream_class
);
47 void unref_stream(const bt_stream_class
*writer_stream
)
49 bt_stream_class_put_ref(writer_stream
);
/*
 * Hash-table callback taking (key, value, user_data) and returning a
 * gboolean.
 * NOTE(review): the body is not visible in this excerpt; given its name
 * it presumably returns TRUE so that g_hash_table_foreach_remove()
 * drops every entry -- confirm against the full file.
 */
53 gboolean
empty_ht(gpointer key
, gpointer value
, gpointer user_data
)
/*
 * Removal callback that writer_close() passes to
 * g_hash_table_foreach_remove() for the stream map.
 * `value` is treated as a writer stream and flushed with
 * bt_stream_flush(); the message below is emitted at debug level
 * (BT_LOGD_STR) and refers to a failed flush.
 * `key` and `user_data` are not used in the visible lines.
 */
59 gboolean
empty_streams_ht(gpointer key
, gpointer value
, gpointer user_data
)
62 const bt_stream
*writer_stream
= value
;
64 ret
= bt_stream_flush(writer_stream
);
66 BT_LOGD_STR("Failed to flush stream while emptying hash table.");
/*
 * Free a heap-allocated enum fs_writer_stream_state with g_free().
 * NOTE(review): despite the "key" in its name and parameter,
 * insert_new_writer() registers this as the *value* destructor of the
 * stream_states table (fourth argument of g_hash_table_new_full()).
 */
72 void destroy_stream_state_key(gpointer key
)
74 g_free((enum fs_writer_stream_state
*) key
);
/*
 * g_hash_table_foreach() callback run over an fs_writer's stream_states
 * table. `value` points to one stream's state and `user_data` to an int
 * flag owned by the caller.
 * The branch below is entered for any stream whose state is not
 * FS_WRITER_COMPLETED_STREAM.
 * NOTE(review): the branch body is not visible here; callers set the
 * flag to 1 beforehand and test it afterwards, so it presumably clears
 * the flag -- confirm.
 */
78 void check_completed_trace(gpointer key
, gpointer value
, gpointer user_data
)
80 enum fs_writer_stream_state
*state
= value
;
81 int *trace_completed
= user_data
;
83 if (*state
!= FS_WRITER_COMPLETED_STREAM
) {
/*
 * Listener registered by insert_new_writer() through
 * bt_trace_add_is_static_listener(); `data` is the fs_writer that was
 * passed at registration time.
 * It records that the trace is static, walks every stream state with
 * check_completed_trace() and, when the completion flag is still set,
 * closes the writer and removes an entry from the component's
 * trace_map (the key argument is outside this excerpt).
 */
89 void trace_is_static_listener(const bt_trace
*trace
, void *data
)
91 struct fs_writer
*fs_writer
= data
;
92 int trace_completed
= 1;
94 fs_writer
->trace_static
= 1;
96 g_hash_table_foreach(fs_writer
->stream_states
,
97 check_completed_trace
, &trace_completed
);
98 if (trace_completed
) {
99 writer_close(fs_writer
->writer_component
, fs_writer
);
100 g_hash_table_remove(fs_writer
->writer_component
->trace_map
,
/*
 * Create the writer-side counterpart of `stream_class` and register it
 * in fs_writer->stream_class_map, keyed by the source stream class.
 *
 * Visible steps: fetch the source trace and the CTF writer's trace, copy
 * the clock classes, copy the stream class with ctf_copy_stream_class(),
 * add the copy to the writer trace, then insert it in the map.
 *
 * Returns the writer stream class. The variable is reset by
 * BT_STREAM_CLASS_PUT_REF_AND_RESET() before the common cleanup,
 * presumably only on the error path (the labels and jumps are not
 * visible in this excerpt).
 */
106 const bt_stream_class
*insert_new_stream_class(
107 struct writer_component
*writer_component
,
108 struct fs_writer
*fs_writer
,
109 const bt_stream_class
*stream_class
)
111 const bt_stream_class
*writer_stream_class
= NULL
;
112 const bt_trace
*trace
= NULL
, *writer_trace
= NULL
;
113 struct bt_ctf_writer
*ctf_writer
= fs_writer
->writer
;
114 enum bt_component_status ret
;
116 trace
= bt_stream_class_get_trace(stream_class
);
119 writer_trace
= bt_ctf_writer_get_trace(ctf_writer
);
120 BT_ASSERT(writer_trace
);
/*
 * NOTE(review): as far as this excerpt shows, writer_stream_class is
 * still NULL here; it is only assigned by ctf_copy_stream_class()
 * further down. Confirm that ctf_copy_clock_classes() accepts a NULL
 * stream class.
 */
122 ret
= ctf_copy_clock_classes(writer_component
->err
, writer_trace
,
123 writer_stream_class
, trace
);
124 if (ret
!= BT_COMPONENT_STATUS_OK
) {
125 BT_LOGE_STR("Failed to copy clock classes.");
129 writer_stream_class
= ctf_copy_stream_class(writer_component
->err
,
130 stream_class
, writer_trace
, true);
131 if (!writer_stream_class
) {
132 BT_LOGE_STR("Failed to copy stream class.");
136 ret
= bt_trace_add_stream_class(writer_trace
, writer_stream_class
);
138 BT_LOGE_STR("Failed to add stream_class.");
/* The map is keyed by the source stream class pointer. */
142 g_hash_table_insert(fs_writer
->stream_class_map
,
143 (gpointer
) stream_class
, writer_stream_class
);
148 BT_STREAM_CLASS_PUT_REF_AND_RESET(writer_stream_class
);
/* Common cleanup: release the two trace references taken above. */
150 bt_trace_put_ref(writer_trace
);
151 bt_trace_put_ref(trace
);
152 return writer_stream_class
;
/*
 * Allocate a zero-initialised enum fs_writer_stream_state, set it to
 * FS_WRITER_UNKNOWN_STREAM and store it in fs_writer->stream_states
 * under the `stream` key. writer_stream_begin() uses the return value
 * as the pointer to the new state.
 * `writer_component` is not used in the visible lines.
 *
 * NOTE(review): g_new0() is documented to abort the program when the
 * allocation fails, so the "Failed to allocate" path below is
 * presumably unreachable -- confirm.
 */
156 enum fs_writer_stream_state
*insert_new_stream_state(
157 struct writer_component
*writer_component
,
158 struct fs_writer
*fs_writer
, const bt_stream
*stream
)
160 enum fs_writer_stream_state
*v
= NULL
;
162 v
= g_new0(enum fs_writer_stream_state
, 1);
164 BT_LOGE_STR("Failed to allocate fs_writer_stream_state.");
167 *v
= FS_WRITER_UNKNOWN_STREAM
;
169 g_hash_table_insert(fs_writer
->stream_states
, stream
, v
);
175 * Make sure the output path is valid for a single trace: either it does
176 * not exist or it is empty.
178 * Return 0 if the path is valid, -1 otherwise.
/*
 * Inspect `path` with g_dir_open() to decide whether it can receive a
 * single trace. A failed open is examined through the GError code
 * (G_FILE_ERROR_NOENT is the non-existent directory case); a directory
 * that opens successfully is scanned with g_dir_read_name().
 *
 * NOTE(review): the function is declared bool, while make_trace_path()
 * compares its result with `!= 0` to detect an invalid directory. The
 * return statements are not visible here -- confirm that the values
 * returned match that caller.
 * NOTE(review): only error->code is checked, not error->domain.
 */
181 bool valid_single_trace_path(const char *path
)
183 GError
*error
= NULL
;
187 dir
= g_dir_open(path
, 0, &error
);
189 /* Non-existent directory. */
191 /* For any other error, return an error */
192 if (error
->code
!= G_FILE_ERROR_NOENT
) {
198 /* g_dir_read_name skips "." and "..", error out on first result */
199 while (g_dir_read_name(dir
) != NULL
) {
/*
 * Build the output directory path for `trace` into `trace_path`.
 *
 * `trace_path` is written with snprintf() bounded by PATH_MAX, so the
 * caller's buffer must hold at least PATH_MAX bytes (insert_new_writer()
 * passes a char[PATH_MAX]).
 *
 * Visible steps:
 *  - pick a trace name, either from bt_trace_get_name() or from the
 *    component's trace_name_base string (the selecting conditions are
 *    partly outside this excerpt);
 *  - reject a name equal to ".." or containing "../";
 *  - format "<base_path><separator><name>";
 *  - in single-trace mode, validate the directory with
 *    valid_single_trace_path(); otherwise, if the path already exists,
 *    retry with a "-<number>" suffix while the candidate exists and the
 *    counter is below INT_MAX.
 *
 * NOTE(review): the snprintf() results are not checked in the visible
 * lines, so a path longer than PATH_MAX would be silently truncated.
 */
216 int make_trace_path(struct writer_component
*writer_component
,
217 const bt_trace
*trace
, char *trace_path
)
220 const char *trace_name
;
222 if (writer_component
->single_trace
) {
225 trace_name
= bt_trace_get_name(trace
);
227 trace_name
= writer_component
->trace_name_base
->str
;
231 /* Sanitize the trace name. */
232 if (strlen(trace_name
) == 2 && !strcmp(trace_name
, "..")) {
233 BT_LOGE_STR("Trace name cannot be \"..\".");
237 if (strstr(trace_name
, "../")) {
238 BT_LOGE_STR("Trace name cannot contain \"../\".");
243 snprintf(trace_path
, PATH_MAX
, "%s" G_DIR_SEPARATOR_S
"%s",
244 writer_component
->base_path
->str
,
247 * Append a suffix if the trace_path exists and we are not in
250 if (writer_component
->single_trace
) {
251 if (valid_single_trace_path(trace_path
) != 0) {
252 BT_LOGE_STR("Invalid output directory.");
256 if (g_file_test(trace_path
, G_FILE_TEST_EXISTS
)) {
260 snprintf(trace_path
, PATH_MAX
, "%s" G_DIR_SEPARATOR_S
"%s-%d",
261 writer_component
->base_path
->str
,
263 } while (g_file_test(trace_path
, G_FILE_TEST_EXISTS
) && i
< INT_MAX
);
265 BT_LOGE_STR("Unable to find a unique trace path.");
/*
 * Create the output-side state (struct fs_writer) for the source
 * `trace` and register it in writer_component->trace_map.
 *
 * Visible steps:
 *  - refuse a second trace when single-trace mode is enabled;
 *  - compute the output path with make_trace_path() and announce it on
 *    standard output;
 *  - create a CTF writer at that path and copy the source trace into the
 *    writer's trace with ctf_copy_trace();
 *  - allocate the fs_writer and its three hash tables (stream classes,
 *    streams, stream states);
 *  - put every stream already present in the trace in the unknown state;
 *  - either note that the trace is already static, or register
 *    trace_is_static_listener() to be told when it becomes static;
 *  - bump nr_traces and insert into trace_map keyed by `trace`.
 *
 * The three put-ref calls at the bottom are the function's common
 * cleanup; which paths reach them is not visible in this excerpt.
 */
281 struct fs_writer
*insert_new_writer(
282 struct writer_component
*writer_component
,
283 const bt_trace
*trace
)
285 struct bt_ctf_writer
*ctf_writer
= NULL
;
286 const bt_trace
*writer_trace
= NULL
;
287 char trace_path
[PATH_MAX
];
288 enum bt_component_status ret
;
289 const bt_stream
*stream
= NULL
;
290 struct fs_writer
*fs_writer
= NULL
;
293 if (writer_component
->single_trace
&& writer_component
->nr_traces
> 0) {
294 BT_LOGE_STR("Trying to process more than one trace but single trace mode enabled.");
298 ret
= make_trace_path(writer_component
, trace
, trace_path
);
300 BT_LOGE_STR("Failed to make trace path.");
/* User-visible notice, printed on stdout rather than through BT_LOG*. */
304 printf("ctf.fs sink creating trace in %s\n", trace_path
);
306 ctf_writer
= bt_ctf_writer_create(trace_path
);
308 BT_LOGE_STR("Failed to create CTF writer.");
312 writer_trace
= bt_ctf_writer_get_trace(ctf_writer
);
313 BT_ASSERT(writer_trace
);
315 ret
= ctf_copy_trace(writer_component
->err
, trace
, writer_trace
);
316 if (ret
!= BT_COMPONENT_STATUS_OK
) {
317 BT_LOGE_STR("Failed to copy trace.");
318 BT_OBJECT_PUT_REF_AND_RESET(ctf_writer
);
/*
 * NOTE(review): g_new0() aborts on allocation failure, so the
 * "Failed to allocate fs_writer." path is presumably unreachable.
 */
322 fs_writer
= g_new0(struct fs_writer
, 1);
324 BT_LOGE_STR("Failed to allocate fs_writer.");
327 fs_writer
->writer
= ctf_writer
;
328 fs_writer
->trace
= trace
;
/*
 * NOTE(review): the pointer is stored and the local reference is
 * released immediately afterwards, so fs_writer->writer_trace is
 * presumably a borrowed pointer kept alive by the CTF writer -- confirm.
 */
329 fs_writer
->writer_trace
= writer_trace
;
330 fs_writer
->writer_component
= writer_component
;
331 BT_TRACE_PUT_REF_AND_RESET(writer_trace
);
/* All three tables use pointer keys with no key destructor. */
332 fs_writer
->stream_class_map
= g_hash_table_new_full(g_direct_hash
,
333 g_direct_equal
, NULL
, (GDestroyNotify
) unref_stream_class
);
334 fs_writer
->stream_map
= g_hash_table_new_full(g_direct_hash
,
335 g_direct_equal
, NULL
, (GDestroyNotify
) unref_stream
);
336 fs_writer
->stream_states
= g_hash_table_new_full(g_direct_hash
,
337 g_direct_equal
, NULL
, destroy_stream_state_key
);
339 /* Set all the existing streams in the unknown state. */
340 nr_stream
= bt_trace_get_stream_count(trace
);
341 for (i
= 0; i
< nr_stream
; i
++) {
342 stream
= bt_trace_get_stream_by_index(trace
, i
);
345 insert_new_stream_state(writer_component
, fs_writer
, stream
);
346 BT_STREAM_PUT_REF_AND_RESET(stream
);
349 /* Check if the trace is already static or register a listener. */
350 if (bt_trace_is_static(trace
)) {
351 fs_writer
->trace_static
= 1;
/* -1 marks "no listener registered"; writer_close() tests for >= 0. */
352 fs_writer
->static_listener_id
= -1;
/*
 * NOTE(review): the listener id travels through `ret`, which is declared
 * as enum bt_component_status, before being stored.
 */
354 ret
= bt_trace_add_is_static_listener(trace
,
355 trace_is_static_listener
, NULL
, fs_writer
);
357 fs_writer
->static_listener_id
= ret
;
360 writer_component
->nr_traces
++;
361 g_hash_table_insert(writer_component
->trace_map
, (gpointer
) trace
,
369 bt_trace_put_ref(writer_trace
);
370 bt_stream_put_ref(stream
);
371 BT_OBJECT_PUT_REF_AND_RESET(ctf_writer
);
/*
 * Find the fs_writer that handles the trace owning `stream_class`.
 * The trace is obtained with bt_stream_class_get_trace() and looked up
 * in writer_component->trace_map; insert_new_writer() is called to
 * create the entry, presumably only when the lookup finds nothing (the
 * condition is outside this excerpt). The trace reference taken here is
 * released before returning.
 */
377 struct fs_writer
*get_fs_writer(struct writer_component
*writer_component
,
378 const bt_stream_class
*stream_class
)
380 const bt_trace
*trace
= NULL
;
381 struct fs_writer
*fs_writer
;
383 trace
= bt_stream_class_get_trace(stream_class
);
386 fs_writer
= g_hash_table_lookup(writer_component
->trace_map
,
389 fs_writer
= insert_new_writer(writer_component
, trace
);
391 BT_TRACE_PUT_REF_AND_RESET(trace
);
/*
 * Convenience wrapper around get_fs_writer(): resolve the class of
 * `stream` with bt_stream_get_class() (asserted non-NULL), look up the
 * fs_writer for that class, then release the class reference.
 */
397 struct fs_writer
*get_fs_writer_from_stream(
398 struct writer_component
*writer_component
,
399 const bt_stream
*stream
)
401 const bt_stream_class
*stream_class
= NULL
;
402 struct fs_writer
*fs_writer
;
404 stream_class
= bt_stream_get_class(stream
);
405 BT_ASSERT(stream_class
);
407 fs_writer
= get_fs_writer(writer_component
, stream_class
);
409 bt_stream_class_put_ref(stream_class
);
/*
 * Return the writer stream class mapped to the source `stream_class`,
 * or NULL when the owning fs_writer's stream_class_map has no entry for
 * it. The fs_writer is asserted to be non-NULL.
 * The result comes straight from g_hash_table_lookup(): no reference is
 * taken here on behalf of the caller.
 */
414 const bt_stream_class
*lookup_stream_class(
415 struct writer_component
*writer_component
,
416 const bt_stream_class
*stream_class
)
418 struct fs_writer
*fs_writer
= get_fs_writer(
419 writer_component
, stream_class
);
420 BT_ASSERT(fs_writer
);
421 return (const bt_stream_class
*) g_hash_table_lookup(
422 fs_writer
->stream_class_map
, (gpointer
) stream_class
);
/*
 * Return the writer stream mapped to the source `stream`, or NULL when
 * the owning fs_writer's stream_map has no entry for it. The fs_writer
 * is asserted to be non-NULL.
 * The result comes straight from g_hash_table_lookup(): no reference is
 * taken here, so callers that keep the stream take their own.
 */
426 const bt_stream
*lookup_stream(struct writer_component
*writer_component
,
427 const bt_stream
*stream
)
429 struct fs_writer
*fs_writer
= get_fs_writer_from_stream(
430 writer_component
, stream
);
431 BT_ASSERT(fs_writer
);
432 return (const bt_stream
*) g_hash_table_lookup(
433 fs_writer
->stream_map
, (gpointer
) stream
);
/*
 * Create the writer stream that mirrors the source `stream` and store it
 * in fs_writer->stream_map under the source stream pointer.
 *
 * The writer stream class is looked up first and created with
 * insert_new_stream_class() when it does not exist yet. The new stream
 * is created with bt_stream_create() using the source stream's name.
 *
 * Returns the writer stream. The variable is reset by
 * BT_STREAM_PUT_REF_AND_RESET() before the common cleanup, presumably
 * only on the error path (labels and jumps are outside this excerpt).
 */
437 const bt_stream
*insert_new_stream(
438 struct writer_component
*writer_component
,
439 struct fs_writer
*fs_writer
,
440 const bt_stream_class
*stream_class
,
441 const bt_stream
*stream
)
443 const bt_stream
*writer_stream
= NULL
;
444 const bt_stream_class
*writer_stream_class
= NULL
;
/*
 * A reference on the CTF writer is held for the duration of the call and
 * released in the cleanup below; the variable is not otherwise used in
 * the visible lines.
 */
445 struct bt_ctf_writer
*ctf_writer
= bt_object_get_ref(fs_writer
->writer
);
447 writer_stream_class
= lookup_stream_class(writer_component
,
449 if (!writer_stream_class
) {
450 writer_stream_class
= insert_new_stream_class(
451 writer_component
, fs_writer
, stream_class
);
452 if (!writer_stream_class
) {
453 BT_LOGE_STR("Failed to insert a new stream_class.");
/* Local reference on the class, balanced by the put-ref in the cleanup. */
457 bt_stream_class_get_ref(writer_stream_class
);
459 writer_stream
= bt_stream_create(writer_stream_class
,
460 bt_stream_get_name(stream
));
461 BT_ASSERT(writer_stream
);
463 g_hash_table_insert(fs_writer
->stream_map
, (gpointer
) stream
,
469 BT_STREAM_PUT_REF_AND_RESET(writer_stream
);
471 bt_object_put_ref(ctf_writer
);
472 bt_stream_class_put_ref(writer_stream_class
);
473 return writer_stream
;
/*
 * Look up, in `writer_stream_class`, the event class that has the same
 * id as the source `event_class`. The result of
 * bt_stream_class_get_event_class_by_id() is returned unchanged;
 * writer_output_event() treats a NULL result as "not copied yet".
 * `writer_component` is not used.
 */
477 const bt_event_class
*get_event_class(struct writer_component
*writer_component
,
478 const bt_stream_class
*writer_stream_class
,
479 const bt_event_class
*event_class
)
481 return bt_stream_class_get_event_class_by_id(writer_stream_class
,
482 bt_event_class_get_id(event_class
));
/*
 * Return the writer stream associated with the source `stream`, with a
 * reference taken through bt_stream_get_ref() when the lookup succeeds.
 * A failed lookup is logged as an error; the variable is reset by
 * BT_STREAM_PUT_REF_AND_RESET() before the final return, presumably on
 * that error path only (labels and jumps are outside this excerpt).
 * `packet` is not used in the visible lines.
 */
486 const bt_stream
*get_writer_stream(
487 struct writer_component
*writer_component
,
488 const bt_packet
*packet
, const bt_stream
*stream
)
490 const bt_stream
*writer_stream
= NULL
;
492 writer_stream
= lookup_stream(writer_component
, stream
);
493 if (!writer_stream
) {
494 BT_LOGE_STR("Failed to find existing stream.");
497 bt_stream_get_ref(writer_stream
);
502 BT_STREAM_PUT_REF_AND_RESET(writer_stream
);
504 return writer_stream
;
/*
 * Tear down the per-trace state held by `fs_writer`.
 *
 * The "is static" listener is removed when one was registered (id >= 0).
 * Each of the three hash tables is then emptied with
 * g_hash_table_foreach_remove() and destroyed; the stream map is emptied
 * with empty_streams_ht(), which flushes every writer stream first.
 * `writer_component` is not used in the visible lines.
 */
508 void writer_close(struct writer_component
*writer_component
,
509 struct fs_writer
*fs_writer
)
511 if (fs_writer
->static_listener_id
>= 0) {
512 bt_trace_remove_is_static_listener(fs_writer
->trace
,
513 fs_writer
->static_listener_id
);
516 /* Empty the stream class HT. */
517 g_hash_table_foreach_remove(fs_writer
->stream_class_map
,
519 g_hash_table_destroy(fs_writer
->stream_class_map
);
521 /* Empty the stream HT. */
522 g_hash_table_foreach_remove(fs_writer
->stream_map
,
523 empty_streams_ht
, NULL
);
524 g_hash_table_destroy(fs_writer
->stream_map
);
526 /* Empty the stream state HT. */
527 g_hash_table_foreach_remove(fs_writer
->stream_states
,
529 g_hash_table_destroy(fs_writer
->stream_states
);
/*
 * Handle the beginning of a source `stream`.
 *
 * The stream's state is looked up in the fs_writer's stream_states
 * table. A state entry is created with insert_new_stream_state() for a
 * stream that is not known yet, which is reported as an error when the
 * trace is already static (the guarding condition is outside this
 * excerpt). The state must be FS_WRITER_UNKNOWN_STREAM and is moved to
 * FS_WRITER_ACTIVE_STREAM, after which the writer stream is created
 * with insert_new_stream().
 *
 * Returns a bt_component_status: initialised to BT_COMPONENT_STATUS_OK
 * and set to BT_COMPONENT_STATUS_ERROR near the end, presumably on the
 * error path only. The stream class reference is released on exit.
 */
533 enum bt_component_status
writer_stream_begin(
534 struct writer_component
*writer_component
,
535 const bt_stream
*stream
)
537 const bt_stream_class
*stream_class
= NULL
;
538 struct fs_writer
*fs_writer
;
539 const bt_stream
*writer_stream
= NULL
;
540 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
541 enum fs_writer_stream_state
*state
;
543 stream_class
= bt_stream_get_class(stream
);
544 BT_ASSERT(stream_class
);
546 fs_writer
= get_fs_writer(writer_component
, stream_class
);
548 BT_LOGE_STR("Failed to get fs_writer.");
552 /* Set the stream as active */
553 state
= g_hash_table_lookup(fs_writer
->stream_states
, stream
);
555 if (fs_writer
->trace_static
) {
556 BT_LOGE_STR("Cannot add new stream on a static trace.");
559 state
= insert_new_stream_state(writer_component
, fs_writer
,
562 if (*state
!= FS_WRITER_UNKNOWN_STREAM
) {
563 BT_LOGE("Unexpected stream state: state=%d", *state
);
566 *state
= FS_WRITER_ACTIVE_STREAM
;
568 writer_stream
= insert_new_stream(writer_component
, fs_writer
,
569 stream_class
, stream
);
570 if (!writer_stream
) {
571 BT_LOGE_STR("Failed to insert new stream.");
578 ret
= BT_COMPONENT_STATUS_ERROR
;
580 bt_object_put_ref(stream_class
);
/*
 * Handle the end of a source `stream`.
 *
 * The stream's state must be FS_WRITER_ACTIVE_STREAM and is moved to
 * FS_WRITER_COMPLETED_STREAM; the stream's entry is then removed from
 * stream_map. When the trace is static and every stream state is
 * completed, the writer is closed and an entry is removed from the
 * component's trace_map.
 *
 * Returns a bt_component_status: initialised to BT_COMPONENT_STATUS_OK
 * and set to BT_COMPONENT_STATUS_ERROR near the end, presumably on the
 * error path only.
 *
 * NOTE(review): `trace` is initialised to NULL and is not assigned in
 * the visible lines before the final put-ref.
 */
585 enum bt_component_status
writer_stream_end(
586 struct writer_component
*writer_component
,
587 const bt_stream
*stream
)
589 const bt_stream_class
*stream_class
= NULL
;
590 struct fs_writer
*fs_writer
;
591 const bt_trace
*trace
= NULL
;
592 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
593 enum fs_writer_stream_state
*state
;
595 stream_class
= bt_stream_get_class(stream
);
596 BT_ASSERT(stream_class
);
598 fs_writer
= get_fs_writer(writer_component
, stream_class
);
600 BT_LOGE_STR("Failed to get fs_writer.");
/*
 * NOTE(review): the lookup result is dereferenced by the very next
 * statement; a stream with no entry in stream_states would make `state`
 * NULL here.
 */
604 state
= g_hash_table_lookup(fs_writer
->stream_states
, stream
);
605 if (*state
!= FS_WRITER_ACTIVE_STREAM
) {
606 BT_LOGE("Unexpected stream state: state=%d", *state
);
609 *state
= FS_WRITER_COMPLETED_STREAM
;
611 g_hash_table_remove(fs_writer
->stream_map
, stream
);
613 if (fs_writer
->trace_static
) {
614 int trace_completed
= 1;
616 g_hash_table_foreach(fs_writer
->stream_states
,
617 check_completed_trace
, &trace_completed
);
618 if (trace_completed
) {
619 writer_close(writer_component
, fs_writer
);
620 g_hash_table_remove(writer_component
->trace_map
,
628 ret
= BT_COMPONENT_STATUS_ERROR
;
630 BT_OBJECT_PUT_REF_AND_RESET(trace
);
631 BT_OBJECT_PUT_REF_AND_RESET(stream_class
);
/*
 * Handle the beginning of a source `packet`: find the writer stream that
 * mirrors the packet's stream, then copy the packet context and the
 * packet header onto it with ctf_stream_copy_packet_context() and
 * ctf_stream_copy_packet_header().
 *
 * Returns a bt_component_status: initialised to BT_COMPONENT_STATUS_OK
 * and set to BT_COMPONENT_STATUS_ERROR near the end, presumably on the
 * error path only.
 */
636 enum bt_component_status
writer_new_packet(
637 struct writer_component
*writer_component
,
638 const bt_packet
*packet
)
640 const bt_stream
*stream
= NULL
, *writer_stream
= NULL
;
641 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
644 stream
= bt_packet_get_stream(packet
);
647 writer_stream
= get_writer_stream(writer_component
, packet
, stream
);
648 if (!writer_stream
) {
649 BT_LOGE_STR("Failed to get writer_stream.");
/* The source stream is no longer needed; the reset makes the later put-ref harmless. */
652 BT_OBJECT_PUT_REF_AND_RESET(stream
);
654 int_ret
= ctf_stream_copy_packet_context(
655 writer_component
->err
, packet
, writer_stream
);
657 BT_LOGE_STR("Failed to copy packet_context.");
661 ret
= ctf_stream_copy_packet_header(writer_component
->err
,
662 packet
, writer_stream
);
664 BT_LOGE_STR("Failed to copy packet_header.");
671 ret
= BT_COMPONENT_STATUS_ERROR
;
673 bt_object_put_ref(writer_stream
);
674 bt_object_put_ref(stream
);
/*
 * Handle the end of a source `packet`: find the writer stream that
 * mirrors the packet's stream and flush it with bt_stream_flush().
 *
 * Returns a bt_component_status: BT_COMPONENT_STATUS_OK is assigned
 * after the flush and BT_COMPONENT_STATUS_ERROR further down, presumably
 * on the error path only (labels and jumps are outside this excerpt).
 */
679 enum bt_component_status
writer_close_packet(
680 struct writer_component
*writer_component
,
681 const bt_packet
*packet
)
683 const bt_stream
*stream
= NULL
, *writer_stream
= NULL
;
684 enum bt_component_status ret
;
686 stream
= bt_packet_get_stream(packet
);
689 writer_stream
= lookup_stream(writer_component
, stream
);
690 if (!writer_stream
) {
691 BT_LOGE_STR("Failed to find existing stream.");
694 BT_OBJECT_PUT_REF_AND_RESET(stream
);
/* lookup_stream() takes no reference, so one is taken for the flush. */
696 bt_object_get_ref(writer_stream
);
698 ret
= bt_stream_flush(writer_stream
);
700 BT_LOGE_STR("Failed to flush stream.");
704 BT_OBJECT_PUT_REF_AND_RESET(writer_stream
);
706 ret
= BT_COMPONENT_STATUS_OK
;
710 ret
= BT_COMPONENT_STATUS_ERROR
;
712 bt_object_put_ref(writer_stream
);
713 bt_object_put_ref(stream
);
/*
 * Copy one source `event` to the output trace.
 *
 * Visible steps:
 *  - resolve the event's class and stream;
 *  - find the matching writer stream and writer stream class, taking a
 *    reference on each;
 *  - find the writer event class with the same id, or copy it with
 *    ctf_copy_event_class() and add it to the writer stream class when
 *    it does not exist yet;
 *  - copy the event with ctf_copy_event() and append the copy to the
 *    writer stream with bt_stream_append_event().
 *
 * Returns BT_COMPONENT_STATUS_OK, or BT_COMPONENT_STATUS_ERROR on what
 * is presumably the error path (labels and jumps are outside this
 * excerpt). Every reference held by the function is released in the
 * block of put-ref calls at the end.
 */
718 enum bt_component_status
writer_output_event(
719 struct writer_component
*writer_component
,
720 const bt_event
*event
)
722 enum bt_component_status ret
;
723 const bt_event_class
*event_class
= NULL
, *writer_event_class
= NULL
;
724 const bt_stream
*stream
= NULL
, *writer_stream
= NULL
;
725 const bt_stream_class
*stream_class
= NULL
, *writer_stream_class
= NULL
;
726 const bt_event
*writer_event
= NULL
;
728 const bt_trace
*writer_trace
= NULL
;
730 event_class
= bt_event_get_class(event
);
731 BT_ASSERT(event_class
);
733 stream
= bt_event_get_stream(event
);
736 writer_stream
= lookup_stream(writer_component
, stream
);
/*
 * NOTE(review): the log text below reads "Failed for find"; it is a
 * runtime string and is left untouched here.
 */
737 if (!writer_stream
|| !bt_object_get_ref(writer_stream
)) {
738 BT_LOGE_STR("Failed for find existing stream.");
742 stream_class
= bt_event_class_get_stream_class(event_class
);
743 BT_ASSERT(stream_class
);
745 writer_stream_class
= lookup_stream_class(writer_component
, stream_class
);
746 if (!writer_stream_class
|| !bt_object_get_ref(writer_stream_class
)) {
747 BT_LOGE_STR("Failed to find existing stream_class.");
751 writer_trace
= bt_stream_class_get_trace(writer_stream_class
);
752 BT_ASSERT(writer_trace
);
754 writer_event_class
= get_event_class(writer_component
,
755 writer_stream_class
, event_class
);
/* First event of this class on the output side: copy the class over. */
756 if (!writer_event_class
) {
757 writer_event_class
= ctf_copy_event_class(writer_component
->err
,
758 writer_trace
, event_class
);
759 if (!writer_event_class
) {
760 BT_LOGE_STR("Failed to copy event_class.");
763 int_ret
= bt_stream_class_add_event_class(
764 writer_stream_class
, writer_event_class
);
766 BT_LOGE("Failed to add event_class: event_name=\"%s\"",
767 bt_event_class_get_name(event_class
));
772 writer_event
= ctf_copy_event(writer_component
->err
, event
,
773 writer_event_class
, true);
775 BT_LOGE("Failed to copy event: event_class=\"%s\"",
776 bt_event_class_get_name(writer_event_class
));
780 int_ret
= bt_stream_append_event(writer_stream
, writer_event
);
782 BT_LOGE("Failed to append event: event_class=\"%s\"",
783 bt_event_class_get_name(writer_event_class
));
787 ret
= BT_COMPONENT_STATUS_OK
;
791 ret
= BT_COMPONENT_STATUS_ERROR
;
/* Common cleanup: release every reference acquired above. */
793 bt_object_put_ref(writer_trace
);
794 bt_object_put_ref(writer_event
);
795 bt_object_put_ref(writer_event_class
);
796 bt_object_put_ref(writer_stream_class
);
797 bt_object_put_ref(stream_class
);
798 bt_object_put_ref(writer_stream
);
799 bt_object_put_ref(stream
);
800 bt_object_put_ref(event_class
);