1 # SPDX-License-Identifier: MIT
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
6 # pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
17 from typing
import Dict
, Union
, Iterable
, Optional
, Sequence
, overload
22 from typing
import Any
, Callable
# noqa: F401
27 # An entry within the index of an LTTng data stream.
28 class _LttngDataStreamIndexEntry
:
33 content_size_bits
: int,
36 events_discarded
: int,
39 self
._offset
_bytes
= offset_bytes
40 self
._total
_size
_bits
= total_size_bits
41 self
._content
_size
_bits
= content_size_bits
42 self
._timestamp
_begin
= timestamp_begin
43 self
._timestamp
_end
= timestamp_end
44 self
._events
_discarded
= events_discarded
45 self
._stream
_class
_id
= stream_class_id
48 def offset_bytes(self
):
49 return self
._offset
_bytes
52 def total_size_bits(self
):
53 return self
._total
_size
_bits
56 def total_size_bytes(self
):
57 return self
._total
_size
_bits
// 8
60 def content_size_bits(self
):
61 return self
._content
_size
_bits
64 def content_size_bytes(self
):
65 return self
._content
_size
_bits
// 8
68 def timestamp_begin(self
):
69 return self
._timestamp
_begin
72 def timestamp_end(self
):
73 return self
._timestamp
_end
76 def events_discarded(self
):
77 return self
._events
_discarded
80 def stream_class_id(self
):
81 return self
._stream
_class
_id
84 # An entry within the index of an LTTng data stream. While a stream beacon entry
85 # is conceptually unrelated to an index, it is sent as a reply to a
86 # LttngLiveViewerGetNextDataStreamIndexEntryCommand
87 class _LttngDataStreamBeaconIndexEntry
:
88 def __init__(self
, stream_class_id
: int, timestamp
: int):
89 self
._stream
_class
_id
= stream_class_id
90 self
._timestamp
= timestamp
94 return self
._timestamp
97 def stream_class_id(self
):
98 return self
._stream
_class
_id
101 _LttngIndexEntryT
= Union
[_LttngDataStreamIndexEntry
, _LttngDataStreamBeaconIndexEntry
]
104 class _LttngLiveViewerCommand
:
105 def __init__(self
, version
: int):
106 self
._version
= version
113 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand
):
114 def __init__(self
, version
: int, viewer_session_id
: int, major
: int, minor
: int):
115 super().__init
__(version
)
116 self
._viewer
_session
_id
= viewer_session_id
121 def viewer_session_id(self
):
122 return self
._viewer
_session
_id
133 class _LttngLiveViewerReply
:
137 class _LttngLiveViewerConnectReply(_LttngLiveViewerReply
):
138 def __init__(self
, viewer_session_id
: int, major
: int, minor
: int):
139 self
._viewer
_session
_id
= viewer_session_id
144 def viewer_session_id(self
):
145 return self
._viewer
_session
_id
156 class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand
):
160 class _LttngLiveViewerTracingSessionInfo
:
163 tracing_session_id
: int,
164 live_timer_freq
: int,
170 self
._tracing
_session
_id
= tracing_session_id
171 self
._live
_timer
_freq
= live_timer_freq
172 self
._client
_count
= client_count
173 self
._stream
_count
= stream_count
174 self
._hostname
= hostname
178 def tracing_session_id(self
):
179 return self
._tracing
_session
_id
182 def live_timer_freq(self
):
183 return self
._live
_timer
_freq
186 def client_count(self
):
187 return self
._client
_count
190 def stream_count(self
):
191 return self
._stream
_count
195 return self
._hostname
202 class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply
):
204 self
, tracing_session_infos
: Sequence
[_LttngLiveViewerTracingSessionInfo
]
206 self
._tracing
_session
_infos
= tracing_session_infos
209 def tracing_session_infos(self
):
210 return self
._tracing
_session
_infos
213 class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand
):
219 self
, version
: int, tracing_session_id
: int, offset
: int, seek_type
: int
221 super().__init
__(version
)
222 self
._tracing
_session
_id
= tracing_session_id
223 self
._offset
= offset
224 self
._seek
_type
= seek_type
227 def tracing_session_id(self
):
228 return self
._tracing
_session
_id
236 return self
._seek
_type
239 class _LttngLiveViewerStreamInfo
:
241 self
, id: int, trace_id
: int, is_metadata
: bool, path
: str, channel_name
: str
244 self
._trace
_id
= trace_id
245 self
._is
_metadata
= is_metadata
247 self
._channel
_name
= channel_name
255 return self
._trace
_id
258 def is_metadata(self
):
259 return self
._is
_metadata
266 def channel_name(self
):
267 return self
._channel
_name
270 class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply
):
279 def __init__(self
, status
: int, stream_infos
: Sequence
[_LttngLiveViewerStreamInfo
]):
280 self
._status
= status
281 self
._stream
_infos
= stream_infos
288 def stream_infos(self
):
289 return self
._stream
_infos
292 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand
):
293 def __init__(self
, version
: int, stream_id
: int):
294 super().__init
__(version
)
295 self
._stream
_id
= stream_id
299 return self
._stream
_id
302 class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply
):
314 index_entry
: _LttngIndexEntryT
,
315 has_new_metadata
: bool,
316 has_new_data_stream
: bool,
318 self
._status
= status
319 self
._index
_entry
= index_entry
320 self
._has
_new
_metadata
= has_new_metadata
321 self
._has
_new
_data
_stream
= has_new_data_stream
328 def index_entry(self
):
329 return self
._index
_entry
332 def has_new_metadata(self
):
333 return self
._has
_new
_metadata
336 def has_new_data_stream(self
):
337 return self
._has
_new
_data
_stream
340 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand
):
341 def __init__(self
, version
: int, stream_id
: int, offset
: int, req_length
: int):
342 super().__init
__(version
)
343 self
._stream
_id
= stream_id
344 self
._offset
= offset
345 self
._req
_length
= req_length
349 return self
._stream
_id
356 def req_length(self
):
357 return self
._req
_length
360 class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply
):
371 has_new_metadata
: bool,
372 has_new_data_stream
: bool,
374 self
._status
= status
376 self
._has
_new
_metadata
= has_new_metadata
377 self
._has
_new
_data
_stream
= has_new_data_stream
388 def has_new_metadata(self
):
389 return self
._has
_new
_metadata
392 def has_new_data_stream(self
):
393 return self
._has
_new
_data
_stream
396 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand
):
397 def __init__(self
, version
: int, stream_id
: int):
398 super().__init
__(version
)
399 self
._stream
_id
= stream_id
403 return self
._stream
_id
406 class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply
):
412 def __init__(self
, status
: int, data
: bytes
):
413 self
._status
= status
425 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand
):
426 def __init__(self
, version
: int, tracing_session_id
: int):
427 super().__init
__(version
)
428 self
._tracing
_session
_id
= tracing_session_id
431 def tracing_session_id(self
):
432 return self
._tracing
_session
_id
435 class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply
):
442 def __init__(self
, status
: int, stream_infos
: Sequence
[_LttngLiveViewerStreamInfo
]):
443 self
._status
= status
444 self
._stream
_infos
= stream_infos
451 def stream_infos(self
):
452 return self
._stream
_infos
455 class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand
):
459 class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply
):
464 def __init__(self
, status
: int):
465 self
._status
= status
472 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand
):
473 def __init__(self
, version
: int, tracing_session_id
: int):
474 super().__init
__(version
)
475 self
._tracing
_session
_id
= tracing_session_id
478 def tracing_session_id(self
):
479 return self
._tracing
_session
_id
482 class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply
):
488 def __init__(self
, status
: int):
489 self
._status
= status
496 # An LTTng live protocol codec can convert bytes to command objects and
497 # reply objects to bytes.
498 class _LttngLiveViewerProtocolCodec
:
499 _COMMAND_HEADER_STRUCT_FMT
= "QII"
500 _COMMAND_HEADER_SIZE_BYTES
= struct
.calcsize(_COMMAND_HEADER_STRUCT_FMT
)
505 def _unpack(self
, fmt
: str, data
: bytes
, offset
: int = 0):
507 return struct
.unpack_from(fmt
, data
, offset
)
509 def _unpack_payload(self
, fmt
: str, data
: bytes
):
511 fmt
, data
, _LttngLiveViewerProtocolCodec
._COMMAND
_HEADER
_SIZE
_BYTES
514 def decode(self
, data
: bytes
):
515 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
:
516 # Not enough data to read the command header
519 payload_size
, cmd_type
, version
= self
._unpack
(
520 self
._COMMAND
_HEADER
_STRUCT
_FMT
, data
523 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
524 payload_size
, cmd_type
, version
528 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
+ payload_size
:
529 # Not enough data to read the whole command
533 viewer_session_id
, major
, minor
, _
= self
._unpack
_payload
("QIII", data
)
534 return _LttngLiveViewerConnectCommand(
535 version
, viewer_session_id
, major
, minor
538 return _LttngLiveViewerGetTracingSessionInfosCommand(version
)
540 tracing_session_id
, offset
, seek_type
= self
._unpack
_payload
("QQI", data
)
541 return _LttngLiveViewerAttachToTracingSessionCommand(
542 version
, tracing_session_id
, offset
, seek_type
545 (stream_id
,) = self
._unpack
_payload
("Q", data
)
546 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
550 stream_id
, offset
, req_length
= self
._unpack
_payload
("QQI", data
)
551 return _LttngLiveViewerGetDataStreamPacketDataCommand(
552 version
, stream_id
, offset
, req_length
555 (stream_id
,) = self
._unpack
_payload
("Q", data
)
556 return _LttngLiveViewerGetMetadataStreamDataCommand(version
, stream_id
)
558 (tracing_session_id
,) = self
._unpack
_payload
("Q", data
)
559 return _LttngLiveViewerGetNewStreamInfosCommand(version
, tracing_session_id
)
561 return _LttngLiveViewerCreateViewerSessionCommand(version
)
563 (tracing_session_id
,) = self
._unpack
_payload
("Q", data
)
564 return _LttngLiveViewerDetachFromTracingSessionCommand(
565 version
, tracing_session_id
568 raise RuntimeError("Unknown command type {}".format(cmd_type
))
def _pack(self, fmt: str, *args: Any):
    # Serialize `args` according to the `struct` format `fmt`.
    #
    # `fmt` must not carry its own byte order character: `!` is
    # prepended here so that every encoded field is in network byte
    # order.
    network_fmt = "!{}".format(fmt)
    return struct.pack(network_fmt, *args)
def _encode_zero_padded_str(self, string: str, length: int):
    # Encode `string` with the default `str.encode()` encoding and
    # right-pad the result with null bytes up to `length` bytes.
    #
    # An encoded string which is already `length` bytes long or more
    # is returned as is: nothing is truncated here.
    return string.encode().ljust(length, b"\x00")
578 def _encode_stream_info(self
, info
: _LttngLiveViewerStreamInfo
):
579 data
= self
._pack
("QQI", info
.id, info
.trace_id
, int(info
.is_metadata
))
580 data
+= self
._encode
_zero
_padded
_str
(info
.path
, 4096)
581 data
+= self
._encode
_zero
_padded
_str
(info
.channel_name
, 255)
584 def _get_has_new_stuff_flags(
585 self
, has_new_metadata
: bool, has_new_data_streams
: bool
592 if has_new_data_streams
:
599 reply
: _LttngLiveViewerReply
,
601 if type(reply
) is _LttngLiveViewerConnectReply
:
603 "QIII", reply
.viewer_session_id
, reply
.major
, reply
.minor
, 2
605 elif type(reply
) is _LttngLiveViewerGetTracingSessionInfosReply
:
606 data
= self
._pack
("I", len(reply
.tracing_session_infos
))
608 for info
in reply
.tracing_session_infos
:
611 info
.tracing_session_id
,
612 info
.live_timer_freq
,
616 data
+= self
._encode
_zero
_padded
_str
(info
.hostname
, 64)
617 data
+= self
._encode
_zero
_padded
_str
(info
.name
, 255)
618 elif type(reply
) is _LttngLiveViewerAttachToTracingSessionReply
:
619 data
= self
._pack
("II", reply
.status
, len(reply
.stream_infos
))
621 for info
in reply
.stream_infos
:
622 data
+= self
._encode
_stream
_info
(info
)
623 elif type(reply
) is _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
624 index_format
= "QQQQQQQII"
625 entry
= reply
.index_entry
626 flags
= self
._get
_has
_new
_stuff
_flags
(
627 reply
.has_new_metadata
, reply
.has_new_data_stream
630 if isinstance(entry
, _LttngDataStreamIndexEntry
):
634 entry
.total_size_bits
,
635 entry
.content_size_bits
,
636 entry
.timestamp_begin
,
638 entry
.events_discarded
,
639 entry
.stream_class_id
,
652 entry
.stream_class_id
,
656 elif type(reply
) is _LttngLiveViewerGetDataStreamPacketDataReply
:
657 flags
= self
._get
_has
_new
_stuff
_flags
(
658 reply
.has_new_metadata
, reply
.has_new_data_stream
660 data
= self
._pack
("III", reply
.status
, len(reply
.data
), flags
)
662 elif type(reply
) is _LttngLiveViewerGetMetadataStreamDataContentReply
:
663 data
= self
._pack
("QI", len(reply
.data
), reply
.status
)
665 elif type(reply
) is _LttngLiveViewerGetNewStreamInfosReply
:
666 data
= self
._pack
("II", reply
.status
, len(reply
.stream_infos
))
668 for info
in reply
.stream_infos
:
669 data
+= self
._encode
_stream
_info
(info
)
670 elif type(reply
) is _LttngLiveViewerCreateViewerSessionReply
:
671 data
= self
._pack
("I", reply
.status
)
672 elif type(reply
) is _LttngLiveViewerDetachFromTracingSessionReply
:
673 data
= self
._pack
("I", reply
.status
)
676 "Unknown reply object with class `{}`".format(reply
.__class
__.__name
__)
682 def _get_entry_timestamp_begin(
683 entry
: _LttngIndexEntryT
,
685 if isinstance(entry
, _LttngDataStreamBeaconIndexEntry
):
686 return entry
.timestamp
688 return entry
.timestamp_begin
691 # The index of an LTTng data stream, a sequence of index entries.
692 class _LttngDataStreamIndex(Sequence
[_LttngIndexEntryT
]):
693 def __init__(self
, path
: str, beacons
: Optional
[tjson
.ArrayVal
]):
698 stream_class_id
= self
._entries
[0].stream_class_id
700 beacons_list
= [] # type: list[_LttngDataStreamBeaconIndexEntry]
701 for ts
in beacons
.iter(tjson
.IntVal
):
703 _LttngDataStreamBeaconIndexEntry(stream_class_id
, ts
.val
)
706 self
._add
_beacons
(beacons_list
)
709 'Built data stream index entries: path="{}", count={}'.format(
710 path
, len(self
._entries
)
715 self
._entries
= [] # type: list[_LttngIndexEntryT]
717 with
open(self
._path
, "rb") as f
:
720 size
= struct
.calcsize(fmt
)
722 assert len(data
) == size
723 magic
, _
, _
, index_entry_length
= struct
.unpack(fmt
, data
)
724 assert magic
== 0xC1F1DCC1
728 size
= struct
.calcsize(fmt
)
732 'Decoding data stream index entry: path="{}", offset={}'.format(
742 assert len(data
) == size
751 ) = struct
.unpack(fmt
, data
)
753 self
._entries
.append(
754 _LttngDataStreamIndexEntry(
765 # Skip anything else before the next entry
766 f
.seek(index_entry_length
- size
, os
.SEEK_CUR
)
768 def _add_beacons(self
, beacons
: Iterable
[_LttngDataStreamBeaconIndexEntry
]):
769 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
771 entry
: Union
[_LttngDataStreamIndexEntry
, _LttngDataStreamBeaconIndexEntry
],
773 if isinstance(entry
, _LttngDataStreamBeaconIndexEntry
):
774 return entry
.timestamp
776 return entry
.timestamp_end
778 self
._entries
+= beacons
779 self
._entries
.sort(key
=sort_key
)
782 def __getitem__(self
, index
: int) -> _LttngIndexEntryT
:
786 def __getitem__(self
, index
: slice) -> Sequence
[_LttngIndexEntryT
]: # noqa: F811
789 def __getitem__( # noqa: F811
790 self
, index
: Union
[int, slice]
791 ) -> Union
[_LttngIndexEntryT
, Sequence
[_LttngIndexEntryT
],]:
792 return self
._entries
[index
]
795 return len(self
._entries
)
802 # An LTTng data stream.
803 class _LttngDataStream
:
804 def __init__(self
, path
: str, beacons_json
: Optional
[tjson
.ArrayVal
]):
806 filename
= os
.path
.basename(path
)
807 match
= re
.match(r
"(.*)_\d+", filename
)
810 "Unexpected data stream file name pattern: {}".format(filename
)
813 self
._channel
_name
= match
.group(1)
814 trace_dir
= os
.path
.dirname(path
)
815 index_path
= os
.path
.join(trace_dir
, "index", filename
+ ".idx")
816 self
._index
= _LttngDataStreamIndex(index_path
, beacons_json
)
817 assert os
.path
.isfile(path
)
818 self
._file
= open(path
, "rb")
820 'Built data stream: path="{}", channel-name="{}"'.format(
821 path
, self
._channel
_name
830 def channel_name(self
):
831 return self
._channel
_name
def get_data(self, offset_bytes: int, len_bytes: int):
    # Read at most `len_bytes` bytes of the data stream file, starting
    # `offset_bytes` bytes after its beginning.
    stream_file = self._file
    stream_file.seek(offset_bytes)
    return stream_file.read(len_bytes)
842 class _LttngMetadataStreamSection
:
843 def __init__(self
, timestamp
: int, data
: Optional
[bytes
]):
844 self
._timestamp
= timestamp
850 "Built metadata stream section: ts={}, data-len={}".format(
851 self
._timestamp
, len(self
._data
)
857 return self
._timestamp
864 # An LTTng metadata stream.
865 class _LttngMetadataStream
:
868 metadata_file_path
: str,
869 config_sections
: Sequence
[_LttngMetadataStreamSection
],
871 self
._path
= metadata_file_path
872 self
._sections
= config_sections
874 "Built metadata stream: path={}, section-len={}".format(
875 self
._path
, len(self
._sections
)
885 return self
._sections
888 class LttngMetadataConfigSection
:
889 def __init__(self
, line
: int, timestamp
: int, is_empty
: bool):
891 self
._timestamp
= timestamp
892 self
._is
_empty
= is_empty
900 return self
._timestamp
904 return self
._is
_empty
907 def _parse_metadata_sections_config(metadata_sections_json
: tjson
.ArrayVal
):
908 metadata_sections
= [] # type: list[LttngMetadataConfigSection]
909 append_empty_section
= False
913 for section
in metadata_sections_json
:
914 if isinstance(section
, tjson
.StrVal
):
915 if section
.val
== "empty":
916 # Found an empty section marker. Actually append the section at the
917 # timestamp of the next concrete section.
918 append_empty_section
= True
920 raise ValueError("Invalid string value at {}.".format(section
.path
))
921 elif isinstance(section
, tjson
.ObjVal
):
922 line
= section
.at("line", tjson
.IntVal
).val
923 ts
= section
.at("timestamp", tjson
.IntVal
).val
925 # Sections' timestamps and lines must both be increasing.
926 assert ts
> last_timestamp
929 assert line
> last_line
932 if append_empty_section
:
933 metadata_sections
.append(LttngMetadataConfigSection(line
, ts
, True))
934 append_empty_section
= False
936 metadata_sections
.append(LttngMetadataConfigSection(line
, ts
, False))
939 "`{}`: expecting a string or object value".format(section
.path
)
942 return metadata_sections
945 def _split_metadata_sections(
946 metadata_file_path
: str, metadata_sections_json
: tjson
.ArrayVal
948 metadata_sections
= _parse_metadata_sections_config(metadata_sections_json
)
950 sections
= [] # type: list[_LttngMetadataStreamSection]
951 with
open(metadata_file_path
, "r") as metadata_file
:
952 metadata_lines
= [line
for line
in metadata_file
]
954 metadata_section_idx
= 0
955 curr_metadata_section
= bytearray()
957 for idx
, line_content
in enumerate(metadata_lines
):
958 # Add one to the index to convert from the zero-indexing of the
959 # enumerate() function to the one-indexing used by humans when
960 # viewing a text file.
961 curr_line_number
= idx
+ 1
963 # If there are no more sections, simply append the line.
964 if metadata_section_idx
+ 1 >= len(metadata_sections
):
965 curr_metadata_section
+= bytearray(line_content
, "utf8")
968 next_section_line_number
= metadata_sections
[metadata_section_idx
+ 1].line
970 # If the next section begins at the current line, create a
971 # section with the metadata we gathered so far.
972 if curr_line_number
>= next_section_line_number
:
973 # Flushing the metadata of the current section.
975 _LttngMetadataStreamSection(
976 metadata_sections
[metadata_section_idx
].timestamp
,
977 bytes(curr_metadata_section
),
981 # Move to the next section.
982 metadata_section_idx
+= 1
984 # Clear old content and append current line for the next section.
985 curr_metadata_section
.clear()
986 curr_metadata_section
+= bytearray(line_content
, "utf8")
988 # Append any empty sections.
989 while metadata_sections
[metadata_section_idx
].is_empty
:
991 _LttngMetadataStreamSection(
992 metadata_sections
[metadata_section_idx
].timestamp
, None
995 metadata_section_idx
+= 1
997 # Append line_content to the current metadata section.
998 curr_metadata_section
+= bytearray(line_content
, "utf8")
1000 # We iterated over all the lines of the metadata file. Close the current section.
1002 _LttngMetadataStreamSection(
1003 metadata_sections
[metadata_section_idx
].timestamp
,
1004 bytes(curr_metadata_section
),
1011 _StreamBeaconsT
= Dict
[str, Iterable
[int]]
1014 # An LTTng trace, a sequence of LTTng data streams.
1015 class LttngTrace(Sequence
[_LttngDataStream
]):
1019 metadata_sections_json
: Optional
[tjson
.ArrayVal
],
1020 beacons_json
: Optional
[tjson
.ObjVal
],
1022 self
._path
= trace_dir
1023 self
._create
_metadata
_stream
(trace_dir
, metadata_sections_json
)
1024 self
._create
_data
_streams
(trace_dir
, beacons_json
)
1025 logging
.info('Built trace: path="{}"'.format(trace_dir
))
1027 def _create_data_streams(
1028 self
, trace_dir
: str, beacons_json
: Optional
[tjson
.ObjVal
]
1030 data_stream_paths
= [] # type: list[str]
1032 for filename
in os
.listdir(trace_dir
):
1033 path
= os
.path
.join(trace_dir
, filename
)
1035 if not os
.path
.isfile(path
):
1038 if filename
.startswith("."):
1041 if filename
== "metadata":
1044 data_stream_paths
.append(path
)
1046 data_stream_paths
.sort()
1047 self
._data
_streams
= [] # type: list[_LttngDataStream]
1049 for data_stream_path
in data_stream_paths
:
1050 stream_name
= os
.path
.basename(data_stream_path
)
1051 this_beacons_json
= None
1052 if beacons_json
is not None and stream_name
in beacons_json
:
1053 this_beacons_json
= beacons_json
.at(stream_name
, tjson
.ArrayVal
)
1055 self
._data
_streams
.append(
1056 _LttngDataStream(data_stream_path
, this_beacons_json
)
1059 def _create_metadata_stream(
1060 self
, trace_dir
: str, metadata_sections_json
: Optional
[tjson
.ArrayVal
]
1062 metadata_path
= os
.path
.join(trace_dir
, "metadata")
1063 metadata_sections
= [] # type: list[_LttngMetadataStreamSection]
1065 if metadata_sections_json
is None:
1066 with
open(metadata_path
, "rb") as metadata_file
:
1067 metadata_sections
.append(
1068 _LttngMetadataStreamSection(0, metadata_file
.read())
1071 metadata_sections
= _split_metadata_sections(
1072 metadata_path
, metadata_sections_json
1075 self
._metadata
_stream
= _LttngMetadataStream(metadata_path
, metadata_sections
)
1082 def metadata_stream(self
):
1083 return self
._metadata
_stream
1086 def __getitem__(self
, index
: int) -> _LttngDataStream
:
1090 def __getitem__(self
, index
: slice) -> Sequence
[_LttngDataStream
]: # noqa: F811
1093 def __getitem__( # noqa: F811
1094 self
, index
: Union
[int, slice]
1095 ) -> Union
[_LttngDataStream
, Sequence
[_LttngDataStream
]]:
1096 return self
._data
_streams
[index
]
1099 return len(self
._data
_streams
)
1102 # The state of a single data stream.
1103 class _LttngLiveViewerSessionDataStreamState
:
1106 ts_state
: "_LttngLiveViewerSessionTracingSessionState",
1107 info
: _LttngLiveViewerStreamInfo
,
1108 data_stream
: _LttngDataStream
,
1109 metadata_stream_id
: int,
1111 self
._ts
_state
= ts_state
1113 self
._data
_stream
= data_stream
1114 self
._metadata
_stream
_id
= metadata_stream_id
1115 self
._cur
_index
_entry
_index
= 0
1116 fmt
= 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1120 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
1121 ts_state
.tracing_session_descriptor
.info
.name
,
1127 def tracing_session_state(self
):
1128 return self
._ts
_state
1135 def data_stream(self
):
1136 return self
._data
_stream
1139 def cur_index_entry(self
):
1140 if self
._cur
_index
_entry
_index
== len(self
._data
_stream
.index
):
1143 return self
._data
_stream
.index
[self
._cur
_index
_entry
_index
]
1146 def metadata_stream_id(self
):
1147 return self
._metadata
_stream
_id
def goto_next_index_entry(self):
    # Move the cursor of this state to the following entry of the
    # data stream index.
    self._cur_index_entry_index = self._cur_index_entry_index + 1
1153 # The state of a single metadata stream.
1154 class _LttngLiveViewerSessionMetadataStreamState
:
1157 ts_state
: "_LttngLiveViewerSessionTracingSessionState",
1158 info
: _LttngLiveViewerStreamInfo
,
1159 metadata_stream
: _LttngMetadataStream
,
1161 self
._ts
_state
= ts_state
1163 self
._metadata
_stream
= metadata_stream
1164 self
._cur
_metadata
_stream
_section
_index
= 0
1165 if len(metadata_stream
.sections
) > 1:
1166 self
._next
_metadata
_stream
_section
_timestamp
= metadata_stream
.sections
[
1170 self
._next
_metadata
_stream
_section
_timestamp
= None
1172 self
._is
_sent
= False
1173 fmt
= 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1177 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
1178 ts_state
.tracing_session_descriptor
.info
.name
,
1179 metadata_stream
.path
,
1188 def metadata_stream(self
):
1189 return self
._metadata
_stream
1193 return self
._is
_sent
1196 def is_sent(self
, value
: bool):
1197 self
._is
_sent
= value
1200 def cur_section(self
):
1201 fmt
= "Get current metadata section: section-idx={}"
1202 logging
.info(fmt
.format(self
._cur
_metadata
_stream
_section
_index
))
1203 if self
._cur
_metadata
_stream
_section
_index
== len(
1204 self
._metadata
_stream
.sections
1208 return self
._metadata
_stream
.sections
[self
._cur
_metadata
_stream
_section
_index
]
1210 def goto_next_section(self
):
1211 self
._cur
_metadata
_stream
_section
_index
+= 1
1212 if self
.cur_section
:
1213 self
._next
_metadata
_stream
_section
_timestamp
= self
.cur_section
.timestamp
1215 self
._next
_metadata
_stream
_section
_timestamp
= None
1218 def next_section_timestamp(self
):
1219 return self
._next
_metadata
_stream
_section
_timestamp
1222 # A tracing session descriptor.
1224 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1226 class LttngTracingSessionDescriptor
:
1230 tracing_session_id
: int,
1232 live_timer_freq
: int,
1234 traces
: Iterable
[LttngTrace
],
1236 for trace
in traces
:
1237 if name
not in trace
.path
:
1238 fmt
= "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
1239 raise ValueError(fmt
.format(name
, trace
.path
))
1241 self
._traces
= traces
1242 stream_count
= sum([len(t
) + 1 for t
in traces
])
1243 self
._info
= _LttngLiveViewerTracingSessionInfo(
1261 # The state of a tracing session.
1262 class _LttngLiveViewerSessionTracingSessionState
:
1263 def __init__(self
, tc_descr
: LttngTracingSessionDescriptor
, base_stream_id
: int):
1264 self
._tc
_descr
= tc_descr
1265 self
._stream
_infos
= [] # type: list[_LttngLiveViewerStreamInfo]
1266 self
._ds
_states
= {} # type: dict[int, _LttngLiveViewerSessionDataStreamState]
1269 ) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
1270 stream_id
= base_stream_id
1272 for trace
in tc_descr
.traces
:
1273 trace_id
= stream_id
* 1000
1275 # Metadata stream -> stream info and metadata stream state
1276 info
= _LttngLiveViewerStreamInfo(
1277 stream_id
, trace_id
, True, trace
.metadata_stream
.path
, "metadata"
1279 self
._stream
_infos
.append(info
)
1280 self
._ms
_states
[stream_id
] = _LttngLiveViewerSessionMetadataStreamState(
1281 self
, info
, trace
.metadata_stream
1283 metadata_stream_id
= stream_id
1286 # Data streams -> stream infos and data stream states
1287 for data_stream
in trace
:
1288 info
= _LttngLiveViewerStreamInfo(
1293 data_stream
.channel_name
,
1295 self
._stream
_infos
.append(info
)
1296 self
._ds
_states
[stream_id
] = _LttngLiveViewerSessionDataStreamState(
1297 self
, info
, data_stream
, metadata_stream_id
1301 self
._is
_attached
= False
1302 fmt
= 'Built tracing session state: id={}, name="{}"'
1303 logging
.info(fmt
.format(tc_descr
.info
.tracing_session_id
, tc_descr
.info
.name
))
1306 def tracing_session_descriptor(self
):
1307 return self
._tc
_descr
1310 def data_stream_states(self
):
1311 return self
._ds
_states
1314 def metadata_stream_states(self
):
1315 return self
._ms
_states
1318 def stream_infos(self
):
1319 return self
._stream
_infos
def has_new_metadata(self):
    # Whether or not at least one metadata stream state of this
    # tracing session state isn't marked as sent yet.
    for ms_state in self._ms_states.values():
        if not ms_state.is_sent:
            return True

    return False
1326 def is_attached(self
):
1327 return self
._is
_attached
1330 def is_attached(self
, value
: bool):
1331 self
._is
_attached
= value
1334 def needs_new_metadata_section(
1335 metadata_stream_state
: _LttngLiveViewerSessionMetadataStreamState
,
1336 latest_timestamp
: int,
1338 if metadata_stream_state
.next_section_timestamp
is None:
1341 if latest_timestamp
>= metadata_stream_state
.next_section_timestamp
:
1347 # An LTTng live viewer session manages a view on tracing sessions
1348 # and replies to commands accordingly.
1349 class _LttngLiveViewerSession
:
1352 viewer_session_id
: int,
1353 tracing_session_descriptors
: Iterable
[LttngTracingSessionDescriptor
],
1354 max_query_data_response_size
: Optional
[int],
1356 self
._viewer
_session
_id
= viewer_session_id
1359 ) # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
1360 self
._stream
_states
= (
1362 ) # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
1363 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1364 total_stream_infos
= 0
1366 for ts_descr
in tracing_session_descriptors
:
1367 ts_state
= _LttngLiveViewerSessionTracingSessionState(
1368 ts_descr
, total_stream_infos
1370 ts_id
= ts_state
.tracing_session_descriptor
.info
.tracing_session_id
1371 self
._ts
_states
[ts_id
] = ts_state
1372 total_stream_infos
+= len(ts_state
.stream_infos
)
1374 # Update session's stream states to have the new states
1375 self
._stream
_states
.update(ts_state
.data_stream_states
)
1376 self
._stream
_states
.update(ts_state
.metadata_stream_states
)
1378 self
._command
_handlers
= {
1379 _LttngLiveViewerAttachToTracingSessionCommand
: self
._handle
_attach
_to
_tracing
_session
_command
,
1380 _LttngLiveViewerCreateViewerSessionCommand
: self
._handle
_create
_viewer
_session
_command
,
1381 _LttngLiveViewerDetachFromTracingSessionCommand
: self
._handle
_detach
_from
_tracing
_session
_command
,
1382 _LttngLiveViewerGetDataStreamPacketDataCommand
: self
._handle
_get
_data
_stream
_packet
_data
_command
,
1383 _LttngLiveViewerGetMetadataStreamDataCommand
: self
._handle
_get
_metadata
_stream
_data
_command
,
1384 _LttngLiveViewerGetNewStreamInfosCommand
: self
._handle
_get
_new
_stream
_infos
_command
,
1385 _LttngLiveViewerGetNextDataStreamIndexEntryCommand
: self
._handle
_get
_next
_data
_stream
_index
_entry
_command
,
1386 _LttngLiveViewerGetTracingSessionInfosCommand
: self
._handle
_get
_tracing
_session
_infos
_command
,
1387 } # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]
1390 def viewer_session_id(self
):
1391 return self
._viewer
_session
_id
1393 def _get_tracing_session_state(self
, tracing_session_id
: int):
1394 if tracing_session_id
not in self
._ts
_states
:
1396 "Unknown tracing session ID {}".format(tracing_session_id
)
1399 return self
._ts
_states
[tracing_session_id
]
def _get_data_stream_state(self, stream_id: int):
    # Look up the state of the stream having the ID `stream_id` and
    # make sure it's a data stream state.
    #
    # Raises `RuntimeError` if no stream has the ID `stream_id` or if
    # the stream having this ID isn't a data stream.
    if stream_id not in self._stream_states:
        # The exception must really be raised: only instantiating it
        # lets the lookup below fail with a bare `KeyError` instead.
        raise RuntimeError("Unknown stream ID {}".format(stream_id))

    stream = self._stream_states[stream_id]
    if type(stream) is not _LttngLiveViewerSessionDataStreamState:
        raise RuntimeError("Stream is not a data stream")
def _get_metadata_stream_state(self, stream_id: int):
    # Look up the state of the stream having the ID `stream_id` and
    # make sure it's a metadata stream state.
    #
    # Raises `RuntimeError` if no stream has the ID `stream_id` or if
    # the stream having this ID isn't a metadata stream.
    if stream_id not in self._stream_states:
        # The exception must really be raised: only instantiating it
        # lets the lookup below fail with a bare `KeyError` instead.
        raise RuntimeError("Unknown stream ID {}".format(stream_id))

    stream = self._stream_states[stream_id]
    if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
        raise RuntimeError("Stream is not a metadata stream")
1421 def handle_command(self
, cmd
: _LttngLiveViewerCommand
):
1423 "Handling command in viewer session: cmd-cls-name={}".format(
1424 cmd
.__class
__.__name
__
1427 cmd_type
= type(cmd
)
1429 if cmd_type
not in self
._command
_handlers
:
1431 "Unexpected command: cmd-cls-name={}".format(cmd
.__class
__.__name
__)
1434 return self
._command
_handlers
[cmd_type
](cmd
)
1436 def _handle_attach_to_tracing_session_command(
1437 self
, cmd
: _LttngLiveViewerAttachToTracingSessionCommand
1439 fmt
= 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1440 logging
.info(fmt
.format(cmd
.tracing_session_id
, cmd
.offset
, cmd
.seek_type
))
1441 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1442 info
= ts_state
.tracing_session_descriptor
.info
1444 if ts_state
.is_attached
:
1446 "Cannot attach to tracing session `{}`: viewer is already attached".format(
1451 ts_state
.is_attached
= True
1452 status
= _LttngLiveViewerAttachToTracingSessionReply
.Status
.OK
1453 return _LttngLiveViewerAttachToTracingSessionReply(
1454 status
, ts_state
.stream_infos
1457 def _handle_detach_from_tracing_session_command(
1458 self
, cmd
: _LttngLiveViewerDetachFromTracingSessionCommand
1460 fmt
= 'Handling "detach from tracing session" command: ts-id={}'
1461 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1462 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1463 info
= ts_state
.tracing_session_descriptor
.info
1465 if not ts_state
.is_attached
:
1467 "Cannot detach to tracing session `{}`: viewer is not attached".format(
1472 ts_state
.is_attached
= False
1473 status
= _LttngLiveViewerDetachFromTracingSessionReply
.Status
.OK
1474 return _LttngLiveViewerDetachFromTracingSessionReply(status
)
# Handles a "get next data stream index entry" command and returns the
# corresponding reply.
#
# Replies with the `HUP` status (and a dummy index entry) when the data
# stream has no current index entry. Otherwise, replies with the
# current index entry (`OK` status for a `_LttngDataStreamIndexEntry`,
# `INACTIVE` status otherwise) and then advances the data stream state
# to its next index entry.
def _handle_get_next_data_stream_index_entry_command(
    self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
):
    logging.info(
        'Handling "get next data stream index entry" command: stream-id={}'.format(
            cmd.stream_id
        )
    )
    data_state = self._get_data_stream_state(cmd.stream_id)
    md_state = self._get_metadata_stream_state(data_state.metadata_stream_id)
    reply_cls = _LttngLiveViewerGetNextDataStreamIndexEntryReply

    if data_state.cur_index_entry is None:
        # The viewer is done reading this stream. The reply needs an
        # index entry, but the viewer ignores it with the `HUP` status:
        # send a dummy one.
        return reply_cls(
            reply_cls.Status.HUP,
            _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0),
            False,
            False,
        )

    if needs_new_metadata_section(
        md_state, _get_entry_timestamp_begin(data_state.cur_index_entry)
    ):
        # Make the next metadata section available to the viewer.
        md_state.is_sent = False
        md_state.goto_next_section()

    # The viewer only checks the `has_new_metadata` flag if the
    # reply's status is `OK`, so we need to provide an index here
    has_new_metadata = data_state.tracing_session_state.has_new_metadata
    status = (
        reply_cls.Status.OK
        if isinstance(data_state.cur_index_entry, _LttngDataStreamIndexEntry)
        else reply_cls.Status.INACTIVE
    )
    reply = reply_cls(status, data_state.cur_index_entry, has_new_metadata, False)

    # Advance only once the reply holds the current index entry.
    data_state.goto_next_index_entry()
    return reply
# Handles a "get data stream packet data" command and returns the
# corresponding reply.
#
# Replies with the `ERROR` status and no data while the tracing session
# state reports new metadata. Otherwise, replies with the `OK` status
# and the data which the data stream returns for the requested offset
# and length, the length being capped by `_max_query_data_response_size`
# when this limit is set.
def _handle_get_data_stream_packet_data_command(
    self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
):
    logging.info(
        'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'.format(
            cmd.stream_id, cmd.offset, cmd.req_length
        )
    )
    data_state = self._get_data_stream_state(cmd.stream_id)
    reply_cls = _LttngLiveViewerGetDataStreamPacketDataReply

    if data_state.tracing_session_state.has_new_metadata:
        return reply_cls(reply_cls.Status.ERROR, bytes(), True, False)

    response_length = cmd.req_length
    size_limit = self._max_query_data_response_size

    if size_limit:
        # Enforce a server side limit on the query requested length.
        # To ensure that the transaction terminates, take the minimum
        # of both values.
        response_length = min(cmd.req_length, size_limit)
        logging.info(
            'Limiting "get data stream packet data" command: req-length={} actual response size={}'.format(
                cmd.req_length, response_length
            )
        )

    payload = data_state.data_stream.get_data(cmd.offset, response_length)
    return reply_cls(reply_cls.Status.OK, payload, False, False)
# Handles a "get metadata stream data" command and returns the
# corresponding reply.
#
# Replies with the `NO_NEW` status and no data when the current
# metadata section is already sent. Otherwise, marks it as sent and
# replies with the `OK` status and the data of this section.
def _handle_get_metadata_stream_data_command(
    self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
):
    logging.info(
        'Handling "get metadata stream data" command: stream-id={}'.format(
            cmd.stream_id
        )
    )
    md_state = self._get_metadata_stream_state(cmd.stream_id)
    reply_cls = _LttngLiveViewerGetMetadataStreamDataContentReply

    if md_state.is_sent:
        return reply_cls(reply_cls.Status.NO_NEW, bytes())

    md_state.is_sent = True
    section = md_state.cur_section
    assert section is not None

    # If we are sending an empty section, ready the next one right away.
    if len(section.data) == 0:
        md_state.is_sent = False
        md_state.goto_next_section()

    # `section` still names the section selected above, even if the
    # stream state just moved to the next one.
    logging.info(
        'Replying to "get metadata stream data" command: metadata-size={}'.format(
            len(section.data)
        )
    )
    return reply_cls(reply_cls.Status.OK, section.data)
# Handles a "get new stream infos" command: always replies with the
# `HUP` status and an empty list of stream infos.
def _handle_get_new_stream_infos_command(
    self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
):
    logging.info(
        'Handling "get new stream infos" command: ts-id={}'.format(
            cmd.tracing_session_id
        )
    )

    # As of this version, all the tracing session's stream infos are
    # always given to the viewer when sending the "attach to tracing
    # session" reply, so there's nothing new here. Return the `HUP`
    # status as, if we're handling this command, the viewer consumed
    # all the existing data streams.
    return _LttngLiveViewerGetNewStreamInfosReply(
        _LttngLiveViewerGetNewStreamInfosReply.Status.HUP, []
    )
# Handles a "get tracing session infos" command: replies with the infos
# of all the known tracing sessions, sorted by name.
def _handle_get_tracing_session_infos_command(
    self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
):
    logging.info('Handling "get tracing session infos" command.')
    infos_by_name = sorted(
        (
            ts_state.tracing_session_descriptor.info
            for ts_state in self._ts_states.values()
        ),
        key=lambda info: info.name,
    )
    return _LttngLiveViewerGetTracingSessionInfosReply(infos_by_name)
# Handles a "create viewer session" command: always replies with the
# `OK` status.
def _handle_create_viewer_session_command(
    self, cmd: _LttngLiveViewerCreateViewerSessionCommand
):
    logging.info('Handling "create viewer session" command.')

    # This does nothing here. In the LTTng relay daemon, it
    # allocates the viewer session's state.
    return _LttngLiveViewerCreateViewerSessionReply(
        _LttngLiveViewerCreateViewerSessionReply.Status.OK
    )
1608 # An LTTng live TCP server.
1610 # On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
1611 # on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
1612 # to a temporary port file. It renames the temporary port file to
1615 # `tracing_session_descriptors` is a list of tracing session descriptors
1616 # (`LttngTracingSessionDescriptor`) to serve.
1618 # This server accepts a single viewer (client).
1620 # When the viewer closes the connection, the server's constructor
1622 class LttngLiveServer
:
1625 port
: Optional
[int],
1626 port_filename
: Optional
[str],
1627 tracing_session_descriptors
: Iterable
[LttngTracingSessionDescriptor
],
1628 max_query_data_response_size
: Optional
[int],
1630 logging
.info("Server configuration:")
1632 logging
.info(" Port file name: `{}`".format(port_filename
))
1634 if max_query_data_response_size
is not None:
1636 " Maximum response data query size: `{}`".format(
1637 max_query_data_response_size
1641 for ts_descr
in tracing_session_descriptors
:
1642 info
= ts_descr
.info
1643 fmt
= ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1647 info
.tracing_session_id
,
1649 info
.live_timer_freq
,
1655 for trace
in ts_descr
.traces
:
1656 logging
.info(' Trace: path="{}"'.format(trace
.path
))
1658 self
._ts
_descriptors
= tracing_session_descriptors
1659 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1660 self
._sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
1661 self
._codec
= _LttngLiveViewerProtocolCodec()
1663 # Port 0: OS assigns an unused port
1664 serv_addr
= ("localhost", port
if port
is not None else 0)
1665 self
._sock
.bind(serv_addr
)
1667 if port_filename
is not None:
1668 self
._write
_port
_to
_file
(port_filename
)
1670 print("Listening on port {}".format(self
._server
_port
))
1672 for ts_descr
in tracing_session_descriptors
:
1673 info
= ts_descr
.info
1675 "net://localhost:{}/host/{}/{}".format(
1676 self
._server
_port
, info
.hostname
, info
.name
1684 logging
.info("Closed connection and socket.")
1687 def _server_port(self
):
1688 return self
._sock
.getsockname()[1]
1690 def _recv_command(self
):
1694 logging
.info("Waiting for viewer command.")
1695 buf
= self
._conn
.recv(128)
1698 logging
.info("Client closed connection.")
1702 "Client closed connection after having sent {} command bytes.".format(
1709 logging
.info("Received data from viewer: length={}".format(len(buf
)))
1714 cmd
= self
._codec
.decode(data
)
1715 except struct
.error
as exc
:
1716 raise RuntimeError("Malformed command: {}".format(exc
)) from exc
1720 "Received command from viewer: cmd-cls-name={}".format(
1721 cmd
.__class
__.__name
__
1726 def _send_reply(self
, reply
: _LttngLiveViewerReply
):
1727 data
= self
._codec
.encode(reply
)
1729 "Sending reply to viewer: reply-cls-name={}, length={}".format(
1730 reply
.__class
__.__name
__, len(data
)
1733 self
._conn
.sendall(data
)
1735 def _handle_connection(self
):
1736 # First command must be "connect"
1737 cmd
= self
._recv
_command
()
1739 if type(cmd
) is not _LttngLiveViewerConnectCommand
:
1741 'First command is not "connect": cmd-cls-name={}'.format(
1742 cmd
.__class
__.__name
__
1746 # Create viewer session (arbitrary ID 23)
1748 "LTTng live viewer connected: version={}.{}".format(cmd
.major
, cmd
.minor
)
1750 viewer_session
= _LttngLiveViewerSession(
1751 23, self
._ts
_descriptors
, self
._max
_query
_data
_response
_size
1754 # Send "connect" reply
1756 _LttngLiveViewerConnectReply(viewer_session
.viewer_session_id
, 2, 10)
1759 # Make the viewer session handle the remaining commands
1761 cmd
= self
._recv
_command
()
1764 # Connection closed (at an expected location within the
1768 self
._send
_reply
(viewer_session
.handle_command(cmd
))
1771 logging
.info("Listening: port={}".format(self
._server
_port
))
1772 # Backlog must be present for Python version < 3.5.
1773 # 128 is an arbitrary number since we expect only 1 connection anyway.
1774 self
._sock
.listen(128)
1775 self
._conn
, viewer_addr
= self
._sock
.accept()
1777 "Accepted viewer: addr={}:{}".format(viewer_addr
[0], viewer_addr
[1])
1781 self
._handle
_connection
()
1785 def _write_port_to_file(self
, port_filename
: str):
1786 # Write the port number to a temporary file.
1787 with tempfile
.NamedTemporaryFile(
1788 mode
="w", delete
=False, dir=os
.path
.dirname(port_filename
)
1790 print(self
._server
_port
, end
="", file=tmp_port_file
)
1792 # Rename temporary file to real file.
1794 # For unknown reasons, on Windows, moving the port file from its
1795 # temporary location to its final location (where the user of
1796 # the server expects it to appear) may raise a `PermissionError`
1799 # We suppose it's possible that something in the Windows kernel
1800 # hasn't completely finished using the file when we try to move
1803 # Use a wait-and-retry scheme as a (bad) workaround.
1807 for attempt
in reversed(range(num_attempts
)):
1809 os
.replace(tmp_port_file
.name
, port_filename
)
1811 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1812 tmp_port_file
.name
, port_filename
1816 except PermissionError
:
1818 'Permission error while attempting to rename port file; retrying in {} second: src-path="{}", dst-path="{}"'.format(
1819 retry_delay_s
, tmp_port_file
.name
, port_filename
1826 time
.sleep(retry_delay_s
)
1829 def _session_descriptors_from_path(
1830 sessions_filename
: str, trace_path_prefix
: Optional
[str]
1836 # "name": "my-session",
1838 # "hostname": "myhost",
1839 # "live-timer-freq": 1000000,
1840 # "client-count": 23,
1846 # "path": "meow/mix",
1848 # "my_stream": [ 5235787, 728375283 ]
1850 # "metadata-sections": [
1860 with
open(sessions_filename
, "r") as sessions_file
:
1861 sessions_json
= tjson
.load(sessions_file
, tjson
.ArrayVal
)
1863 sessions
= [] # type: list[LttngTracingSessionDescriptor]
1865 for session_json
in sessions_json
.iter(tjson
.ObjVal
):
1866 name
= session_json
.at("name", tjson
.StrVal
).val
1867 tracing_session_id
= session_json
.at("id", tjson
.IntVal
).val
1868 hostname
= session_json
.at("hostname", tjson
.StrVal
).val
1869 live_timer_freq
= session_json
.at("live-timer-freq", tjson
.IntVal
).val
1870 client_count
= session_json
.at("client-count", tjson
.IntVal
).val
1871 traces_json
= session_json
.at("traces", tjson
.ArrayVal
)
1873 traces
= [] # type: list[LttngTrace]
1875 for trace_json
in traces_json
.iter(tjson
.ObjVal
):
1876 metadata_sections
= (
1877 trace_json
.at("metadata-sections", tjson
.ArrayVal
)
1878 if "metadata-sections" in trace_json
1882 trace_json
.at("beacons", tjson
.ObjVal
)
1883 if "beacons" in trace_json
1886 path
= trace_json
.at("path", tjson
.StrVal
).val
1888 if not os
.path
.isabs(path
) and trace_path_prefix
:
1889 path
= os
.path
.join(trace_path_prefix
, path
)
1900 LttngTracingSessionDescriptor(
1913 def _loglevel_parser(string
: str):
1914 loglevels
= {"info": logging
.INFO
, "warning": logging
.WARNING
}
1915 if string
not in loglevels
:
1916 msg
= "{} is not a valid loglevel".format(string
)
1917 raise argparse
.ArgumentTypeError(msg
)
1918 return loglevels
[string
]
1921 if __name__
== "__main__":
1922 logging
.basicConfig(format
="# %(asctime)-25s%(message)s")
1923 parser
= argparse
.ArgumentParser(
1924 description
="LTTng-live protocol mocker", add_help
=False
1926 parser
.add_argument(
1929 choices
=["info", "warning"],
1930 help="The loglevel to be used.",
1933 loglevel_namespace
, remaining_args
= parser
.parse_known_args()
1934 logging
.getLogger().setLevel(_loglevel_parser(loglevel_namespace
.log_level
))
1936 parser
.add_argument(
1938 help="The port to bind to. If missing, use an OS-assigned port..",
1941 parser
.add_argument(
1943 help="The final port file. This file is present when the server is ready to receive connection.",
1945 parser
.add_argument(
1946 "--max-query-data-response-size",
1948 help="The maximum size of control data response in bytes",
1950 parser
.add_argument(
1951 "--trace-path-prefix",
1953 help="Prefix to prepend to the trace paths of session configurations",
1955 parser
.add_argument(
1956 "sessions_filename",
1958 help="Path to a session configuration file",
1959 metavar
="sessions-filename",
1961 parser
.add_argument(
1965 default
=argparse
.SUPPRESS
,
1966 help="Show this help message and exit.",
1969 args
= parser
.parse_args(args
=remaining_args
)
1970 sessions_filename
= args
.sessions_filename
# type: str
1971 trace_path_prefix
= args
.trace_path_prefix
# type: str | None
1972 sessions
= _session_descriptors_from_path(
1977 port
= args
.port
# type: int | None
1978 port_filename
= args
.port_filename
# type: str | None
1979 max_query_data_response_size
= args
.max_query_data_response_size
# type: int | None
1980 LttngLiveServer(port
, port_filename
, sessions
, max_query_data_response_size
)