6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 #include <babeltrace/babeltrace.h>
31 #include <babeltrace/babeltrace-internal.h>
32 #include <babeltrace/context.h>
33 #include <babeltrace/context-internal.h>
34 #include <babeltrace/iterator-internal.h>
35 #include <babeltrace/iterator.h>
36 #include <babeltrace/prio_heap.h>
37 #include <babeltrace/ctf/metadata.h>
38 #include <babeltrace/ctf/events.h>
41 static int babeltrace_filestream_seek(struct ctf_file_stream
*file_stream
,
42 const struct bt_iter_pos
*begin_pos
,
43 unsigned long stream_id
);
45 struct stream_saved_pos
{
47 * Use the file_stream pointer to check, for each stream, that the trace
48 * collection we restore to matches the one we saved from.
50 struct ctf_file_stream
*file_stream
;
51 size_t cur_index
; /* current index in packet index */
52 ssize_t offset
; /* offset from base, in bits. EOF for end of file. */
53 uint64_t current_real_timestamp
;
54 uint64_t current_cycles_timestamp
;
58 struct trace_collection
*tc
;
59 GArray
*stream_saved_pos
; /* Contains struct stream_saved_pos */
62 static int stream_read_event(struct ctf_file_stream
*sin
)
66 ret
= sin
->pos
.parent
.event_cb(&sin
->pos
.parent
, &sin
->parent
);
69 else if (ret
== EAGAIN
)
70 /* Stream is inactive for now (live reading). */
73 fprintf(stderr
, "[error] Reading event failed.\n");
81 * Return true if a < b, false otherwise.
82 * If time stamps are exactly the same, compare by stream path. This
83 * ensures we get the same result between runs on the same trace
84 * collection on different environments.
85 * The result will be random for memory-mapped traces since there is no
86 * fixed path leading to those (they have empty path string).
88 static int stream_compare(void *a
, void *b
)
90 struct ctf_file_stream
*s_a
= a
, *s_b
= b
;
92 if (s_a
->parent
.real_timestamp
< s_b
->parent
.real_timestamp
) {
94 } else if (likely(s_a
->parent
.real_timestamp
> s_b
->parent
.real_timestamp
)) {
97 return strcmp(s_a
->parent
.path
, s_b
->parent
.path
);
101 void bt_iter_free_pos(struct bt_iter_pos
*iter_pos
)
106 if (iter_pos
->type
== BT_SEEK_RESTORE
&& iter_pos
->u
.restore
) {
107 if (iter_pos
->u
.restore
->stream_saved_pos
) {
109 iter_pos
->u
.restore
->stream_saved_pos
,
112 g_free(iter_pos
->u
.restore
);
118 * seek_file_stream_by_timestamp
120 * Browse a filestream by index. If an index contains the timestamp passed as
121 * argument, seek inside the corresponding packet until we find the event we
122 * are looking for (either the exact timestamp or the event just after the
125 * Return 0 if the seek succeeded, EOF if we didn't find any packet
126 * containing the timestamp, or a positive integer for error.
128 * TODO: this should be turned into a binary search! It is currently
129 * doing a linear search in the packets. This is an O(n) operation on a
130 * very frequent code path.
132 static int seek_file_stream_by_timestamp(struct ctf_file_stream
*cfs
,
135 struct ctf_stream_pos
*stream_pos
;
136 struct packet_index
*index
;
139 stream_pos
= &cfs
->pos
;
140 for (i
= 0; i
< stream_pos
->packet_index
->len
; i
++) {
141 index
= &g_array_index(stream_pos
->packet_index
,
142 struct packet_index
, i
);
143 if (index
->ts_real
.timestamp_end
< timestamp
)
146 stream_pos
->packet_seek(&stream_pos
->parent
, i
, SEEK_SET
);
148 ret
= stream_read_event(cfs
);
149 } while (cfs
->parent
.real_timestamp
< timestamp
&& ret
== 0);
151 /* Can return either EOF, 0, or error (> 0). */
155 * Cannot find the timestamp within the stream packets, return
162 * seek_ctf_trace_by_timestamp : for each file stream, seek to the event with
163 * the corresponding timestamp
165 * Return 0 on success.
166 * If the timestamp is not part of any file stream, return EOF to inform the
167 * user that the timestamp is out of scope.
168 * On other errors, return positive value.
170 static int seek_ctf_trace_by_timestamp(struct ctf_trace
*tin
,
171 uint64_t timestamp
, struct ptr_heap
*stream_heap
)
175 struct bt_trace_descriptor
*td
= &tin
->parent
;
177 if (td
->interval_set
) {
179 * If this trace has an interval selected, don't allow seeks
180 * before the selected interval. We seek to the start of the
181 * interval, thereby presenting a shorter "virtual" trace.
183 timestamp
= max(timestamp
, td
->interval_real
.timestamp_begin
);
186 /* for each stream_class */
187 for (i
= 0; i
< tin
->streams
->len
; i
++) {
188 struct ctf_stream_declaration
*stream_class
;
190 stream_class
= g_ptr_array_index(tin
->streams
, i
);
193 /* for each file_stream */
194 for (j
= 0; j
< stream_class
->streams
->len
; j
++) {
195 struct ctf_stream_definition
*stream
;
196 struct ctf_file_stream
*cfs
;
198 stream
= g_ptr_array_index(stream_class
->streams
, j
);
201 cfs
= container_of(stream
, struct ctf_file_stream
,
203 ret
= seek_file_stream_by_timestamp(cfs
, timestamp
);
206 ret
= bt_heap_insert(stream_heap
, cfs
);
208 /* Return positive error. */
212 } else if (ret
> 0) {
214 * Error in seek (not EOF), failure.
218 /* on EOF just do not put stream into heap. */
222 return found
? 0 : EOF
;
226 * Find timestamp of last event in the stream.
228 * Return value: 0 if OK, positive error value on error, EOF if no
231 static int find_max_timestamp_ctf_file_stream(struct ctf_file_stream
*cfs
,
232 uint64_t *timestamp_end
)
234 int ret
, count
= 0, i
;
235 uint64_t timestamp
= 0;
236 struct ctf_stream_pos
*stream_pos
;
238 stream_pos
= &cfs
->pos
;
240 * We start with the last packet, and iterate backwards until we
241 * either find at least one event, or we reach the first packet
242 * (some packets can be empty).
244 for (i
= stream_pos
->packet_index
->len
- 1; i
>= 0; i
--) {
245 stream_pos
->packet_seek(&stream_pos
->parent
, i
, SEEK_SET
);
247 /* read each event until we reach the end of the stream */
249 ret
= stream_read_event(cfs
);
252 timestamp
= cfs
->parent
.real_timestamp
;
265 *timestamp_end
= timestamp
;
268 /* Return EOF if no events were found */
276 * Find the stream within a stream class that contains the event with
277 * the largest timestamp, and save that timestamp.
279 * Return 0 if OK, EOF if no events were found in the streams, or
280 * positive value on error.
282 static int find_max_timestamp_ctf_stream_class(
283 struct ctf_stream_declaration
*stream_class
,
284 struct ctf_file_stream
**cfsp
,
285 uint64_t *max_timestamp
)
287 int ret
= EOF
, i
, found
= 0;
289 for (i
= 0; i
< stream_class
->streams
->len
; i
++) {
290 struct ctf_stream_definition
*stream
;
291 struct ctf_file_stream
*cfs
;
292 uint64_t current_max_ts
= 0;
294 stream
= g_ptr_array_index(stream_class
->streams
, i
);
297 cfs
= container_of(stream
, struct ctf_file_stream
, parent
);
298 ret
= find_max_timestamp_ctf_file_stream(cfs
, ¤t_max_ts
);
303 if (current_max_ts
>= *max_timestamp
) {
304 *max_timestamp
= current_max_ts
;
309 assert(ret
>= 0 || ret
== EOF
);
317 * seek_last_ctf_trace_collection: seek trace collection to last event.
319 * Return 0 if OK, EOF if no events were found, or positive error value
322 static int seek_last_ctf_trace_collection(struct trace_collection
*tc
,
323 struct ctf_file_stream
**cfsp
)
327 uint64_t max_timestamp
= 0;
332 /* For each trace in the trace_collection */
333 for (i
= 0; i
< tc
->array
->len
; i
++) {
334 struct ctf_trace
*tin
;
335 struct bt_trace_descriptor
*td_read
;
337 td_read
= g_ptr_array_index(tc
->array
, i
);
340 tin
= container_of(td_read
, struct ctf_trace
, parent
);
341 /* For each stream_class in the trace */
342 for (j
= 0; j
< tin
->streams
->len
; j
++) {
343 struct ctf_stream_declaration
*stream_class
;
345 stream_class
= g_ptr_array_index(tin
->streams
, j
);
348 ret
= find_max_timestamp_ctf_stream_class(stream_class
,
349 cfsp
, &max_timestamp
);
354 assert(ret
== EOF
|| ret
== 0);
358 * Now we know in which file stream the last event is located,
359 * and we know its timestamp.
364 ret
= seek_file_stream_by_timestamp(*cfsp
, max_timestamp
);
371 int bt_iter_set_pos(struct bt_iter
*iter
, const struct bt_iter_pos
*iter_pos
)
373 struct trace_collection
*tc
;
376 if (!iter
|| !iter_pos
)
379 switch (iter_pos
->type
) {
380 case BT_SEEK_RESTORE
:
381 if (!iter_pos
->u
.restore
)
384 bt_heap_free(iter
->stream_heap
);
385 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
387 goto error_heap_init
;
389 for (i
= 0; i
< iter_pos
->u
.restore
->stream_saved_pos
->len
;
391 struct stream_saved_pos
*saved_pos
;
392 struct ctf_stream_pos
*stream_pos
;
393 struct ctf_stream_definition
*stream
;
395 saved_pos
= &g_array_index(
396 iter_pos
->u
.restore
->stream_saved_pos
,
397 struct stream_saved_pos
, i
);
398 stream
= &saved_pos
->file_stream
->parent
;
399 stream_pos
= &saved_pos
->file_stream
->pos
;
401 stream_pos
->packet_seek(&stream_pos
->parent
,
402 saved_pos
->cur_index
, SEEK_SET
);
405 * the timestamp needs to be restored after
406 * packet_seek, because this function resets
407 * the timestamp to the beginning of the packet
409 stream
->real_timestamp
= saved_pos
->current_real_timestamp
;
410 stream
->cycles_timestamp
= saved_pos
->current_cycles_timestamp
;
411 stream_pos
->offset
= saved_pos
->offset
;
412 stream_pos
->last_offset
= LAST_OFFSET_POISON
;
414 stream
->current
.real
.begin
= 0;
415 stream
->current
.real
.end
= 0;
416 stream
->current
.cycles
.begin
= 0;
417 stream
->current
.cycles
.end
= 0;
419 stream
->prev
.real
.begin
= 0;
420 stream
->prev
.real
.end
= 0;
421 stream
->prev
.cycles
.begin
= 0;
422 stream
->prev
.cycles
.end
= 0;
424 printf_debug("restored to cur_index = %" PRId64
" and "
425 "offset = %" PRId64
", timestamp = %" PRIu64
"\n",
426 stream_pos
->cur_index
,
427 stream_pos
->offset
, stream
->real_timestamp
);
429 ret
= stream_read_event(saved_pos
->file_stream
);
435 ret
= bt_heap_insert(iter
->stream_heap
,
436 saved_pos
->file_stream
);
444 bt_heap_free(iter
->stream_heap
);
445 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
447 goto error_heap_init
;
449 /* for each trace in the trace_collection */
450 for (i
= 0; i
< tc
->array
->len
; i
++) {
451 struct ctf_trace
*tin
;
452 struct bt_trace_descriptor
*td_read
;
454 td_read
= g_ptr_array_index(tc
->array
, i
);
457 tin
= container_of(td_read
, struct ctf_trace
, parent
);
459 ret
= seek_ctf_trace_by_timestamp(tin
,
460 iter_pos
->u
.seek_time
,
463 * Positive errors are failure. Negative value
464 * is EOF (for which we continue with other
465 * traces). 0 is success. Note: on EOF, it just
466 * means that no stream has been added to the
467 * iterator for that trace, which is fine.
469 if (ret
!= 0 && ret
!= EOF
)
475 bt_heap_free(iter
->stream_heap
);
476 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
478 goto error_heap_init
;
480 for (i
= 0; i
< tc
->array
->len
; i
++) {
481 struct ctf_trace
*tin
;
482 struct bt_trace_descriptor
*td_read
;
485 td_read
= g_ptr_array_index(tc
->array
, i
);
488 tin
= container_of(td_read
, struct ctf_trace
, parent
);
490 /* Populate heap with each stream */
491 for (stream_id
= 0; stream_id
< tin
->streams
->len
;
493 struct ctf_stream_declaration
*stream
;
496 stream
= g_ptr_array_index(tin
->streams
,
500 for (filenr
= 0; filenr
< stream
->streams
->len
;
502 struct ctf_file_stream
*file_stream
;
503 file_stream
= g_ptr_array_index(
508 ret
= babeltrace_filestream_seek(
509 file_stream
, iter_pos
,
511 if (ret
!= 0 && ret
!= EOF
) {
515 /* Do not add EOF streams */
518 ret
= bt_heap_insert(iter
->stream_heap
, file_stream
);
527 struct ctf_file_stream
*cfs
= NULL
;
530 ret
= seek_last_ctf_trace_collection(tc
, &cfs
);
531 if (ret
!= 0 || !cfs
)
533 /* remove all streams from the heap */
534 bt_heap_free(iter
->stream_heap
);
535 /* Create a new empty heap */
536 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
539 /* Insert the stream that contains the last event */
540 ret
= bt_heap_insert(iter
->stream_heap
, cfs
);
546 /* not implemented */
553 bt_heap_free(iter
->stream_heap
);
555 if (bt_heap_init(iter
->stream_heap
, 0, stream_compare
) < 0) {
556 bt_heap_free(iter
->stream_heap
);
557 g_free(iter
->stream_heap
);
558 iter
->stream_heap
= NULL
;
565 struct bt_iter_pos
*bt_iter_get_pos(struct bt_iter
*iter
)
567 struct bt_iter_pos
*pos
;
568 struct trace_collection
*tc
;
569 struct ctf_file_stream
*file_stream
= NULL
, *removed
;
570 struct ptr_heap iter_heap_copy
;
577 pos
= g_new0(struct bt_iter_pos
, 1);
578 pos
->type
= BT_SEEK_RESTORE
;
579 pos
->u
.restore
= g_new0(struct bt_saved_pos
, 1);
580 pos
->u
.restore
->tc
= tc
;
581 pos
->u
.restore
->stream_saved_pos
= g_array_new(FALSE
, TRUE
,
582 sizeof(struct stream_saved_pos
));
583 if (!pos
->u
.restore
->stream_saved_pos
)
586 ret
= bt_heap_copy(&iter_heap_copy
, iter
->stream_heap
);
590 /* iterate over each stream in the heap */
591 file_stream
= bt_heap_maximum(&iter_heap_copy
);
592 while (file_stream
!= NULL
) {
593 struct stream_saved_pos saved_pos
;
595 assert(file_stream
->pos
.last_offset
!= LAST_OFFSET_POISON
);
596 saved_pos
.offset
= file_stream
->pos
.last_offset
;
597 saved_pos
.file_stream
= file_stream
;
598 saved_pos
.cur_index
= file_stream
->pos
.cur_index
;
600 saved_pos
.current_real_timestamp
= file_stream
->parent
.real_timestamp
;
601 saved_pos
.current_cycles_timestamp
= file_stream
->parent
.cycles_timestamp
;
604 pos
->u
.restore
->stream_saved_pos
,
607 printf_debug("stream : %" PRIu64
", cur_index : %zd, "
609 "timestamp = %" PRIu64
"\n",
610 file_stream
->parent
.stream_id
,
611 saved_pos
.cur_index
, saved_pos
.offset
,
612 saved_pos
.current_real_timestamp
);
614 /* remove the stream from the heap copy */
615 removed
= bt_heap_remove(&iter_heap_copy
);
616 assert(removed
== file_stream
);
618 file_stream
= bt_heap_maximum(&iter_heap_copy
);
620 bt_heap_free(&iter_heap_copy
);
624 g_array_free(pos
->u
.restore
->stream_saved_pos
, TRUE
);
630 struct bt_iter_pos
*bt_iter_create_time_pos(struct bt_iter
*unused
,
633 struct bt_iter_pos
*pos
;
635 pos
= g_new0(struct bt_iter_pos
, 1);
636 pos
->type
= BT_SEEK_TIME
;
637 pos
->u
.seek_time
= timestamp
;
642 * babeltrace_filestream_seek: seek a filestream to given position.
644 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
646 static int babeltrace_filestream_seek(struct ctf_file_stream
*file_stream
,
647 const struct bt_iter_pos
*begin_pos
,
648 unsigned long stream_id
)
652 if (!file_stream
|| !begin_pos
)
655 switch (begin_pos
->type
) {
658 * just insert into the heap we should already know
663 file_stream
->pos
.packet_seek(&file_stream
->pos
.parent
,
665 ret
= stream_read_event(file_stream
);
668 case BT_SEEK_RESTORE
:
670 assert(0); /* Not yet defined */
676 int bt_iter_add_trace(struct bt_iter
*iter
,
677 struct bt_trace_descriptor
*td_read
)
679 struct ctf_trace
*tin
;
680 int stream_id
, ret
= 0;
682 tin
= container_of(td_read
, struct ctf_trace
, parent
);
684 /* Populate heap with each stream */
685 for (stream_id
= 0; stream_id
< tin
->streams
->len
;
687 struct ctf_stream_declaration
*stream
;
690 stream
= g_ptr_array_index(tin
->streams
, stream_id
);
693 for (filenr
= 0; filenr
< stream
->streams
->len
;
695 struct ctf_file_stream
*file_stream
;
696 struct bt_iter_pos pos
;
698 file_stream
= g_ptr_array_index(stream
->streams
,
703 pos
.type
= BT_SEEK_BEGIN
;
704 ret
= babeltrace_filestream_seek(file_stream
,
710 } else if (ret
!= 0 && ret
!= EAGAIN
) {
714 ret
= bt_heap_insert(iter
->stream_heap
, file_stream
);
724 int bt_iter_init(struct bt_iter
*iter
,
725 struct bt_context
*ctx
,
726 const struct bt_iter_pos
*begin_pos
,
727 const struct bt_iter_pos
*end_pos
)
732 if (!iter
|| !ctx
|| !ctx
->tc
|| !ctx
->tc
->array
)
735 if (ctx
->current_iterator
) {
740 iter
->stream_heap
= g_new(struct ptr_heap
, 1);
741 iter
->end_pos
= end_pos
;
745 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
747 goto error_heap_init
;
749 for (i
= 0; i
< ctx
->tc
->array
->len
; i
++) {
750 struct bt_trace_descriptor
*td_read
;
752 td_read
= g_ptr_array_index(ctx
->tc
->array
, i
);
755 ret
= bt_iter_add_trace(iter
, td_read
);
760 ctx
->current_iterator
= iter
;
761 if (begin_pos
&& begin_pos
->type
!= BT_SEEK_BEGIN
) {
762 ret
= bt_iter_set_pos(iter
, begin_pos
);
771 bt_heap_free(iter
->stream_heap
);
773 g_free(iter
->stream_heap
);
774 iter
->stream_heap
= NULL
;
779 struct bt_iter
*bt_iter_create(struct bt_context
*ctx
,
780 const struct bt_iter_pos
*begin_pos
,
781 const struct bt_iter_pos
*end_pos
)
783 struct bt_iter
*iter
;
789 iter
= g_new0(struct bt_iter
, 1);
790 ret
= bt_iter_init(iter
, ctx
, begin_pos
, end_pos
);
798 void bt_iter_fini(struct bt_iter
*iter
)
801 if (iter
->stream_heap
) {
802 bt_heap_free(iter
->stream_heap
);
803 g_free(iter
->stream_heap
);
805 iter
->ctx
->current_iterator
= NULL
;
806 bt_context_put(iter
->ctx
);
809 void bt_iter_destroy(struct bt_iter
*iter
)
816 int bt_iter_next(struct bt_iter
*iter
)
818 struct ctf_file_stream
*file_stream
, *removed
;
820 bool event_outside_interval
= false;
825 file_stream
= bt_heap_maximum(iter
->stream_heap
);
827 /* end of file for all streams */
832 ret
= stream_read_event(file_stream
);
833 if (file_stream
->pos
.parent
.trace
&&
834 file_stream
->pos
.parent
.trace
->interval_set
) {
835 event_outside_interval
=
836 file_stream
->parent
.real_timestamp
>
837 file_stream
->pos
.parent
.trace
->interval_real
.timestamp_end
;
839 if (ret
== EOF
|| event_outside_interval
) {
840 removed
= bt_heap_remove(iter
->stream_heap
);
841 assert(removed
== file_stream
);
844 } else if (ret
== EAGAIN
) {
846 * Live streaming: the stream is inactive for now, we
847 * just updated the timestamp_end to skip over this
848 * stream up to a certain point in time.
850 * Since we can't guarantee that a stream will ever have
851 * any activity, we can't rely on the fact that
852 * bt_iter_next will be called for each stream and deal
853 * with inactive streams. So instead, we return 0 here
854 * to the caller and let the read API handle the
864 /* Reinsert the file stream into the heap, and rebalance. */
865 removed
= bt_heap_replace_max(iter
->stream_heap
, file_stream
);
866 assert(removed
== file_stream
);