import sys
import tempfile
import json
+from collections import namedtuple
class UnexpectedInput(RuntimeError):
return self._stream_class_id
+def _get_entry_timestamp_begin(entry):
+ if type(entry) is _LttngDataStreamBeaconEntry:
+ return entry.timestamp
+ else:
+ assert type(entry) is _LttngDataStreamIndexEntry
+ return entry.timestamp_begin
+
+
# The index of an LTTng data stream, a sequence of index entries.
class _LttngDataStreamIndex(collections.abc.Sequence):
def __init__(self, path, beacons):
return self._file.read(len_bytes)
+class _LttngMetadataStreamSection:
+ def __init__(self, timestamp, data):
+ self._timestamp = timestamp
+ if data is None:
+ self._data = bytes()
+ else:
+ self._data = data
+ logging.info(
+ 'Built metadata stream section: ts={}, data-len={}'.format(
+ self._timestamp, len(self._data)
+ )
+ )
+
+ @property
+ def timestamp(self):
+ return self._timestamp
+
+ @property
+ def data(self):
+ return self._data
+
+
# An LTTng metadata stream.
class _LttngMetadataStream:
- def __init__(self, path):
- self._path = path
- logging.info('Built metadata stream: path="{}"'.format(path))
+ def __init__(self, metadata_file_path, config_sections):
+ self._path = metadata_file_path
+ self._sections = config_sections
+ logging.info(
+ 'Built metadata stream: path={}, section-len={}'.format(
+ self._path, len(self._sections)
+ )
+ )
@property
def path(self):
return self._path
@property
- def data(self):
- assert os.path.isfile(self._path)
+ def sections(self):
+ return self._sections
- with open(self._path, 'rb') as f:
- return f.read()
+
+LttngMetadataConfigSection = namedtuple(
+ 'LttngMetadataConfigSection', ['line', 'timestamp', 'is_empty']
+)
+
+
+def _parse_metadata_sections_config(config_sections):
+ assert config_sections is not None
+ config_metadata_sections = []
+ append_empty_section = False
+ last_timestamp = 0
+ last_line = 0
+
+ for config_section in config_sections:
+ if config_section == 'empty':
+ # Found a empty section marker. Actually append the section at the
+ # timestamp of the next concrete section.
+ append_empty_section = True
+ else:
+ assert type(config_section) is dict
+ line = config_section.get('line')
+ ts = config_section.get('timestamp')
+
+ # Sections' timestamps and lines must both be increasing.
+ assert ts > last_timestamp
+ last_timestamp = ts
+ assert line > last_line
+ last_line = line
+
+ if append_empty_section:
+ config_metadata_sections.append(
+ LttngMetadataConfigSection(line, ts, True)
+ )
+ append_empty_section = False
+
+ config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
+
+ return config_metadata_sections
+
+
+def _split_metadata_sections(metadata_file_path, raw_config_sections):
+ assert isinstance(raw_config_sections, collections.abc.Sequence)
+
+ parsed_sections = _parse_metadata_sections_config(raw_config_sections)
+
+ sections = []
+ with open(metadata_file_path, 'r') as metadata_file:
+ metadata_lines = [line for line in metadata_file]
+
+ config_metadata_sections_idx = 0
+ curr_metadata_section = bytearray()
+
+ for idx, line_content in enumerate(metadata_lines):
+ # Add one to the index to convert from the zero-indexing of the
+ # enumerate() function to the one-indexing used by humans when
+ # viewing a text file.
+ curr_line_number = idx + 1
+
+ # If there are no more sections, simply append the line.
+ if config_metadata_sections_idx + 1 >= len(parsed_sections):
+ curr_metadata_section += bytearray(line_content, 'utf8')
+ continue
+
+ next_section_line_number = parsed_sections[
+ config_metadata_sections_idx + 1
+ ].line
+
+ # If the next section begins at the current line, create a
+ # section with the metadata we gathered so far.
+ if curr_line_number >= next_section_line_number:
+
+ # Flushing the metadata of the current section.
+ sections.append(
+ _LttngMetadataStreamSection(
+ parsed_sections[config_metadata_sections_idx].timestamp,
+ bytes(curr_metadata_section),
+ )
+ )
+
+ # Move to the next section.
+ config_metadata_sections_idx += 1
+
+ # Clear old content and append current line for the next section.
+ curr_metadata_section.clear()
+ curr_metadata_section += bytearray(line_content, 'utf8')
+
+ # Append any empty sections.
+ while parsed_sections[config_metadata_sections_idx].is_empty:
+ sections.append(
+ _LttngMetadataStreamSection(
+ parsed_sections[config_metadata_sections_idx].timestamp, None
+ )
+ )
+ config_metadata_sections_idx += 1
+ else:
+ # Append line_content to the current metadata section.
+ curr_metadata_section += bytearray(line_content, 'utf8')
+
+ # We iterated over all the lines of the metadata file. Close the current section.
+ sections.append(
+ _LttngMetadataStreamSection(
+ parsed_sections[config_metadata_sections_idx].timestamp,
+ bytes(curr_metadata_section),
+ )
+ )
+
+ return sections
# An LTTng trace, a sequence of LTTng data streams.
class LttngTrace(collections.abc.Sequence):
- def __init__(self, trace_dir, beacons):
+ def __init__(self, trace_dir, metadata_sections, beacons):
assert os.path.isdir(trace_dir)
self._path = trace_dir
- self._metadata_stream = _LttngMetadataStream(
- os.path.join(trace_dir, 'metadata')
- )
+ self._create_metadata_stream(trace_dir, metadata_sections)
self._create_data_streams(trace_dir, beacons)
logging.info('Built trace: path="{}"'.format(trace_dir))
_LttngDataStream(data_stream_path, this_stream_beacons)
)
+ def _create_metadata_stream(self, trace_dir, config_metadata_sections):
+ metadata_path = os.path.join(trace_dir, 'metadata')
+ metadata_sections = []
+
+ if config_metadata_sections is None:
+ with open(metadata_path, 'rb') as metadata_file:
+ metadata_sections.append(
+ _LttngMetadataStreamSection(0, metadata_file.read())
+ )
+ else:
+ metadata_sections = _split_metadata_sections(
+ metadata_path, config_metadata_sections
+ )
+
+ self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
+
@property
def path(self):
return self._path
# The state of a single data stream.
class _LttngLiveViewerSessionDataStreamState:
- def __init__(self, ts_state, info, data_stream):
+ def __init__(self, ts_state, info, data_stream, metadata_stream_id):
self._ts_state = ts_state
self._info = info
self._data_stream = data_stream
+ self._metadata_stream_id = metadata_stream_id
self._cur_index_entry_index = 0
fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
logging.info(
self._ts_state = ts_state
self._info = info
self._metadata_stream = metadata_stream
+ self._cur_metadata_stream_section_index = 0
+ if len(metadata_stream.sections) > 1:
+ self._next_metadata_stream_section_timestamp = metadata_stream.sections[
+ 1
+ ].timestamp
+ else:
+ self._next_metadata_stream_section_timestamp = None
+
self._is_sent = False
fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
logging.info(
def is_sent(self, value):
self._is_sent = value
+ @property
+ def cur_section(self):
+ fmt = "Get current metadata section: section-idx={}"
+ logging.info(fmt.format(self._cur_metadata_stream_section_index))
+ if self._cur_metadata_stream_section_index == len(
+ self._metadata_stream.sections
+ ):
+ return
+
+ return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
+
+ def goto_next_section(self):
+ self._cur_metadata_stream_section_index += 1
+ if self.cur_section:
+ self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
+ else:
+ self._next_metadata_stream_section_timestamp = None
+
+ @property
+ def next_section_timestamp(self):
+ return self._next_metadata_stream_section_timestamp
+
# The state of a tracing session.
class _LttngLiveViewerSessionTracingSessionState:
for trace in tc_descr.traces:
trace_id = stream_id * 1000
+ # Metadata stream -> stream info and metadata stream state
+ info = _LttngLiveViewerStreamInfo(
+ stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
+ )
+ self._stream_infos.append(info)
+ self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
+ self, info, trace.metadata_stream
+ )
+ metadata_stream_id = stream_id
+ stream_id += 1
+
# Data streams -> stream infos and data stream states
for data_stream in trace:
info = _LttngLiveViewerStreamInfo(
)
self._stream_infos.append(info)
self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
- self, info, data_stream
+ self, info, data_stream, metadata_stream_id
)
stream_id += 1
- # Metadata stream -> stream info and metadata stream state
- info = _LttngLiveViewerStreamInfo(
- stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
- )
- self._stream_infos.append(info)
- self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
- self, info, trace.metadata_stream
- )
- stream_id += 1
-
self._is_attached = False
fmt = 'Built tracing session state: id={}, name="{}"'
logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
self._is_attached = value
+def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
+ if metadata_stream_state.next_section_timestamp is None:
+ return False
+
+ if latest_timestamp >= metadata_stream_state.next_section_timestamp:
+ return True
+ else:
+ return False
+
+
# An LTTng live viewer session manages a view on tracing sessions
# and replies to commands accordingly.
class _LttngLiveViewerSession:
fmt = 'Handling "get next data stream index entry" command: stream-id={}'
logging.info(fmt.format(cmd.stream_id))
stream_state = self._get_stream_state(cmd.stream_id)
+ metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
raise UnexpectedInput(
status, index_entry, False, False
)
+ timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
+
+ if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
+ metadata_stream_state.is_sent = False
+ metadata_stream_state.goto_next_section()
+
# The viewer only checks the `has_new_metadata` flag if the
# reply's status is `OK`, so we need to provide an index here
has_new_metadata = stream_state.tracing_session_state.has_new_metadata
def _handle_get_metadata_stream_data_command(self, cmd):
fmt = 'Handling "get metadata stream data" command: stream-id={}'
logging.info(fmt.format(cmd.stream_id))
- stream_state = self._get_stream_state(cmd.stream_id)
+ metadata_stream_state = self._get_stream_state(cmd.stream_id)
- if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
+ if (
+ type(metadata_stream_state)
+ is not _LttngLiveViewerSessionMetadataStreamState
+ ):
raise UnexpectedInput(
'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
)
- if stream_state.is_sent:
+ if metadata_stream_state.is_sent:
status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
- stream_state.is_sent = True
+ metadata_stream_state.is_sent = True
status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
+ metadata_section = metadata_stream_state.cur_section
+
+ # If we are sending an empty section, ready the next one right away.
+ if len(metadata_section.data) == 0:
+ metadata_stream_state.is_sent = False
+ metadata_stream_state.goto_next_section()
+
+ fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
+ logging.info(fmt.format(len(metadata_section.data)))
return _LttngLiveViewerGetMetadataStreamDataContentReply(
- status, stream_state.metadata_stream.data
+ status, metadata_section.data
)
def _handle_get_new_stream_infos_command(self, cmd):
# "path": "meow/mix",
# "beacons": {
# "my_stream": [ 5235787, 728375283 ]
- # }
+ # },
+ # "metadata-sections": [
+ # {
+ # "line": 1,
+ # "timestamp": 0
+ # }
+ # ]
# }
# ]
# }
traces = []
for trace in session['traces']:
+ metadata_sections = trace.get('metadata-sections')
beacons = trace.get('beacons')
path = trace['path']
if not os.path.isabs(path):
path = os.path.join(trace_path_prefix, path)
- traces.append(LttngTrace(path, beacons))
+ traces.append(LttngTrace(path, metadata_sections, beacons))
sessions.append(
LttngTracingSessionDescriptor(
args = parser.parse_args(args=remaining_args)
try:
sessions = _session_descriptors_from_path(
- args.sessions_filename, args.trace_path_prefix
+ args.sessions_filename,
+ args.trace_path_prefix,
)
LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size)
except UnexpectedInput as exc:
--- /dev/null
+Trace class:
+ Stream class (ID 0):
+ Supports packets: Yes
+ Packets have beginning default clock snapshot: Yes
+ Packets have end default clock snapshot: Yes
+ Supports discarded events: Yes
+ Discarded events have default clock snapshots: Yes
+ Supports discarded packets: Yes
+ Discarded packets have default clock snapshots: Yes
+ Default clock class:
+ Name: monotonic
+ Description: Monotonic Clock
+ Frequency (Hz): 1,000,000,000
+ Precision (cycles): 0
+ Offset (s): 1,594,406,328
+ Offset (cycles): 768,346,378
+ Origin is Unix epoch: Yes
+ UUID: 81a04b89-9028-4d3e-a28d-5fbd53a8eb9d
+ Packet context field class: Structure (1 member):
+ cpu_id: Unsigned integer (32-bit, Base 10)
+ Event class `my_app:signe_de_pia$$e` (ID 0):
+ Log level: Debug (line)
+ Payload field class: Structure (0 members)
+ Event class `my_app:signe_de_pia$$e_2` (ID 1):
+ Log level: Debug (line)
+ Payload field class: Structure (0 members)
+
+[Unknown]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Stream beginning:
+ Name: stream-1
+ Trace:
+ UUID: 0339cd08-892d-404c-9291-64c1a8a74c81
+ Environment (10 entries):
+ architecture_bit_width: 64
+ domain: ust
+ hostname: raton
+ trace_creation_datetime: 20200715T174253-0400
+ trace_name: barney_descontie
+ tracer_buffering_id: 1000
+ tracer_buffering_scheme: uid
+ tracer_major: 2
+ tracer_minor: 12
+ tracer_name: lttng-ust
+ Stream (ID 0, Class ID 0)
+
+[443,073,474,574,097 cycles, 1,594,849,402,242,920,475 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Packet beginning:
+ Context:
+ cpu_id: 0
+
+[443,073,484,867,537 cycles, 1,594,849,402,253,213,915 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Event `my_app:signe_de_pia$$e` (Class ID 0):
+ Payload: Empty
+
+[443,076,225,270,435 cycles, 1,594,849,404,993,616,813 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Packet end
+
+[443,087,407,631,276 cycles, 1,594,849,416,175,977,654 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Packet beginning:
+ Context:
+ cpu_id: 0
+
+[443,087,407,631,276 cycles, 1,594,849,416,175,977,654 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Event `my_app:signe_de_pia$$e` (Class ID 0):
+ Payload: Empty
+
+[443,087,407,643,172 cycles, 1,594,849,416,175,989,550 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Event `my_app:signe_de_pia$$e_2` (Class ID 1):
+ Payload: Empty
+
+[443,089,152,508,997 cycles, 1,594,849,417,920,855,375 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Packet end
+
+[Unknown]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Stream end