6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 #include <babeltrace/babeltrace.h>
31 #include <babeltrace/context.h>
32 #include <babeltrace/context-internal.h>
33 #include <babeltrace/iterator-internal.h>
34 #include <babeltrace/iterator.h>
35 #include <babeltrace/prio_heap.h>
36 #include <babeltrace/ctf/metadata.h>
37 #include <babeltrace/ctf/events.h>
/*
 * Forward declaration: the definition comes later in this file, after its
 * first callers (bt_iter_set_pos() and bt_iter_init()).
 */
static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream,
		const struct bt_iter_pos *begin_pos,
		unsigned long stream_id);
/*
 * Saved read position of one file stream. bt_iter_get_pos() fills one of
 * these per stream and bt_iter_set_pos() replays them on BT_SEEK_RESTORE.
 */
struct stream_saved_pos {
	/*
	 * Use the file_stream pointer to check whether the trace collection
	 * we restore to matches the one we saved from, for each stream.
	 */
	struct ctf_file_stream *file_stream;
	size_t cur_index;	/* current index in packet index */
	ssize_t offset;		/* offset from base, in bits. EOF for end of file. */
	uint64_t current_real_timestamp;
	uint64_t current_cycles_timestamp;
};
57 struct trace_collection
*tc
;
58 GArray
*stream_saved_pos
; /* Contains struct stream_saved_pos */
61 static int stream_read_event(struct ctf_file_stream
*sin
)
65 ret
= sin
->pos
.parent
.event_cb(&sin
->pos
.parent
, &sin
->parent
);
69 fprintf(stderr
, "[error] Reading event failed.\n");
76 * returns true if a < b, false otherwise.
78 static int stream_compare(void *a
, void *b
)
80 struct ctf_file_stream
*s_a
= a
, *s_b
= b
;
82 if (s_a
->parent
.real_timestamp
< s_b
->parent
.real_timestamp
)
88 void bt_iter_free_pos(struct bt_iter_pos
*iter_pos
)
93 if (iter_pos
->type
== BT_SEEK_RESTORE
&& iter_pos
->u
.restore
) {
94 if (iter_pos
->u
.restore
->stream_saved_pos
) {
96 iter_pos
->u
.restore
->stream_saved_pos
,
99 g_free(iter_pos
->u
.restore
);
105 * seek_file_stream_by_timestamp
107 * Browse a filestream by index, if an index contains the timestamp passed in
108 * argument, seek inside the corresponding packet until we find the event we
109 * are looking for (either the exact timestamp or the event just after the timestamp).
112 * Return 0 if the seek succeeded, EOF if we didn't find any packet
113 * containing the timestamp, or a positive integer for error.
115 * TODO: this should be turned into a binary search! It is currently
116 * doing a linear search in the packets. This is an O(n) operation on a
117 * very frequent code path.
119 static int seek_file_stream_by_timestamp(struct ctf_file_stream
*cfs
,
122 struct ctf_stream_pos
*stream_pos
;
123 struct packet_index
*index
;
126 stream_pos
= &cfs
->pos
;
127 for (i
= 0; i
< stream_pos
->packet_real_index
->len
; i
++) {
128 index
= &g_array_index(stream_pos
->packet_real_index
,
129 struct packet_index
, i
);
130 if (index
->timestamp_end
< timestamp
)
133 stream_pos
->packet_seek(&stream_pos
->parent
, i
, SEEK_SET
);
135 ret
= stream_read_event(cfs
);
136 } while (cfs
->parent
.real_timestamp
< timestamp
&& ret
== 0);
138 /* Can return either EOF, 0, or error (> 0). */
142 * Cannot find the timestamp within the stream packets, return
149 * seek_ctf_trace_by_timestamp : for each file stream, seek to the event with
150 * the corresponding timestamp
152 * Return 0 on success.
153 * If the timestamp is not part of any file stream, return EOF to inform the
154 * user the timestamp is out of the scope.
155 * On other errors, return positive value.
157 static int seek_ctf_trace_by_timestamp(struct ctf_trace
*tin
,
158 uint64_t timestamp
, struct ptr_heap
*stream_heap
)
163 /* for each stream_class */
164 for (i
= 0; i
< tin
->streams
->len
; i
++) {
165 struct ctf_stream_declaration
*stream_class
;
167 stream_class
= g_ptr_array_index(tin
->streams
, i
);
170 /* for each file_stream */
171 for (j
= 0; j
< stream_class
->streams
->len
; j
++) {
172 struct ctf_stream_definition
*stream
;
173 struct ctf_file_stream
*cfs
;
175 stream
= g_ptr_array_index(stream_class
->streams
, j
);
178 cfs
= container_of(stream
, struct ctf_file_stream
,
180 ret
= seek_file_stream_by_timestamp(cfs
, timestamp
);
183 ret
= bt_heap_insert(stream_heap
, cfs
);
185 /* Return positive error. */
189 } else if (ret
> 0) {
191 * Error in seek (not EOF), failure.
195 /* on EOF just do not put stream into heap. */
199 return found
? 0 : EOF
;
203 * Find timestamp of last event in the stream.
205 * Return value: 0 if OK, positive error value on error, EOF if no events were found.
208 static int find_max_timestamp_ctf_file_stream(struct ctf_file_stream
*cfs
,
209 uint64_t *timestamp_end
)
211 int ret
, count
= 0, i
;
212 uint64_t timestamp
= 0;
213 struct ctf_stream_pos
*stream_pos
;
215 stream_pos
= &cfs
->pos
;
217 * We start by the last packet, and iterate backwards until we
218 * either find at least one event, or we reach the first packet
219 * (some packets can be empty).
221 for (i
= stream_pos
->packet_real_index
->len
- 1; i
>= 0; i
--) {
222 stream_pos
->packet_seek(&stream_pos
->parent
, i
, SEEK_SET
);
224 /* read each event until we reach the end of the stream */
226 ret
= stream_read_event(cfs
);
229 timestamp
= cfs
->parent
.real_timestamp
;
242 *timestamp_end
= timestamp
;
245 /* Return EOF if no events were found */
253 * Find the stream within a stream class that contains the event with
254 * the largest timestamp, and save that timestamp.
256 * Return 0 if OK, EOF if no events were found in the streams, or
257 * positive value on error.
259 static int find_max_timestamp_ctf_stream_class(
260 struct ctf_stream_declaration
*stream_class
,
261 struct ctf_file_stream
**cfsp
,
262 uint64_t *max_timestamp
)
264 int ret
= EOF
, i
, found
= 0;
266 for (i
= 0; i
< stream_class
->streams
->len
; i
++) {
267 struct ctf_stream_definition
*stream
;
268 struct ctf_file_stream
*cfs
;
269 uint64_t current_max_ts
= 0;
271 stream
= g_ptr_array_index(stream_class
->streams
, i
);
274 cfs
= container_of(stream
, struct ctf_file_stream
, parent
);
275 ret
= find_max_timestamp_ctf_file_stream(cfs
, ¤t_max_ts
);
280 if (current_max_ts
>= *max_timestamp
) {
281 *max_timestamp
= current_max_ts
;
286 assert(ret
>= 0 || ret
== EOF
);
294 * seek_last_ctf_trace_collection: seek trace collection to last event.
296 * Return 0 if OK, EOF if no events were found, or positive error value on error.
299 static int seek_last_ctf_trace_collection(struct trace_collection
*tc
,
300 struct ctf_file_stream
**cfsp
)
304 uint64_t max_timestamp
= 0;
309 /* For each trace in the trace_collection */
310 for (i
= 0; i
< tc
->array
->len
; i
++) {
311 struct ctf_trace
*tin
;
312 struct bt_trace_descriptor
*td_read
;
314 td_read
= g_ptr_array_index(tc
->array
, i
);
317 tin
= container_of(td_read
, struct ctf_trace
, parent
);
318 /* For each stream_class in the trace */
319 for (j
= 0; j
< tin
->streams
->len
; j
++) {
320 struct ctf_stream_declaration
*stream_class
;
322 stream_class
= g_ptr_array_index(tin
->streams
, j
);
325 ret
= find_max_timestamp_ctf_stream_class(stream_class
,
326 cfsp
, &max_timestamp
);
331 assert(ret
== EOF
|| ret
== 0);
335 * Now we know in which file stream the last event is located,
336 * and we know its timestamp.
341 ret
= seek_file_stream_by_timestamp(*cfsp
, max_timestamp
);
348 int bt_iter_set_pos(struct bt_iter
*iter
, const struct bt_iter_pos
*iter_pos
)
350 struct trace_collection
*tc
;
353 if (!iter
|| !iter_pos
)
356 switch (iter_pos
->type
) {
357 case BT_SEEK_RESTORE
:
358 if (!iter_pos
->u
.restore
)
361 bt_heap_free(iter
->stream_heap
);
362 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
364 goto error_heap_init
;
366 for (i
= 0; i
< iter_pos
->u
.restore
->stream_saved_pos
->len
;
368 struct stream_saved_pos
*saved_pos
;
369 struct ctf_stream_pos
*stream_pos
;
370 struct ctf_stream_definition
*stream
;
372 saved_pos
= &g_array_index(
373 iter_pos
->u
.restore
->stream_saved_pos
,
374 struct stream_saved_pos
, i
);
375 stream
= &saved_pos
->file_stream
->parent
;
376 stream_pos
= &saved_pos
->file_stream
->pos
;
378 stream_pos
->packet_seek(&stream_pos
->parent
,
379 saved_pos
->cur_index
, SEEK_SET
);
382 * the timestamp needs to be restored after
383 * packet_seek, because this function resets
384 * the timestamp to the beginning of the packet
386 stream
->real_timestamp
= saved_pos
->current_real_timestamp
;
387 stream
->cycles_timestamp
= saved_pos
->current_cycles_timestamp
;
388 stream_pos
->offset
= saved_pos
->offset
;
389 stream_pos
->last_offset
= LAST_OFFSET_POISON
;
391 stream
->prev_real_timestamp
= 0;
392 stream
->prev_real_timestamp_end
= 0;
393 stream
->prev_cycles_timestamp
= 0;
394 stream
->prev_cycles_timestamp_end
= 0;
396 printf_debug("restored to cur_index = %" PRId64
" and "
397 "offset = %" PRId64
", timestamp = %" PRIu64
"\n",
398 stream_pos
->cur_index
,
399 stream_pos
->offset
, stream
->real_timestamp
);
401 ret
= stream_read_event(saved_pos
->file_stream
);
407 ret
= bt_heap_insert(iter
->stream_heap
,
408 saved_pos
->file_stream
);
416 bt_heap_free(iter
->stream_heap
);
417 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
419 goto error_heap_init
;
421 /* for each trace in the trace_collection */
422 for (i
= 0; i
< tc
->array
->len
; i
++) {
423 struct ctf_trace
*tin
;
424 struct bt_trace_descriptor
*td_read
;
426 td_read
= g_ptr_array_index(tc
->array
, i
);
429 tin
= container_of(td_read
, struct ctf_trace
, parent
);
431 ret
= seek_ctf_trace_by_timestamp(tin
,
432 iter_pos
->u
.seek_time
,
435 * Positive errors are failure. Negative value
436 * is EOF (for which we continue with other
437 * traces). 0 is success. Note: on EOF, it just
438 * means that no stream has been added to the
439 * iterator for that trace, which is fine.
441 if (ret
!= 0 && ret
!= EOF
)
447 bt_heap_free(iter
->stream_heap
);
448 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
450 goto error_heap_init
;
452 for (i
= 0; i
< tc
->array
->len
; i
++) {
453 struct ctf_trace
*tin
;
454 struct bt_trace_descriptor
*td_read
;
457 td_read
= g_ptr_array_index(tc
->array
, i
);
460 tin
= container_of(td_read
, struct ctf_trace
, parent
);
462 /* Populate heap with each stream */
463 for (stream_id
= 0; stream_id
< tin
->streams
->len
;
465 struct ctf_stream_declaration
*stream
;
468 stream
= g_ptr_array_index(tin
->streams
,
472 for (filenr
= 0; filenr
< stream
->streams
->len
;
474 struct ctf_file_stream
*file_stream
;
475 file_stream
= g_ptr_array_index(
480 ret
= babeltrace_filestream_seek(
481 file_stream
, iter_pos
,
483 if (ret
!= 0 && ret
!= EOF
) {
487 /* Do not add EOF streams */
490 ret
= bt_heap_insert(iter
->stream_heap
, file_stream
);
499 struct ctf_file_stream
*cfs
= NULL
;
502 ret
= seek_last_ctf_trace_collection(tc
, &cfs
);
503 if (ret
!= 0 || !cfs
)
505 /* remove all streams from the heap */
506 bt_heap_free(iter
->stream_heap
);
507 /* Create a new empty heap */
508 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
511 /* Insert the stream that contains the last event */
512 ret
= bt_heap_insert(iter
->stream_heap
, cfs
);
518 /* not implemented */
525 bt_heap_free(iter
->stream_heap
);
527 if (bt_heap_init(iter
->stream_heap
, 0, stream_compare
) < 0) {
528 bt_heap_free(iter
->stream_heap
);
529 g_free(iter
->stream_heap
);
530 iter
->stream_heap
= NULL
;
537 struct bt_iter_pos
*bt_iter_get_pos(struct bt_iter
*iter
)
539 struct bt_iter_pos
*pos
;
540 struct trace_collection
*tc
;
541 struct ctf_file_stream
*file_stream
= NULL
, *removed
;
542 struct ptr_heap iter_heap_copy
;
549 pos
= g_new0(struct bt_iter_pos
, 1);
550 pos
->type
= BT_SEEK_RESTORE
;
551 pos
->u
.restore
= g_new0(struct bt_saved_pos
, 1);
552 pos
->u
.restore
->tc
= tc
;
553 pos
->u
.restore
->stream_saved_pos
= g_array_new(FALSE
, TRUE
,
554 sizeof(struct stream_saved_pos
));
555 if (!pos
->u
.restore
->stream_saved_pos
)
558 ret
= bt_heap_copy(&iter_heap_copy
, iter
->stream_heap
);
562 /* iterate over each stream in the heap */
563 file_stream
= bt_heap_maximum(&iter_heap_copy
);
564 while (file_stream
!= NULL
) {
565 struct stream_saved_pos saved_pos
;
567 assert(file_stream
->pos
.last_offset
!= LAST_OFFSET_POISON
);
568 saved_pos
.offset
= file_stream
->pos
.last_offset
;
569 saved_pos
.file_stream
= file_stream
;
570 saved_pos
.cur_index
= file_stream
->pos
.cur_index
;
572 saved_pos
.current_real_timestamp
= file_stream
->parent
.real_timestamp
;
573 saved_pos
.current_cycles_timestamp
= file_stream
->parent
.cycles_timestamp
;
576 pos
->u
.restore
->stream_saved_pos
,
579 printf_debug("stream : %" PRIu64
", cur_index : %zd, "
581 "timestamp = %" PRIu64
"\n",
582 file_stream
->parent
.stream_id
,
583 saved_pos
.cur_index
, saved_pos
.offset
,
584 saved_pos
.current_real_timestamp
);
586 /* remove the stream from the heap copy */
587 removed
= bt_heap_remove(&iter_heap_copy
);
588 assert(removed
== file_stream
);
590 file_stream
= bt_heap_maximum(&iter_heap_copy
);
592 bt_heap_free(&iter_heap_copy
);
596 g_array_free(pos
->u
.restore
->stream_saved_pos
, TRUE
);
602 struct bt_iter_pos
*bt_iter_create_time_pos(struct bt_iter
*iter
,
605 struct bt_iter_pos
*pos
;
610 pos
= g_new0(struct bt_iter_pos
, 1);
611 pos
->type
= BT_SEEK_TIME
;
612 pos
->u
.seek_time
= timestamp
;
617 * babeltrace_filestream_seek: seek a filestream to given position.
619 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
621 static int babeltrace_filestream_seek(struct ctf_file_stream
*file_stream
,
622 const struct bt_iter_pos
*begin_pos
,
623 unsigned long stream_id
)
627 if (!file_stream
|| !begin_pos
)
630 switch (begin_pos
->type
) {
633 * just insert into the heap we should already know
638 file_stream
->pos
.packet_seek(&file_stream
->pos
.parent
,
640 ret
= stream_read_event(file_stream
);
643 case BT_SEEK_RESTORE
:
645 assert(0); /* Not yet defined */
651 int bt_iter_init(struct bt_iter
*iter
,
652 struct bt_context
*ctx
,
653 const struct bt_iter_pos
*begin_pos
,
654 const struct bt_iter_pos
*end_pos
)
662 if (ctx
->current_iterator
) {
667 iter
->stream_heap
= g_new(struct ptr_heap
, 1);
668 iter
->end_pos
= end_pos
;
672 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
674 goto error_heap_init
;
676 for (i
= 0; i
< ctx
->tc
->array
->len
; i
++) {
677 struct ctf_trace
*tin
;
678 struct bt_trace_descriptor
*td_read
;
680 td_read
= g_ptr_array_index(ctx
->tc
->array
, i
);
683 tin
= container_of(td_read
, struct ctf_trace
, parent
);
685 /* Populate heap with each stream */
686 for (stream_id
= 0; stream_id
< tin
->streams
->len
;
688 struct ctf_stream_declaration
*stream
;
691 stream
= g_ptr_array_index(tin
->streams
, stream_id
);
694 for (filenr
= 0; filenr
< stream
->streams
->len
;
696 struct ctf_file_stream
*file_stream
;
698 file_stream
= g_ptr_array_index(stream
->streams
,
703 ret
= babeltrace_filestream_seek(
708 struct bt_iter_pos pos
;
709 pos
.type
= BT_SEEK_BEGIN
;
710 ret
= babeltrace_filestream_seek(
721 ret
= bt_heap_insert(iter
->stream_heap
, file_stream
);
728 ctx
->current_iterator
= iter
;
732 bt_heap_free(iter
->stream_heap
);
734 g_free(iter
->stream_heap
);
735 iter
->stream_heap
= NULL
;
740 struct bt_iter
*bt_iter_create(struct bt_context
*ctx
,
741 const struct bt_iter_pos
*begin_pos
,
742 const struct bt_iter_pos
*end_pos
)
744 struct bt_iter
*iter
;
750 iter
= g_new0(struct bt_iter
, 1);
751 ret
= bt_iter_init(iter
, ctx
, begin_pos
, end_pos
);
759 void bt_iter_fini(struct bt_iter
*iter
)
762 if (iter
->stream_heap
) {
763 bt_heap_free(iter
->stream_heap
);
764 g_free(iter
->stream_heap
);
766 iter
->ctx
->current_iterator
= NULL
;
767 bt_context_put(iter
->ctx
);
770 void bt_iter_destroy(struct bt_iter
*iter
)
777 int bt_iter_next(struct bt_iter
*iter
)
779 struct ctf_file_stream
*file_stream
, *removed
;
785 file_stream
= bt_heap_maximum(iter
->stream_heap
);
787 /* end of file for all streams */
792 ret
= stream_read_event(file_stream
);
794 removed
= bt_heap_remove(iter
->stream_heap
);
795 assert(removed
== file_stream
);
801 /* Reinsert the file stream into the heap, and rebalance. */
802 removed
= bt_heap_replace_max(iter
->stream_heap
, file_stream
);
803 assert(removed
== file_stream
);