X-Git-Url: http://drtracing.org/?a=blobdiff_plain;f=plugins%2Fctf%2Fcommon%2Fmsg-iter%2Fmsg-iter.c;h=0a3b3861da4af0861fa5213fbaa8b028a700f10f;hb=93875071881517bbcf7d479b7c955297906f87d0;hp=8bed03784ce3503c25c61627066efda7c79219a3;hpb=605e1019783967f33d86967e7c98dd52cbd69a4c;p=babeltrace.git diff --git a/plugins/ctf/common/msg-iter/msg-iter.c b/plugins/ctf/common/msg-iter/msg-iter.c index 8bed0378..0a3b3861 100644 --- a/plugins/ctf/common/msg-iter/msg-iter.c +++ b/plugins/ctf/common/msg-iter/msg-iter.c @@ -80,8 +80,14 @@ enum state { STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN, STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE, STATE_AFTER_STREAM_PACKET_CONTEXT, - STATE_EMIT_MSG_NEW_STREAM, - STATE_EMIT_MSG_NEW_PACKET, + STATE_CHECK_EMIT_MSG_STREAM_BEGINNING, + STATE_EMIT_MSG_STREAM_BEGINNING, + STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING, + STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS, + STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS, + STATE_EMIT_MSG_DISCARDED_EVENTS, + STATE_EMIT_MSG_DISCARDED_PACKETS, + STATE_EMIT_MSG_PACKET_BEGINNING, STATE_DSCOPE_EVENT_HEADER_BEGIN, STATE_DSCOPE_EVENT_HEADER_CONTINUE, STATE_AFTER_EVENT_HEADER, @@ -92,9 +98,20 @@ enum state { STATE_DSCOPE_EVENT_PAYLOAD_BEGIN, STATE_DSCOPE_EVENT_PAYLOAD_CONTINUE, STATE_EMIT_MSG_EVENT, - STATE_EMIT_MSG_END_OF_PACKET, - STATE_DONE, STATE_SKIP_PACKET_PADDING, + STATE_EMIT_MSG_PACKET_END_MULTI, + STATE_EMIT_MSG_PACKET_END_SINGLE, + STATE_CHECK_EMIT_MSG_STREAM_ACTIVITY_END, + STATE_EMIT_MSG_STREAM_ACTIVITY_END, + STATE_EMIT_MSG_STREAM_END, + STATE_DONE, +}; + +struct end_of_packet_snapshots { + uint64_t discarded_events; + uint64_t packets; + uint64_t beginning_clock; + uint64_t end_clock; }; /* CTF message iterator */ @@ -105,6 +122,18 @@ struct bt_msg_iter { /* Current message iterator to create messages (weak) */ bt_self_message_iterator *msg_iter; + /* + * True to emit stream beginning and stream activity beginning + * messages. + */ + bool emit_stream_begin_msg; + + /* True to emit stream end and stream activity end messages */ + bool emit_stream_end_msg; + + /* True to set the stream */ + bool set_stream; + /* * Current dynamic scope field pointer. * @@ -120,21 +149,18 @@ struct bt_msg_iter { bool done_filling_string; /* Trace and classes */ + /* True to set IR fields */ + bool set_ir_fields; + struct { struct ctf_trace_class *tc; struct ctf_stream_class *sc; struct ctf_event_class *ec; } meta; - /* Current packet header field wrapper (NULL if not created yet) */ - bt_packet_header_field *packet_header_field; - - /* Current packet header field wrapper (NULL if not created yet) */ + /* Current packet context field wrapper (NULL if not created yet) */ bt_packet_context_field *packet_context_field; - /* Current event header field (NULL if not created yet) */ - bt_event_header_field *event_header_field; - /* Current packet (NULL if not created yet) */ bt_packet *packet; @@ -149,9 +175,7 @@ struct bt_msg_iter { /* Database of current dynamic scopes */ struct { - bt_field *trace_packet_header; bt_field *stream_packet_context; - bt_field *event_header; bt_field *event_common_context; bt_field *event_spec_context; bt_field *event_payload; @@ -188,9 +212,6 @@ struct bt_msg_iter { void *data; } medium; - /* Stream beginning was emitted */ - bool stream_begin_emitted; - /* Current packet size (bits) (-1 if unknown) */ int64_t cur_exp_packet_total_size; @@ -215,13 +236,11 @@ struct bt_msg_iter { /* Default clock's current value */ uint64_t default_clock_snapshot; - /* End of packet snapshots */ - struct { - uint64_t discarded_events; - uint64_t packets; - uint64_t beginning_clock; - uint64_t end_clock; - } snapshots; + /* End of current packet snapshots */ + struct end_of_packet_snapshots snapshots; + + /* End of previous packet snapshots */ + struct end_of_packet_snapshots prev_packet_snapshots; /* Stored values (for sequence lengths, variant tags) */ GArray *stored_values; @@ -245,10 +264,16 @@ const char *state_string(enum state state) return "STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE"; case STATE_AFTER_STREAM_PACKET_CONTEXT: return "STATE_AFTER_STREAM_PACKET_CONTEXT"; - case STATE_EMIT_MSG_NEW_PACKET: - return "STATE_EMIT_MSG_NEW_PACKET"; - case STATE_EMIT_MSG_NEW_STREAM: - return "STATE_EMIT_MSG_NEW_STREAM"; + case STATE_EMIT_MSG_STREAM_BEGINNING: + return "STATE_EMIT_MSG_STREAM_BEGINNING"; + case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING: + return "STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING"; + case STATE_EMIT_MSG_PACKET_BEGINNING: + return "STATE_EMIT_MSG_PACKET_BEGINNING"; + case STATE_EMIT_MSG_DISCARDED_EVENTS: + return "STATE_EMIT_MSG_DISCARDED_EVENTS"; + case STATE_EMIT_MSG_DISCARDED_PACKETS: + return "STATE_EMIT_MSG_DISCARDED_PACKETS"; case STATE_DSCOPE_EVENT_HEADER_BEGIN: return "STATE_DSCOPE_EVENT_HEADER_BEGIN"; case STATE_DSCOPE_EVENT_HEADER_CONTINUE: @@ -269,12 +294,18 @@ const char *state_string(enum state state) return "STATE_DSCOPE_EVENT_PAYLOAD_CONTINUE"; case STATE_EMIT_MSG_EVENT: return "STATE_EMIT_MSG_EVENT"; - case STATE_EMIT_MSG_END_OF_PACKET: - return "STATE_EMIT_MSG_END_OF_PACKET"; - case STATE_DONE: - return "STATE_DONE"; case STATE_SKIP_PACKET_PADDING: return "STATE_SKIP_PACKET_PADDING"; + case STATE_EMIT_MSG_PACKET_END_MULTI: + return "STATE_EMIT_MSG_PACKET_END_MULTI"; + case STATE_EMIT_MSG_PACKET_END_SINGLE: + return "STATE_EMIT_MSG_PACKET_END_SINGLE"; + case STATE_EMIT_MSG_STREAM_ACTIVITY_END: + return "STATE_EMIT_MSG_STREAM_ACTIVITY_END"; + case STATE_EMIT_MSG_STREAM_END: + return "STATE_EMIT_MSG_STREAM_END"; + case STATE_DONE: + return "STATE_DONE"; default: return "(unknown)"; } @@ -616,13 +647,6 @@ end: static void release_event_dscopes(struct bt_msg_iter *notit) { - notit->dscopes.event_header = NULL; - - if (notit->event_header_field) { - bt_event_header_field_release(notit->event_header_field); - notit->event_header_field = NULL; - } - notit->dscopes.event_common_context = NULL; notit->dscopes.event_spec_context = NULL; notit->dscopes.event_payload = NULL; @@ -631,13 +655,6 @@ void release_event_dscopes(struct bt_msg_iter *notit) static void release_all_dscopes(struct bt_msg_iter *notit) { - notit->dscopes.trace_packet_header = NULL; - - if (notit->packet_header_field) { - bt_packet_header_field_release(notit->packet_header_field); - notit->packet_header_field = NULL; - } - notit->dscopes.stream_packet_context = NULL; if (notit->packet_context_field) { @@ -661,6 +678,23 @@ enum bt_msg_iter_status read_packet_header_begin_state( goto end; } + /* + * Make sure at least one bit is available for this packet. An + * empty packet is impossible. If we reach the end of the medium + * at this point, then it's considered the end of the stream. + */ + ret = buf_ensure_available_bits(notit); + switch (ret) { + case BT_MSG_ITER_STATUS_OK: + break; + case BT_MSG_ITER_STATUS_EOF: + ret = BT_MSG_ITER_STATUS_OK; + notit->state = STATE_CHECK_EMIT_MSG_STREAM_ACTIVITY_END; + goto end; + default: + goto end; + } + /* Packet header class is common to the whole trace class. */ packet_header_fc = notit->meta.tc->packet_header_fc; if (!packet_header_fc) { @@ -668,34 +702,6 @@ enum bt_msg_iter_status read_packet_header_begin_state( goto end; } - BT_ASSERT(!notit->packet_header_field); - - if (packet_header_fc->in_ir) { - /* - * Create free packet header field from trace class. - * This field is going to be moved to the packet once we - * create it. We cannot create the packet now because: - * - * 1. A packet is created from a stream. - * 2. A stream is created from a stream class. - * 3. We need the packet header field's content to know - * the ID of the stream class to select. - */ - notit->packet_header_field = - bt_packet_header_field_create( - notit->meta.tc->ir_tc); - if (!notit->packet_header_field) { - BT_LOGE_STR("Cannot create packet header field wrapper from trace class."); - ret = BT_MSG_ITER_STATUS_ERROR; - goto end; - } - - notit->dscopes.trace_packet_header = - bt_packet_header_field_borrow_field( - notit->packet_header_field); - BT_ASSERT(notit->dscopes.trace_packet_header); - } - notit->cur_stream_class_id = -1; notit->cur_event_class_id = -1; notit->cur_data_stream_id = -1; @@ -704,8 +710,7 @@ enum bt_msg_iter_status read_packet_header_begin_state( notit, notit->meta.tc, packet_header_fc); ret = read_dscope_begin_state(notit, packet_header_fc, STATE_AFTER_TRACE_PACKET_HEADER, - STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE, - notit->dscopes.trace_packet_header); + STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE, NULL); if (ret < 0) { BT_LOGW("Cannot decode packet header field: " "notit-addr=%p, trace-class-addr=%p, " @@ -907,9 +912,9 @@ enum bt_msg_iter_status read_packet_context_begin_state( * This field is going to be moved to the packet once we * create it. We cannot create the packet now because a * packet is created from a stream, and this API must be - * able to return the packet header and context fields - * without creating a stream - * (bt_msg_iter_borrow_packet_header_context_fields()). + * able to return the packet context properties without + * creating a stream + * (bt_msg_iter_get_packet_properties()). */ notit->packet_context_field = bt_packet_context_field_create( @@ -997,13 +1002,13 @@ enum bt_msg_iter_status set_current_packet_content_sizes( "notit-addr=%p, packet-size=%" PRIu64 ", content-size=%" PRIu64, notit, notit->cur_exp_packet_total_size, notit->cur_exp_packet_content_size); + end: return status; } static -enum bt_msg_iter_status after_packet_context_state( - struct bt_msg_iter *notit) +enum bt_msg_iter_status after_packet_context_state(struct bt_msg_iter *notit) { enum bt_msg_iter_status status; @@ -1012,10 +1017,15 @@ enum bt_msg_iter_status after_packet_context_state( goto end; } - if (notit->stream_begin_emitted) { - notit->state = STATE_EMIT_MSG_NEW_PACKET; + if (notit->stream) { + /* + * Stream exists, which means we already emitted at + * least one packet beginning message, so the initial + * stream beginning message was also emitted. + */ + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS; } else { - notit->state = STATE_EMIT_MSG_NEW_STREAM; + notit->state = STATE_CHECK_EMIT_MSG_STREAM_BEGINNING; } end: @@ -1023,8 +1033,7 @@ end: } static -enum bt_msg_iter_status read_event_header_begin_state( - struct bt_msg_iter *notit) +enum bt_msg_iter_status read_event_header_begin_state(struct bt_msg_iter *notit) { enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; struct ctf_field_class *event_header_fc = NULL; @@ -1040,7 +1049,7 @@ enum bt_msg_iter_status read_event_header_begin_state( /* No more events! */ BT_LOGV("Reached end of packet: notit-addr=%p, " "cur=%zu", notit, packet_at(notit)); - notit->state = STATE_EMIT_MSG_END_OF_PACKET; + notit->state = STATE_EMIT_MSG_PACKET_END_MULTI; goto end; } else if (unlikely(packet_at(notit) > notit->cur_exp_packet_content_size)) { @@ -1059,22 +1068,14 @@ enum bt_msg_iter_status read_event_header_begin_state( * nothing else for us. */ status = buf_ensure_available_bits(notit); - if (status != BT_MSG_ITER_STATUS_OK) { - /* - * If this function returns - * `BT_MSG_ITER_STATUS_EOF`: - * - * 1. bt_msg_iter_get_next_message() - * emits a "packet end" message. This - * resets the current packet. The state - * remains unchanged otherwise. - * 2. This function is called again. It returns - * `BT_MSG_ITER_STATUS_EOF` again. - * 3. bt_msg_iter_get_next_message() - * emits a "stream end" message because - * there's no current packet. It sets the - * current state to `STATE_DONE`. - */ + switch (status) { + case BT_MSG_ITER_STATUS_OK: + break; + case BT_MSG_ITER_STATUS_EOF: + status = BT_MSG_ITER_STATUS_OK; + notit->state = STATE_EMIT_MSG_PACKET_END_SINGLE; + goto end; + default: goto end; } } @@ -1087,23 +1088,6 @@ enum bt_msg_iter_status read_event_header_begin_state( goto end; } - if (event_header_fc->in_ir) { - BT_ASSERT(!notit->event_header_field); - notit->event_header_field = - bt_event_header_field_create( - notit->meta.sc->ir_sc); - if (!notit->event_header_field) { - BT_LOGE_STR("Cannot create event header field wrapper from trace class."); - status = BT_MSG_ITER_STATUS_ERROR; - goto end; - } - - notit->dscopes.event_header = - bt_event_header_field_borrow_field( - notit->event_header_field); - BT_ASSERT(notit->dscopes.event_header); - } - BT_LOGV("Decoding event header field: " "notit-addr=%p, stream-class-addr=%p, " "stream-class-id=%" PRId64 ", " @@ -1113,8 +1097,7 @@ enum bt_msg_iter_status read_event_header_begin_state( event_header_fc); status = read_dscope_begin_state(notit, event_header_fc, STATE_AFTER_EVENT_HEADER, - STATE_DSCOPE_EVENT_HEADER_CONTINUE, - notit->dscopes.event_header); + STATE_DSCOPE_EVENT_HEADER_CONTINUE, NULL); if (status < 0) { BT_LOGW("Cannot decode event header field: " "notit-addr=%p, stream-class-addr=%p, " @@ -1242,29 +1225,6 @@ enum bt_msg_iter_status after_event_header_state( notit->event = bt_message_event_borrow_event( notit->event_msg); BT_ASSERT(notit->event); - - if (notit->event_header_field) { - int ret; - - BT_ASSERT(notit->event); - ret = bt_event_move_header_field(notit->event, - notit->event_header_field); - if (ret) { - status = BT_MSG_ITER_STATUS_ERROR; - goto end; - } - - notit->event_header_field = NULL; - - /* - * At this point notit->dscopes.event_header has - * the same value as the event header field within - * notit->event. - */ - BT_ASSERT(bt_event_borrow_header_field( - notit->event) == notit->dscopes.event_header); - } - notit->state = STATE_DSCOPE_EVENT_COMMON_CONTEXT_BEGIN; end: @@ -1436,8 +1396,7 @@ enum bt_msg_iter_status read_event_payload_continue_state( } static -enum bt_msg_iter_status skip_packet_padding_state( - struct bt_msg_iter *notit) +enum bt_msg_iter_status skip_packet_padding_state(struct bt_msg_iter *notit) { enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; size_t bits_to_skip; @@ -1473,6 +1432,115 @@ end: return status; } +static +enum bt_msg_iter_status check_emit_msg_stream_beginning_state( + struct bt_msg_iter *notit) +{ + enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; + + if (notit->set_stream) { + status = set_current_stream(notit); + if (status != BT_MSG_ITER_STATUS_OK) { + goto end; + } + } + + if (notit->emit_stream_begin_msg) { + notit->state = STATE_EMIT_MSG_STREAM_BEGINNING; + } else { + /* Stream's first packet */ + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS; + } + +end: + return status; +} + +static +enum bt_msg_iter_status check_emit_msg_discarded_events( + struct bt_msg_iter *notit) +{ + notit->state = STATE_EMIT_MSG_DISCARDED_EVENTS; + + if (notit->prev_packet_snapshots.discarded_events == UINT64_C(-1)) { + if (notit->snapshots.discarded_events == 0 || + notit->snapshots.discarded_events == UINT64_C(-1)) { + /* + * Stream's first packet with no discarded + * events or no information about discarded + * events: do not emit. + */ + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS; + } + } else { + /* + * If the previous packet has a value for this counter, + * then this counter is defined for the whole stream. + */ + BT_ASSERT(notit->snapshots.discarded_events != UINT64_C(-1)); + + if (notit->snapshots.discarded_events - + notit->prev_packet_snapshots.discarded_events == 0) { + /* + * No discarded events since previous packet: do + * not emit. + */ + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS; + } + } + + return BT_MSG_ITER_STATUS_OK; +} + +static +enum bt_msg_iter_status check_emit_msg_discarded_packets( + struct bt_msg_iter *notit) +{ + notit->state = STATE_EMIT_MSG_DISCARDED_PACKETS; + + if (notit->prev_packet_snapshots.packets == UINT64_C(-1)) { + /* + * Stream's first packet or no information about + * discarded packets: do not emit. In other words, if + * this is the first packet and its sequence number is + * not 0, do not consider that packets were previously + * lost: we might be reading a partial stream (LTTng + * snapshot for example). + */ + notit->state = STATE_EMIT_MSG_PACKET_BEGINNING; + } else { + /* + * If the previous packet has a value for this counter, + * then this counter is defined for the whole stream. + */ + BT_ASSERT(notit->snapshots.packets != UINT64_C(-1)); + + if (notit->snapshots.packets - + notit->prev_packet_snapshots.packets <= 1) { + /* + * No discarded packets since previous packet: + * do not emit. + */ + notit->state = STATE_EMIT_MSG_PACKET_BEGINNING; + } + } + + return BT_MSG_ITER_STATUS_OK; +} + +static +enum bt_msg_iter_status check_emit_msg_stream_activity_end( + struct bt_msg_iter *notit) +{ + if (notit->emit_stream_end_msg) { + notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_END; + } else { + notit->state = STATE_DONE; + } + + return BT_MSG_ITER_STATUS_OK; +} + static inline enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit) { @@ -1505,10 +1573,28 @@ enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit) case STATE_AFTER_STREAM_PACKET_CONTEXT: status = after_packet_context_state(notit); break; - case STATE_EMIT_MSG_NEW_STREAM: - notit->state = STATE_EMIT_MSG_NEW_PACKET; + case STATE_CHECK_EMIT_MSG_STREAM_BEGINNING: + status = check_emit_msg_stream_beginning_state(notit); + break; + case STATE_EMIT_MSG_STREAM_BEGINNING: + notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING; + break; + case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING: + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS; break; - case STATE_EMIT_MSG_NEW_PACKET: + case STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS: + status = check_emit_msg_discarded_events(notit); + break; + case STATE_EMIT_MSG_DISCARDED_EVENTS: + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS; + break; + case STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS: + status = check_emit_msg_discarded_packets(notit); + break; + case STATE_EMIT_MSG_DISCARDED_PACKETS: + notit->state = STATE_EMIT_MSG_PACKET_BEGINNING; + break; + case STATE_EMIT_MSG_PACKET_BEGINNING: notit->state = STATE_DSCOPE_EVENT_HEADER_BEGIN; break; case STATE_DSCOPE_EVENT_HEADER_BEGIN: @@ -1544,9 +1630,23 @@ enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit) case STATE_SKIP_PACKET_PADDING: status = skip_packet_padding_state(notit); break; - case STATE_EMIT_MSG_END_OF_PACKET: + case STATE_EMIT_MSG_PACKET_END_MULTI: notit->state = STATE_SKIP_PACKET_PADDING; break; + case STATE_EMIT_MSG_PACKET_END_SINGLE: + notit->state = STATE_CHECK_EMIT_MSG_STREAM_ACTIVITY_END; + break; + case STATE_CHECK_EMIT_MSG_STREAM_ACTIVITY_END: + status = check_emit_msg_stream_activity_end(notit); + break; + case STATE_EMIT_MSG_STREAM_ACTIVITY_END: + notit->state = STATE_EMIT_MSG_STREAM_END; + break; + case STATE_EMIT_MSG_STREAM_END: + notit->state = STATE_DONE; + break; + case STATE_DONE: + break; default: BT_LOGD("Unknown CTF plugin message iterator state: " "notit-addr=%p, state=%d", notit, notit->state); @@ -1560,11 +1660,8 @@ enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit) return status; } -/** - * Resets the internal state of a CTF message iterator. - */ BT_HIDDEN -void bt_msg_iter_reset(struct bt_msg_iter *notit) +void bt_msg_iter_reset_for_next_stream_file(struct bt_msg_iter *notit) { BT_ASSERT(notit); BT_LOGD("Resetting message iterator: addr=%p", notit); @@ -1577,21 +1674,11 @@ void bt_msg_iter_reset(struct bt_msg_iter *notit) release_all_dscopes(notit); notit->cur_dscope_field = NULL; - if (notit->packet_header_field) { - bt_packet_header_field_release(notit->packet_header_field); - notit->packet_header_field = NULL; - } - if (notit->packet_context_field) { bt_packet_context_field_release(notit->packet_context_field); notit->packet_context_field = NULL; } - if (notit->event_header_field) { - bt_event_header_field_release(notit->event_header_field); - notit->event_header_field = NULL; - } - notit->buf.addr = NULL; notit->buf.sz = 0; notit->buf.at = 0; @@ -1601,10 +1688,28 @@ void bt_msg_iter_reset(struct bt_msg_iter *notit) notit->cur_exp_packet_content_size = -1; notit->cur_exp_packet_total_size = -1; notit->cur_packet_offset = -1; - notit->cur_stream_class_id = -1; notit->cur_event_class_id = -1; + notit->snapshots.beginning_clock = UINT64_C(-1); + notit->snapshots.end_clock = UINT64_C(-1); +} + +/** + * Resets the internal state of a CTF message iterator. + */ +BT_HIDDEN +void bt_msg_iter_reset(struct bt_msg_iter *notit) +{ + bt_msg_iter_reset_for_next_stream_file(notit); + notit->cur_stream_class_id = -1; notit->cur_data_stream_id = -1; - notit->stream_begin_emitted = false; + notit->emit_stream_begin_msg = true; + notit->emit_stream_end_msg = true; + notit->snapshots.discarded_events = UINT64_C(-1); + notit->snapshots.packets = UINT64_C(-1); + notit->prev_packet_snapshots.discarded_events = UINT64_C(-1); + notit->prev_packet_snapshots.packets = UINT64_C(-1); + notit->prev_packet_snapshots.beginning_clock = UINT64_C(-1); + notit->prev_packet_snapshots.end_clock = UINT64_C(-1); } static @@ -1661,6 +1766,7 @@ int bt_msg_iter_switch_packet(struct bt_msg_iter *notit) notit->cur_stream_class_id = -1; notit->cur_event_class_id = -1; notit->cur_data_stream_id = -1; + notit->prev_packet_snapshots = notit->snapshots; notit->snapshots.discarded_events = UINT64_C(-1); notit->snapshots.packets = UINT64_C(-1); notit->snapshots.beginning_clock = UINT64_C(-1); @@ -1946,7 +2052,11 @@ enum bt_bfcr_status bfcr_floating_point_cb(double value, "notit-addr=%p, bfcr-addr=%p, fc-addr=%p, " "fc-type=%d, fc-in-ir=%d, value=%f", notit, notit->bfcr, fc, fc->type, fc->in_ir, value); - BT_ASSERT(fc->in_ir); + + if (unlikely(!fc->in_ir)) { + goto end; + } + field = borrow_next_field(notit); BT_ASSERT(field); BT_ASSERT(bt_field_borrow_class_const(field) == fc->ir_fc); @@ -1954,6 +2064,8 @@ enum bt_bfcr_status bfcr_floating_point_cb(double value, BT_FIELD_CLASS_TYPE_REAL); bt_field_real_set_value(field, value); stack_top(notit->stack)->index++; + +end: return status; } @@ -1970,7 +2082,10 @@ enum bt_bfcr_status bfcr_string_begin_cb( "fc-type=%d, fc-in-ir=%d", notit, notit->bfcr, fc, fc->type, fc->in_ir); - BT_ASSERT(fc->in_ir); + if (unlikely(!fc->in_ir)) { + goto end; + } + field = borrow_next_field(notit); BT_ASSERT(field); BT_ASSERT(bt_field_borrow_class_const(field) == fc->ir_fc); @@ -1985,6 +2100,8 @@ enum bt_bfcr_status bfcr_string_begin_cb( * subsequent call to bfcr_string_end_cb(). */ stack_push(notit->stack, field); + +end: return BT_BFCR_STATUS_OK; } @@ -2002,7 +2119,11 @@ enum bt_bfcr_status bfcr_string_cb(const char *value, "fc-type=%d, fc-in-ir=%d, string-length=%zu", notit, notit->bfcr, fc, fc->type, fc->in_ir, len); - BT_ASSERT(fc->in_ir); + + if (unlikely(!fc->in_ir)) { + goto end; + } + field = stack_top(notit->stack)->base; BT_ASSERT(field); @@ -2030,13 +2151,18 @@ enum bt_bfcr_status bfcr_string_end_cb( "notit-addr=%p, bfcr-addr=%p, fc-addr=%p, " "fc-type=%d, fc-in-ir=%d", notit, notit->bfcr, fc, fc->type, fc->in_ir); - BT_ASSERT(fc->in_ir); + + if (unlikely(!fc->in_ir)) { + goto end; + } /* Pop string field */ stack_pop(notit->stack); /* Go to next field */ stack_top(notit->stack)->index++; + +end: return BT_BFCR_STATUS_OK; } @@ -2246,9 +2372,7 @@ end: static void set_event_default_clock_snapshot(struct bt_msg_iter *notit) { - bt_event *event = - bt_message_event_borrow_event( - notit->event_msg); + bt_event *event = bt_message_event_borrow_event(notit->event_msg); bt_stream_class *sc = notit->meta.sc->ir_sc; BT_ASSERT(event); @@ -2260,18 +2384,11 @@ void set_event_default_clock_snapshot(struct bt_msg_iter *notit) } static -void notify_new_stream(struct bt_msg_iter *notit, +void create_msg_stream_beginning(struct bt_msg_iter *notit, bt_message **message) { - enum bt_msg_iter_status status; bt_message *ret = NULL; - status = set_current_stream(notit); - if (status != BT_MSG_ITER_STATUS_OK) { - BT_MESSAGE_PUT_REF_AND_RESET(ret); - goto end; - } - BT_ASSERT(notit->stream); BT_ASSERT(notit->msg_iter); ret = bt_message_stream_beginning_create(notit->msg_iter, @@ -2283,13 +2400,57 @@ void notify_new_stream(struct bt_msg_iter *notit, return; } -end: *message = ret; } static -void notify_end_of_stream(struct bt_msg_iter *notit, +void create_msg_stream_activity_beginning(struct bt_msg_iter *notit, + bt_message **message) +{ + bt_message *ret = NULL; + + BT_ASSERT(notit->stream); + BT_ASSERT(notit->msg_iter); + ret = bt_message_stream_activity_beginning_create(notit->msg_iter, + notit->stream); + if (!ret) { + BT_LOGE("Cannot create stream activity beginning message: " + "notit-addr=%p, stream-addr=%p", + notit, notit->stream); + return; + } + + *message = ret; +} + +static +void create_msg_stream_activity_end(struct bt_msg_iter *notit, bt_message **message) +{ + bt_message *ret = NULL; + + if (!notit->stream) { + BT_LOGE("Cannot create stream for stream message: " + "notit-addr=%p", notit); + return; + } + + BT_ASSERT(notit->stream); + BT_ASSERT(notit->msg_iter); + ret = bt_message_stream_activity_end_create(notit->msg_iter, + notit->stream); + if (!ret) { + BT_LOGE("Cannot create stream activity end message: " + "notit-addr=%p, stream-addr=%p", + notit, notit->stream); + return; + } + + *message = ret; +} + +static +void create_msg_stream_end(struct bt_msg_iter *notit, bt_message **message) { bt_message *ret; @@ -2303,16 +2464,17 @@ void notify_end_of_stream(struct bt_msg_iter *notit, ret = bt_message_stream_end_create(notit->msg_iter, notit->stream); if (!ret) { - BT_LOGE("Cannot create stream beginning message: " + BT_LOGE("Cannot create stream end message: " "notit-addr=%p, stream-addr=%p", notit, notit->stream); return; } + *message = ret; } static -void notify_new_packet(struct bt_msg_iter *notit, +void create_msg_packet_beginning(struct bt_msg_iter *notit, bt_message **message) { int ret; @@ -2329,49 +2491,6 @@ void notify_new_packet(struct bt_msg_iter *notit, sc = notit->meta.sc->ir_sc; BT_ASSERT(sc); - if (bt_stream_class_packets_have_discarded_event_counter_snapshot(sc)) { - BT_ASSERT(notit->snapshots.discarded_events != UINT64_C(-1)); - bt_packet_set_discarded_event_counter_snapshot( - notit->packet, notit->snapshots.discarded_events); - } - - if (bt_stream_class_packets_have_packet_counter_snapshot(sc)) { - BT_ASSERT(notit->snapshots.packets != UINT64_C(-1)); - bt_packet_set_packet_counter_snapshot( - notit->packet, notit->snapshots.packets); - } - - if (bt_stream_class_packets_have_default_beginning_clock_snapshot(sc)) { - BT_ASSERT(notit->snapshots.beginning_clock != UINT64_C(-1)); - bt_packet_set_default_beginning_clock_snapshot( - notit->packet, notit->snapshots.beginning_clock); - } - - if (bt_stream_class_packets_have_default_end_clock_snapshot(sc)) { - BT_ASSERT(notit->snapshots.end_clock != UINT64_C(-1)); - bt_packet_set_default_end_clock_snapshot( - notit->packet, notit->snapshots.end_clock); - } - - if (notit->packet_header_field) { - ret = bt_packet_move_header_field( - notit->packet, notit->packet_header_field); - if (ret) { - goto end; - } - - notit->packet_header_field = NULL; - - /* - * At this point notit->dscopes.trace_packet_header has - * the same value as the packet header field within - * notit->packet. - */ - BT_ASSERT(bt_packet_borrow_header_field( - notit->packet) == - notit->dscopes.trace_packet_header); - } - if (notit->packet_context_field) { ret = bt_packet_move_context_field( notit->packet, notit->packet_context_field); @@ -2382,8 +2501,8 @@ void notify_new_packet(struct bt_msg_iter *notit, notit->packet_context_field = NULL; /* - * At this point notit->dscopes.trace_packet_header has - * the same value as the packet header field within + * At this point notit->dscopes.stream_packet_context + * has the same value as the packet context field within * notit->packet. */ BT_ASSERT(bt_packet_borrow_context_field( @@ -2408,8 +2527,7 @@ end: } static -void notify_end_of_packet(struct bt_msg_iter *notit, - bt_message **message) +void create_msg_packet_end(struct bt_msg_iter *notit, bt_message **message) { bt_message *msg; @@ -2437,6 +2555,93 @@ void notify_end_of_packet(struct bt_msg_iter *notit, *message = msg; } +static +void create_msg_discarded_events(struct bt_msg_iter *notit, + bt_message **message) +{ + bt_message *msg; + uint64_t beginning_raw_value = UINT64_C(-1); + uint64_t end_raw_value = UINT64_C(-1); + uint64_t count = UINT64_C(-1); + + BT_ASSERT(notit->msg_iter); + BT_ASSERT(notit->stream); + + if (notit->prev_packet_snapshots.discarded_events == UINT64_C(-1)) { + /* + * We discarded events, but before (and possibly + * including) the current packet: use this packet's time + * range, and do not have a specific count. + */ + beginning_raw_value = notit->snapshots.beginning_clock; + end_raw_value = notit->snapshots.end_clock; + } else { + count = notit->snapshots.discarded_events - + notit->prev_packet_snapshots.discarded_events; + BT_ASSERT(count > 0); + beginning_raw_value = notit->prev_packet_snapshots.end_clock; + end_raw_value = notit->snapshots.end_clock; + } + + if (beginning_raw_value != UINT64_C(-1) && + end_raw_value != UINT64_C(-1)) { + msg = bt_message_discarded_events_create_with_default_clock_snapshots( + notit->msg_iter, notit->stream, beginning_raw_value, + end_raw_value); + } else { + msg = bt_message_discarded_events_create(notit->msg_iter, + notit->stream); + } + + if (!msg) { + BT_LOGE("Cannot create discarded events message: " + "notit-addr=%p, stream-addr=%p", + notit, notit->stream); + return; + } + + if (count != UINT64_C(-1)) { + bt_message_discarded_events_set_count(msg, count); + } + + *message = msg; +} + +static +void create_msg_discarded_packets(struct bt_msg_iter *notit, + bt_message **message) +{ + bt_message *msg; + + BT_ASSERT(notit->msg_iter); + BT_ASSERT(notit->stream); + BT_ASSERT(notit->prev_packet_snapshots.packets != + UINT64_C(-1)); + + if (notit->prev_packet_snapshots.end_clock != UINT64_C(-1) && + notit->snapshots.beginning_clock != UINT64_C(-1)) { + msg = bt_message_discarded_packets_create_with_default_clock_snapshots( + notit->msg_iter, notit->stream, + notit->prev_packet_snapshots.end_clock, + notit->snapshots.beginning_clock); + } else { + msg = bt_message_discarded_packets_create(notit->msg_iter, + notit->stream); + } + + if (!msg) { + BT_LOGE("Cannot create discarded packets message: " + "notit-addr=%p, stream-addr=%p", + notit, notit->stream); + return; + } + + bt_message_discarded_packets_set_count(msg, + notit->snapshots.packets - + notit->prev_packet_snapshots.packets - 1); + *message = msg; +} + BT_HIDDEN struct bt_msg_iter *bt_msg_iter_create(struct ctf_trace_class *tc, size_t max_request_sz, @@ -2533,95 +2738,109 @@ void bt_msg_iter_destroy(struct bt_msg_iter *notit) enum bt_msg_iter_status bt_msg_iter_get_next_message( struct bt_msg_iter *notit, - bt_self_message_iterator *msg_iter, - bt_message **message) + bt_self_message_iterator *msg_iter, bt_message **message) { enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; BT_ASSERT(notit); BT_ASSERT(message); - - if (notit->state == STATE_DONE) { - status = BT_MSG_ITER_STATUS_EOF; - goto end; - } - notit->msg_iter = msg_iter; - + notit->set_stream = true; BT_LOGV("Getting next message: notit-addr=%p", notit); while (true) { status = handle_state(notit); - if (status == BT_MSG_ITER_STATUS_AGAIN) { + if (unlikely(status == BT_MSG_ITER_STATUS_AGAIN)) { BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_AGAIN."); goto end; + } else if (unlikely(status != BT_MSG_ITER_STATUS_OK)) { + BT_LOGW("Cannot handle state: notit-addr=%p, state=%s", + notit, state_string(notit->state)); + goto end; } - if (status != BT_MSG_ITER_STATUS_OK) { - if (status == BT_MSG_ITER_STATUS_EOF) { - enum state next_state = notit->state; - - BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_EOF."); - - if (notit->packet) { - notify_end_of_packet(notit, - message); - } else { - notify_end_of_stream(notit, - message); - next_state = STATE_DONE; - } - - if (!*message) { - status = BT_MSG_ITER_STATUS_ERROR; - goto end; - } - - status = BT_MSG_ITER_STATUS_OK; - notit->state = next_state; - } else { - BT_LOGW("Cannot handle state: " - "notit-addr=%p, state=%s", - notit, state_string(notit->state)); + switch (notit->state) { + case STATE_EMIT_MSG_EVENT: + BT_ASSERT(notit->event_msg); + set_event_default_clock_snapshot(notit); + *message = notit->event_msg; + notit->event_msg = NULL; + goto end; + case STATE_EMIT_MSG_DISCARDED_EVENTS: + /* create_msg_discared_events() logs errors */ + create_msg_discarded_events(notit, message); + + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; } goto end; - } + case STATE_EMIT_MSG_DISCARDED_PACKETS: + /* create_msg_discared_packets() logs errors */ + create_msg_discarded_packets(notit, message); - switch (notit->state) { - case STATE_EMIT_MSG_NEW_STREAM: - /* notify_new_stream() logs errors */ - notify_new_stream(notit, message); + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; + } + + goto end; + case STATE_EMIT_MSG_PACKET_BEGINNING: + /* create_msg_packet_beginning() logs errors */ + create_msg_packet_beginning(notit, message); if (!*message) { status = BT_MSG_ITER_STATUS_ERROR; } - notit->stream_begin_emitted = true; goto end; - case STATE_EMIT_MSG_NEW_PACKET: - /* notify_new_packet() logs errors */ - notify_new_packet(notit, message); + case STATE_EMIT_MSG_PACKET_END_SINGLE: + case STATE_EMIT_MSG_PACKET_END_MULTI: + /* create_msg_packet_end() logs errors */ + create_msg_packet_end(notit, message); if (!*message) { status = BT_MSG_ITER_STATUS_ERROR; } goto end; - case STATE_EMIT_MSG_EVENT: - BT_ASSERT(notit->event_msg); - set_event_default_clock_snapshot(notit); - *message = notit->event_msg; - notit->event_msg = NULL; + case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING: + /* create_msg_stream_activity_beginning() logs errors */ + create_msg_stream_activity_beginning(notit, message); + + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; + } + + goto end; + case STATE_EMIT_MSG_STREAM_ACTIVITY_END: + /* create_msg_stream_activity_end() logs errors */ + create_msg_stream_activity_end(notit, message); + + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; + } + + goto end; + case STATE_EMIT_MSG_STREAM_BEGINNING: + /* create_msg_stream_beginning() logs errors */ + create_msg_stream_beginning(notit, message); + + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; + } + goto end; - case STATE_EMIT_MSG_END_OF_PACKET: - /* notify_end_of_packet() logs errors */ - notify_end_of_packet(notit, message); + case STATE_EMIT_MSG_STREAM_END: + /* create_msg_stream_end() logs errors */ + create_msg_stream_end(notit, message); if (!*message) { status = BT_MSG_ITER_STATUS_ERROR; } + goto end; + case STATE_DONE: + status = BT_MSG_ITER_STATUS_EOF; goto end; default: /* Non-emitting state: continue */ @@ -2633,60 +2852,59 @@ end: return status; } -BT_HIDDEN -enum bt_msg_iter_status bt_msg_iter_borrow_packet_header_context_fields( - struct bt_msg_iter *notit, - bt_field **packet_header_field, - bt_field **packet_context_field) +static +enum bt_msg_iter_status read_packet_header_context_fields( + struct bt_msg_iter *notit) { int ret; enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; BT_ASSERT(notit); + notit->set_stream = false; - if (notit->state == STATE_EMIT_MSG_NEW_PACKET) { + if (notit->state == STATE_EMIT_MSG_PACKET_BEGINNING) { /* We're already there */ - goto set_fields; + goto end; } while (true) { status = handle_state(notit); - if (status == BT_MSG_ITER_STATUS_AGAIN) { + if (unlikely(status == BT_MSG_ITER_STATUS_AGAIN)) { BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_AGAIN."); goto end; - } - if (status != BT_MSG_ITER_STATUS_OK) { - if (status == BT_MSG_ITER_STATUS_EOF) { - BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_EOF."); - } else { - BT_LOGW("Cannot handle state: " - "notit-addr=%p, state=%s", - notit, state_string(notit->state)); - } + } else if (unlikely(status != BT_MSG_ITER_STATUS_OK)) { + BT_LOGW("Cannot handle state: notit-addr=%p, state=%s", + notit, state_string(notit->state)); goto end; } switch (notit->state) { - case STATE_EMIT_MSG_NEW_PACKET: + case STATE_EMIT_MSG_PACKET_BEGINNING: /* * Packet header and context fields are * potentially decoded (or they don't exist). */ - goto set_fields; + goto end; case STATE_INIT: - case STATE_EMIT_MSG_NEW_STREAM: case STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN: case STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE: case STATE_AFTER_TRACE_PACKET_HEADER: case STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN: case STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE: case STATE_AFTER_STREAM_PACKET_CONTEXT: + case STATE_CHECK_EMIT_MSG_STREAM_BEGINNING: + case STATE_EMIT_MSG_STREAM_BEGINNING: + case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING: + case STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS: + case STATE_EMIT_MSG_DISCARDED_EVENTS: + case STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS: + case STATE_EMIT_MSG_DISCARDED_PACKETS: /* Non-emitting state: continue */ break; default: /* * We should never get past the - * STATE_EMIT_MSG_NEW_PACKET state. + * STATE_EMIT_MSG_PACKET_BEGINNING state. */ BT_LOGF("Unexpected state: notit-addr=%p, state=%s", notit, state_string(notit->state)); @@ -2694,22 +2912,13 @@ enum bt_msg_iter_status bt_msg_iter_borrow_packet_header_context_fields( } } -set_fields: +end: ret = set_current_packet_content_sizes(notit); if (ret) { status = BT_MSG_ITER_STATUS_ERROR; goto end; } - if (packet_header_field) { - *packet_header_field = notit->dscopes.trace_packet_header; - } - - if (packet_context_field) { - *packet_context_field = notit->dscopes.stream_packet_context; - } - -end: return status; } @@ -2722,8 +2931,8 @@ void bt_msg_iter_set_medops_data(struct bt_msg_iter *notit, } BT_HIDDEN -enum bt_msg_iter_status bt_msg_iter_seek( - struct bt_msg_iter *notit, off_t offset) +enum bt_msg_iter_status bt_msg_iter_seek(struct bt_msg_iter *notit, + off_t offset) { enum bt_msg_iter_status ret = BT_MSG_ITER_STATUS_OK; enum bt_msg_iter_medium_status medium_status; @@ -2759,42 +2968,22 @@ end: return ret; } -BT_HIDDEN -off_t bt_msg_iter_get_current_packet_offset(struct bt_msg_iter *notit) -{ - BT_ASSERT(notit); - return notit->cur_packet_offset; -} - -BT_HIDDEN -off_t bt_msg_iter_get_current_packet_size( - struct bt_msg_iter *notit) -{ - BT_ASSERT(notit); - return notit->cur_exp_packet_total_size; -} - -BT_HIDDEN -void bt_msg_trace_class_changed(struct bt_msg_iter *notit) -{ - if (notit->meta.tc->stored_value_count > notit->stored_values->len) { - g_array_set_size(notit->stored_values, - notit->meta.tc->stored_value_count); - } -} - BT_HIDDEN enum bt_msg_iter_status bt_msg_iter_get_packet_properties( struct bt_msg_iter *notit, struct bt_msg_iter_packet_properties *props) { + enum bt_msg_iter_status status; + BT_ASSERT(notit); BT_ASSERT(props); + status = read_packet_header_context_fields(notit); + if (status != BT_MSG_ITER_STATUS_OK) { + goto end; + } - props->exp_packet_total_size = - (uint64_t) notit->cur_exp_packet_total_size; - props->exp_packet_content_size = - (uint64_t) notit->cur_exp_packet_content_size; + props->exp_packet_total_size = notit->cur_exp_packet_total_size; + props->exp_packet_content_size = notit->cur_exp_packet_content_size; BT_ASSERT(props->stream_class_id >= 0); props->stream_class_id = (uint64_t) notit->cur_stream_class_id; props->data_stream_id = notit->cur_data_stream_id; @@ -2802,5 +2991,21 @@ enum bt_msg_iter_status bt_msg_iter_get_packet_properties( props->snapshots.packets = notit->snapshots.packets; props->snapshots.beginning_clock = notit->snapshots.beginning_clock; props->snapshots.end_clock = notit->snapshots.end_clock; - return BT_MSG_ITER_STATUS_OK; + +end: + return status; +} + +BT_HIDDEN +void bt_msg_iter_set_emit_stream_beginning_message(struct bt_msg_iter *notit, + bool val) +{ + notit->emit_stream_begin_msg = val; +} + +BT_HIDDEN +void bt_msg_iter_set_emit_stream_end_message(struct bt_msg_iter *notit, + bool val) +{ + notit->emit_stream_end_msg = val; }