6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 #include <babeltrace/babeltrace.h>
31 #include <babeltrace/context.h>
32 #include <babeltrace/context-internal.h>
33 #include <babeltrace/iterator-internal.h>
34 #include <babeltrace/iterator.h>
35 #include <babeltrace/prio_heap.h>
36 #include <babeltrace/ctf/metadata.h>
37 #include <babeltrace/ctf/events.h>
40 static int babeltrace_filestream_seek(struct ctf_file_stream
*file_stream
,
41 const struct bt_iter_pos
*begin_pos
,
42 unsigned long stream_id
);
44 struct stream_saved_pos
{
46 * Use file_stream pointer to check if the trace collection we
47 * restore to matches the one we saved from, for each stream.
49 struct ctf_file_stream
*file_stream
;
50 size_t cur_index
; /* current index in packet index */
51 ssize_t offset
; /* offset from base, in bits. EOF for end of file. */
52 uint64_t current_real_timestamp
;
53 uint64_t current_cycles_timestamp
;
57 struct trace_collection
*tc
;
58 GArray
*stream_saved_pos
; /* Contains struct stream_saved_pos */
61 static int stream_read_event(struct ctf_file_stream
*sin
)
65 ret
= sin
->pos
.parent
.event_cb(&sin
->pos
.parent
, &sin
->parent
);
69 fprintf(stderr
, "[error] Reading event failed.\n");
76 * Return true if a < b, false otherwise.
77 * If time stamps are exactly the same, compare by stream path. This
78 * ensures we get the same result between runs on the same trace
79 * collection on different environments.
80 * The result will be random for memory-mapped traces since there is no
81 * fixed path leading to those (they have empty path string).
83 static int stream_compare(void *a
, void *b
)
85 struct ctf_file_stream
*s_a
= a
, *s_b
= b
;
87 if (s_a
->parent
.real_timestamp
< s_b
->parent
.real_timestamp
) {
89 } else if (likely(s_a
->parent
.real_timestamp
> s_b
->parent
.real_timestamp
)) {
92 return strcmp(s_a
->parent
.path
, s_b
->parent
.path
);
96 void bt_iter_free_pos(struct bt_iter_pos
*iter_pos
)
101 if (iter_pos
->type
== BT_SEEK_RESTORE
&& iter_pos
->u
.restore
) {
102 if (iter_pos
->u
.restore
->stream_saved_pos
) {
104 iter_pos
->u
.restore
->stream_saved_pos
,
107 g_free(iter_pos
->u
.restore
);
113 * seek_file_stream_by_timestamp
115 * Browse a filestream by index, if an index contains the timestamp passed in
116 * argument, seek inside the corresponding packet until we find the event we
117 * are looking for (either the exact timestamp or the event just after the
120 * Return 0 if the seek succeeded, EOF if we didn't find any packet
121 * containing the timestamp, or a positive integer for error.
123 * TODO: this should be turned into a binary search! It is currently
124 * doing a linear search in the packets. This is an O(n) operation on a
125 * very frequent code path.
127 static int seek_file_stream_by_timestamp(struct ctf_file_stream
*cfs
,
130 struct ctf_stream_pos
*stream_pos
;
131 struct packet_index
*index
;
134 stream_pos
= &cfs
->pos
;
135 for (i
= 0; i
< stream_pos
->packet_real_index
->len
; i
++) {
136 index
= &g_array_index(stream_pos
->packet_real_index
,
137 struct packet_index
, i
);
138 if (index
->timestamp_end
< timestamp
)
141 stream_pos
->packet_seek(&stream_pos
->parent
, i
, SEEK_SET
);
143 ret
= stream_read_event(cfs
);
144 } while (cfs
->parent
.real_timestamp
< timestamp
&& ret
== 0);
146 /* Can return either EOF, 0, or error (> 0). */
150 * Cannot find the timestamp within the stream packets, return
157 * seek_ctf_trace_by_timestamp : for each file stream, seek to the event with
158 * the corresponding timestamp
160 * Return 0 on success.
161 * If the timestamp is not part of any file stream, return EOF to inform the
162 * user the timestamp is out of the scope.
163 * On other errors, return positive value.
165 static int seek_ctf_trace_by_timestamp(struct ctf_trace
*tin
,
166 uint64_t timestamp
, struct ptr_heap
*stream_heap
)
171 /* for each stream_class */
172 for (i
= 0; i
< tin
->streams
->len
; i
++) {
173 struct ctf_stream_declaration
*stream_class
;
175 stream_class
= g_ptr_array_index(tin
->streams
, i
);
178 /* for each file_stream */
179 for (j
= 0; j
< stream_class
->streams
->len
; j
++) {
180 struct ctf_stream_definition
*stream
;
181 struct ctf_file_stream
*cfs
;
183 stream
= g_ptr_array_index(stream_class
->streams
, j
);
186 cfs
= container_of(stream
, struct ctf_file_stream
,
188 ret
= seek_file_stream_by_timestamp(cfs
, timestamp
);
191 ret
= bt_heap_insert(stream_heap
, cfs
);
193 /* Return positive error. */
197 } else if (ret
> 0) {
199 * Error in seek (not EOF), failure.
203 /* on EOF just do not put stream into heap. */
207 return found
? 0 : EOF
;
211 * Find timestamp of last event in the stream.
213 * Return value: 0 if OK, positive error value on error, EOF if no
216 static int find_max_timestamp_ctf_file_stream(struct ctf_file_stream
*cfs
,
217 uint64_t *timestamp_end
)
219 int ret
, count
= 0, i
;
220 uint64_t timestamp
= 0;
221 struct ctf_stream_pos
*stream_pos
;
223 stream_pos
= &cfs
->pos
;
225 * We start by the last packet, and iterate backwards until we
226 * either find at least one event, or we reach the first packet
227 * (some packets can be empty).
229 for (i
= stream_pos
->packet_real_index
->len
- 1; i
>= 0; i
--) {
230 stream_pos
->packet_seek(&stream_pos
->parent
, i
, SEEK_SET
);
232 /* read each event until we reach the end of the stream */
234 ret
= stream_read_event(cfs
);
237 timestamp
= cfs
->parent
.real_timestamp
;
250 *timestamp_end
= timestamp
;
253 /* Return EOF if no events were found */
261 * Find the stream within a stream class that contains the event with
262 * the largest timestamp, and save that timestamp.
264 * Return 0 if OK, EOF if no events were found in the streams, or
265 * positive value on error.
267 static int find_max_timestamp_ctf_stream_class(
268 struct ctf_stream_declaration
*stream_class
,
269 struct ctf_file_stream
**cfsp
,
270 uint64_t *max_timestamp
)
272 int ret
= EOF
, i
, found
= 0;
274 for (i
= 0; i
< stream_class
->streams
->len
; i
++) {
275 struct ctf_stream_definition
*stream
;
276 struct ctf_file_stream
*cfs
;
277 uint64_t current_max_ts
= 0;
279 stream
= g_ptr_array_index(stream_class
->streams
, i
);
282 cfs
= container_of(stream
, struct ctf_file_stream
, parent
);
283 ret
= find_max_timestamp_ctf_file_stream(cfs
, ¤t_max_ts
);
288 if (current_max_ts
>= *max_timestamp
) {
289 *max_timestamp
= current_max_ts
;
294 assert(ret
>= 0 || ret
== EOF
);
302 * seek_last_ctf_trace_collection: seek trace collection to last event.
304 * Return 0 if OK, EOF if no events were found, or positive error value
307 static int seek_last_ctf_trace_collection(struct trace_collection
*tc
,
308 struct ctf_file_stream
**cfsp
)
312 uint64_t max_timestamp
= 0;
317 /* For each trace in the trace_collection */
318 for (i
= 0; i
< tc
->array
->len
; i
++) {
319 struct ctf_trace
*tin
;
320 struct bt_trace_descriptor
*td_read
;
322 td_read
= g_ptr_array_index(tc
->array
, i
);
325 tin
= container_of(td_read
, struct ctf_trace
, parent
);
326 /* For each stream_class in the trace */
327 for (j
= 0; j
< tin
->streams
->len
; j
++) {
328 struct ctf_stream_declaration
*stream_class
;
330 stream_class
= g_ptr_array_index(tin
->streams
, j
);
333 ret
= find_max_timestamp_ctf_stream_class(stream_class
,
334 cfsp
, &max_timestamp
);
339 assert(ret
== EOF
|| ret
== 0);
343 * Now we know in which file stream the last event is located,
344 * and we know its timestamp.
349 ret
= seek_file_stream_by_timestamp(*cfsp
, max_timestamp
);
356 int bt_iter_set_pos(struct bt_iter
*iter
, const struct bt_iter_pos
*iter_pos
)
358 struct trace_collection
*tc
;
361 if (!iter
|| !iter_pos
)
364 switch (iter_pos
->type
) {
365 case BT_SEEK_RESTORE
:
366 if (!iter_pos
->u
.restore
)
369 bt_heap_free(iter
->stream_heap
);
370 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
372 goto error_heap_init
;
374 for (i
= 0; i
< iter_pos
->u
.restore
->stream_saved_pos
->len
;
376 struct stream_saved_pos
*saved_pos
;
377 struct ctf_stream_pos
*stream_pos
;
378 struct ctf_stream_definition
*stream
;
380 saved_pos
= &g_array_index(
381 iter_pos
->u
.restore
->stream_saved_pos
,
382 struct stream_saved_pos
, i
);
383 stream
= &saved_pos
->file_stream
->parent
;
384 stream_pos
= &saved_pos
->file_stream
->pos
;
386 stream_pos
->packet_seek(&stream_pos
->parent
,
387 saved_pos
->cur_index
, SEEK_SET
);
390 * the timestamp needs to be restored after
391 * packet_seek, because this function resets
392 * the timestamp to the beginning of the packet
394 stream
->real_timestamp
= saved_pos
->current_real_timestamp
;
395 stream
->cycles_timestamp
= saved_pos
->current_cycles_timestamp
;
396 stream_pos
->offset
= saved_pos
->offset
;
397 stream_pos
->last_offset
= LAST_OFFSET_POISON
;
399 stream
->prev_real_timestamp
= 0;
400 stream
->prev_real_timestamp_end
= 0;
401 stream
->prev_cycles_timestamp
= 0;
402 stream
->prev_cycles_timestamp_end
= 0;
404 printf_debug("restored to cur_index = %" PRId64
" and "
405 "offset = %" PRId64
", timestamp = %" PRIu64
"\n",
406 stream_pos
->cur_index
,
407 stream_pos
->offset
, stream
->real_timestamp
);
409 ret
= stream_read_event(saved_pos
->file_stream
);
415 ret
= bt_heap_insert(iter
->stream_heap
,
416 saved_pos
->file_stream
);
424 bt_heap_free(iter
->stream_heap
);
425 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
427 goto error_heap_init
;
429 /* for each trace in the trace_collection */
430 for (i
= 0; i
< tc
->array
->len
; i
++) {
431 struct ctf_trace
*tin
;
432 struct bt_trace_descriptor
*td_read
;
434 td_read
= g_ptr_array_index(tc
->array
, i
);
437 tin
= container_of(td_read
, struct ctf_trace
, parent
);
439 ret
= seek_ctf_trace_by_timestamp(tin
,
440 iter_pos
->u
.seek_time
,
443 * Positive errors are failure. Negative value
444 * is EOF (for which we continue with other
445 * traces). 0 is success. Note: on EOF, it just
446 * means that no stream has been added to the
447 * iterator for that trace, which is fine.
449 if (ret
!= 0 && ret
!= EOF
)
455 bt_heap_free(iter
->stream_heap
);
456 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
458 goto error_heap_init
;
460 for (i
= 0; i
< tc
->array
->len
; i
++) {
461 struct ctf_trace
*tin
;
462 struct bt_trace_descriptor
*td_read
;
465 td_read
= g_ptr_array_index(tc
->array
, i
);
468 tin
= container_of(td_read
, struct ctf_trace
, parent
);
470 /* Populate heap with each stream */
471 for (stream_id
= 0; stream_id
< tin
->streams
->len
;
473 struct ctf_stream_declaration
*stream
;
476 stream
= g_ptr_array_index(tin
->streams
,
480 for (filenr
= 0; filenr
< stream
->streams
->len
;
482 struct ctf_file_stream
*file_stream
;
483 file_stream
= g_ptr_array_index(
488 ret
= babeltrace_filestream_seek(
489 file_stream
, iter_pos
,
491 if (ret
!= 0 && ret
!= EOF
) {
495 /* Do not add EOF streams */
498 ret
= bt_heap_insert(iter
->stream_heap
, file_stream
);
507 struct ctf_file_stream
*cfs
= NULL
;
510 ret
= seek_last_ctf_trace_collection(tc
, &cfs
);
511 if (ret
!= 0 || !cfs
)
513 /* remove all streams from the heap */
514 bt_heap_free(iter
->stream_heap
);
515 /* Create a new empty heap */
516 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
519 /* Insert the stream that contains the last event */
520 ret
= bt_heap_insert(iter
->stream_heap
, cfs
);
526 /* not implemented */
533 bt_heap_free(iter
->stream_heap
);
535 if (bt_heap_init(iter
->stream_heap
, 0, stream_compare
) < 0) {
536 bt_heap_free(iter
->stream_heap
);
537 g_free(iter
->stream_heap
);
538 iter
->stream_heap
= NULL
;
545 struct bt_iter_pos
*bt_iter_get_pos(struct bt_iter
*iter
)
547 struct bt_iter_pos
*pos
;
548 struct trace_collection
*tc
;
549 struct ctf_file_stream
*file_stream
= NULL
, *removed
;
550 struct ptr_heap iter_heap_copy
;
557 pos
= g_new0(struct bt_iter_pos
, 1);
558 pos
->type
= BT_SEEK_RESTORE
;
559 pos
->u
.restore
= g_new0(struct bt_saved_pos
, 1);
560 pos
->u
.restore
->tc
= tc
;
561 pos
->u
.restore
->stream_saved_pos
= g_array_new(FALSE
, TRUE
,
562 sizeof(struct stream_saved_pos
));
563 if (!pos
->u
.restore
->stream_saved_pos
)
566 ret
= bt_heap_copy(&iter_heap_copy
, iter
->stream_heap
);
570 /* iterate over each stream in the heap */
571 file_stream
= bt_heap_maximum(&iter_heap_copy
);
572 while (file_stream
!= NULL
) {
573 struct stream_saved_pos saved_pos
;
575 assert(file_stream
->pos
.last_offset
!= LAST_OFFSET_POISON
);
576 saved_pos
.offset
= file_stream
->pos
.last_offset
;
577 saved_pos
.file_stream
= file_stream
;
578 saved_pos
.cur_index
= file_stream
->pos
.cur_index
;
580 saved_pos
.current_real_timestamp
= file_stream
->parent
.real_timestamp
;
581 saved_pos
.current_cycles_timestamp
= file_stream
->parent
.cycles_timestamp
;
584 pos
->u
.restore
->stream_saved_pos
,
587 printf_debug("stream : %" PRIu64
", cur_index : %zd, "
589 "timestamp = %" PRIu64
"\n",
590 file_stream
->parent
.stream_id
,
591 saved_pos
.cur_index
, saved_pos
.offset
,
592 saved_pos
.current_real_timestamp
);
594 /* remove the stream from the heap copy */
595 removed
= bt_heap_remove(&iter_heap_copy
);
596 assert(removed
== file_stream
);
598 file_stream
= bt_heap_maximum(&iter_heap_copy
);
600 bt_heap_free(&iter_heap_copy
);
604 g_array_free(pos
->u
.restore
->stream_saved_pos
, TRUE
);
610 struct bt_iter_pos
*bt_iter_create_time_pos(struct bt_iter
*iter
,
613 struct bt_iter_pos
*pos
;
618 pos
= g_new0(struct bt_iter_pos
, 1);
619 pos
->type
= BT_SEEK_TIME
;
620 pos
->u
.seek_time
= timestamp
;
625 * babeltrace_filestream_seek: seek a filestream to given position.
627 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
629 static int babeltrace_filestream_seek(struct ctf_file_stream
*file_stream
,
630 const struct bt_iter_pos
*begin_pos
,
631 unsigned long stream_id
)
635 if (!file_stream
|| !begin_pos
)
638 switch (begin_pos
->type
) {
641 * just insert into the heap we should already know
646 file_stream
->pos
.packet_seek(&file_stream
->pos
.parent
,
648 ret
= stream_read_event(file_stream
);
651 case BT_SEEK_RESTORE
:
653 assert(0); /* Not yet defined */
659 int bt_iter_init(struct bt_iter
*iter
,
660 struct bt_context
*ctx
,
661 const struct bt_iter_pos
*begin_pos
,
662 const struct bt_iter_pos
*end_pos
)
670 if (ctx
->current_iterator
) {
675 iter
->stream_heap
= g_new(struct ptr_heap
, 1);
676 iter
->end_pos
= end_pos
;
680 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
682 goto error_heap_init
;
684 for (i
= 0; i
< ctx
->tc
->array
->len
; i
++) {
685 struct ctf_trace
*tin
;
686 struct bt_trace_descriptor
*td_read
;
688 td_read
= g_ptr_array_index(ctx
->tc
->array
, i
);
691 tin
= container_of(td_read
, struct ctf_trace
, parent
);
693 /* Populate heap with each stream */
694 for (stream_id
= 0; stream_id
< tin
->streams
->len
;
696 struct ctf_stream_declaration
*stream
;
699 stream
= g_ptr_array_index(tin
->streams
, stream_id
);
702 for (filenr
= 0; filenr
< stream
->streams
->len
;
704 struct ctf_file_stream
*file_stream
;
706 file_stream
= g_ptr_array_index(stream
->streams
,
711 ret
= babeltrace_filestream_seek(
716 struct bt_iter_pos pos
;
717 pos
.type
= BT_SEEK_BEGIN
;
718 ret
= babeltrace_filestream_seek(
729 ret
= bt_heap_insert(iter
->stream_heap
, file_stream
);
736 ctx
->current_iterator
= iter
;
740 bt_heap_free(iter
->stream_heap
);
742 g_free(iter
->stream_heap
);
743 iter
->stream_heap
= NULL
;
748 struct bt_iter
*bt_iter_create(struct bt_context
*ctx
,
749 const struct bt_iter_pos
*begin_pos
,
750 const struct bt_iter_pos
*end_pos
)
752 struct bt_iter
*iter
;
758 iter
= g_new0(struct bt_iter
, 1);
759 ret
= bt_iter_init(iter
, ctx
, begin_pos
, end_pos
);
767 void bt_iter_fini(struct bt_iter
*iter
)
770 if (iter
->stream_heap
) {
771 bt_heap_free(iter
->stream_heap
);
772 g_free(iter
->stream_heap
);
774 iter
->ctx
->current_iterator
= NULL
;
775 bt_context_put(iter
->ctx
);
778 void bt_iter_destroy(struct bt_iter
*iter
)
785 int bt_iter_next(struct bt_iter
*iter
)
787 struct ctf_file_stream
*file_stream
, *removed
;
793 file_stream
= bt_heap_maximum(iter
->stream_heap
);
795 /* end of file for all streams */
800 ret
= stream_read_event(file_stream
);
802 removed
= bt_heap_remove(iter
->stream_heap
);
803 assert(removed
== file_stream
);
809 /* Reinsert the file stream into the heap, and rebalance. */
810 removed
= bt_heap_replace_max(iter
->stream_heap
, file_stream
);
811 assert(removed
== file_stream
);