3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 # SPDX-License-Identifier: GPL-2.0-only
8 from . import lttngctl
, logger
, environment
11 from typing
import Callable
, Optional
, Type
, Union
15 import xml
.etree
.ElementTree
18 Implementation of the lttngctl interface based on the `lttng` command line client.
22 class Unsupported(lttngctl
.ControlException
):
23 def __init__(self
, msg
):
28 class InvalidMI(lttngctl
.ControlException
):
29 def __init__(self
, msg
):
34 def _get_domain_option_name(domain
):
35 # type: (lttngctl.TracingDomain) -> str
36 if domain
== lttngctl
.TracingDomain
.User
:
38 elif domain
== lttngctl
.TracingDomain
.Kernel
:
40 elif domain
== lttngctl
.TracingDomain
.Log4j
:
42 elif domain
== lttngctl
.TracingDomain
.JUL
:
44 elif domain
== lttngctl
.TracingDomain
.Python
:
47 raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client")
50 def _get_context_type_name(context
):
51 # type: (lttngctl.ContextType) -> str
52 if isinstance(context
, lttngctl
.VgidContextType
):
54 elif isinstance(context
, lttngctl
.VuidContextType
):
56 elif isinstance(context
, lttngctl
.VpidContextType
):
58 elif isinstance(context
, lttngctl
.JavaApplicationContextType
):
59 return "$app.{retriever}:{field}".format(
60 retriever
=context
.retriever_name
, field
=context
.field_name
64 "Context `{context_name}` is not supported by the LTTng client".format(
65 type(context
).__name
__
70 class _Channel(lttngctl
.Channel
):
73 client
, # type: LTTngClient
75 domain
, # type: lttngctl.TracingDomain
76 session
, # type: _Session
78 self
._client
= client
# type: LTTngClient
79 self
._name
= name
# type: str
80 self
._domain
= domain
# type: lttngctl.TracingDomain
81 self
._session
= session
# type: _Session
83 def add_context(self
, context_type
):
84 # type: (lttngctl.ContextType) -> None
85 domain_option_name
= _get_domain_option_name(self
.domain
)
86 context_type_name
= _get_context_type_name(context_type
)
87 self
._client
._run
_cmd
(
88 "add-context --{domain_option_name} --channel '{channel_name}' --type {context_type_name}".format(
89 domain_option_name
=domain_option_name
,
90 channel_name
=self
.name
,
91 context_type_name
=context_type_name
,
95 def add_recording_rule(self
, rule
):
96 # type: (Type[lttngctl.EventRule]) -> None
98 "enable-event --session {session_name} --channel {channel_name}".format(
99 session_name
=self
._session
.name
, channel_name
=self
.name
102 if isinstance(rule
, lttngctl
.TracepointEventRule
):
103 domain_option_name
= (
105 if isinstance(rule
, lttngctl
.UserTracepointEventRule
)
108 client_args
= client_args
+ " --{domain_option_name}".format(
109 domain_option_name
=domain_option_name
112 if rule
.name_pattern
:
113 client_args
= client_args
+ " " + rule
.name_pattern
115 client_args
= client_args
+ " --all"
117 if rule
.filter_expression
:
118 client_args
= client_args
+ " " + rule
.filter_expression
120 if rule
.log_level_rule
:
121 if isinstance(rule
.log_level_rule
, lttngctl
.LogLevelRuleAsSevereAs
):
122 client_args
= client_args
+ " --loglevel {log_level}".format(
123 log_level
=rule
.log_level_rule
.level
125 elif isinstance(rule
.log_level_rule
, lttngctl
.LogLevelRuleExactly
):
126 client_args
= client_args
+ " --loglevel-only {log_level}".format(
127 log_level
=rule
.log_level_rule
.level
131 "Unsupported log level rule type `{log_level_rule_type}`".format(
132 log_level_rule_type
=type(rule
.log_level_rule
).__name
__
136 if rule
.name_pattern_exclusions
:
137 client_args
= client_args
+ " --exclude "
138 for idx
, pattern
in enumerate(rule
.name_pattern_exclusions
):
140 client_args
= client_args
+ ","
141 client_args
= client_args
+ pattern
144 "event rule type `{event_rule_type}` is unsupported by LTTng client".format(
145 event_rule_type
=type(rule
).__name
__
149 self
._client
._run
_cmd
(client_args
)
158 # type: () -> lttngctl.TracingDomain
163 class _ProcessAttribute(enum
.Enum
):
165 VPID
= "Virtual Process ID"
167 VUID
= "Virtual User ID"
169 VGID
= "Virtual Group ID"
172 return "<%s.%s>" % (self
.__class
__.__name
__, self
.name
)
175 def _get_process_attribute_option_name(attribute
):
176 # type: (_ProcessAttribute) -> str
178 _ProcessAttribute
.PID
: "pid",
179 _ProcessAttribute
.VPID
: "vpid",
180 _ProcessAttribute
.UID
: "uid",
181 _ProcessAttribute
.VUID
: "vuid",
182 _ProcessAttribute
.GID
: "gid",
183 _ProcessAttribute
.VGID
: "vgid",
187 class _ProcessAttributeTracker(lttngctl
.ProcessAttributeTracker
):
190 client
, # type: LTTngClient
191 attribute
, # type: _ProcessAttribute
192 domain
, # type: lttngctl.TracingDomain
193 session
, # type: _Session
195 self
._client
= client
# type: LTTngClient
196 self
._tracked
_attribute
= attribute
# type: _ProcessAttribute
197 self
._domain
= domain
# type: lttngctl.TracingDomain
198 self
._session
= session
# type: _Session
199 if attribute
== _ProcessAttribute
.PID
or attribute
== _ProcessAttribute
.VPID
:
200 self
._allowed
_value
_types
= [int, str] # type: list[type]
202 self
._allowed
_value
_types
= [int] # type: list[type]
204 def _call_client(self
, cmd_name
, value
):
205 # type: (str, Union[int, str]) -> None
206 if type(value
) not in self
._allowed
_value
_types
:
208 "Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
209 value_type
=type(value
).__name
__,
210 attribute_name
=self
._tracked
_attribute
.name
,
214 process_attribute_option_name
= _get_process_attribute_option_name(
215 self
._tracked
_attribute
217 domain_name
= _get_domain_option_name(self
._domain
)
218 self
._client
._run
_cmd
(
219 "{cmd_name} --session '{session_name}' --{domain_name} --{tracked_attribute_name} {value}".format(
221 session_name
=self
._session
.name
,
222 domain_name
=domain_name
,
223 tracked_attribute_name
=process_attribute_option_name
,
228 def track(self
, value
):
229 # type: (Union[int, str]) -> None
230 self
._call
_client
("track", value
)
232 def untrack(self
, value
):
233 # type: (Union[int, str]) -> None
234 self
._call
_client
("untrack", value
)
237 class _Session(lttngctl
.Session
):
240 client
, # type: LTTngClient
242 output
, # type: Optional[lttngctl.SessionOutputLocation]
244 self
._client
= client
# type: LTTngClient
245 self
._name
= name
# type: str
246 self
._output
= output
# type: Optional[lttngctl.SessionOutputLocation]
257 buffer_sharing_policy
=lttngctl
.BufferSharingPolicy
.PerUID
,
259 # type: (lttngctl.TracingDomain, Optional[str], lttngctl.BufferSharingPolicy) -> lttngctl.Channel
260 channel_name
= lttngctl
.Channel
._generate
_name
()
261 domain_option_name
= _get_domain_option_name(domain
)
262 self
._client
._run
_cmd
(
263 "enable-channel --session '{session_name}' --{domain_name} '{channel_name}' {buffer_sharing_policy}".format(
264 session_name
=self
.name
,
265 domain_name
=domain_option_name
,
266 channel_name
=channel_name
,
267 buffer_sharing_policy
="--buffers-uid"
268 if buffer_sharing_policy
== lttngctl
.BufferSharingPolicy
.PerUID
269 else "--buffers-pid",
272 return _Channel(self
._client
, channel_name
, domain
, self
)
274 def add_context(self
, context_type
):
275 # type: (lttngctl.ContextType) -> None
280 # type: () -> "Optional[Type[lttngctl.SessionOutputLocation]]"
281 return self
._output
# type: ignore
285 self
._client
._run
_cmd
("start '{session_name}'".format(session_name
=self
.name
))
289 self
._client
._run
_cmd
("stop '{session_name}'".format(session_name
=self
.name
))
293 self
._client
._run
_cmd
("destroy '{session_name}'".format(session_name
=self
.name
))
298 list_session_xml
= self
._client
._run
_cmd
(
299 "list '{session_name}'".format(session_name
=self
.name
),
300 LTTngClient
.CommandOutputFormat
.MI_XML
,
303 root
= xml
.etree
.ElementTree
.fromstring(list_session_xml
)
304 command_output
= LTTngClient
._mi
_find
_in
_element
(root
, "output")
305 sessions
= LTTngClient
._mi
_find
_in
_element
(command_output
, "sessions")
306 session_mi
= LTTngClient
._mi
_find
_in
_element
(sessions
, "session")
308 enabled_text
= LTTngClient
._mi
_find
_in
_element
(session_mi
, "enabled").text
309 if enabled_text
not in ["true", "false"]:
311 "Expected boolean value in element '{}': value='{}'".format(
312 session_mi
.tag
, enabled_text
316 return enabled_text
== "true"
319 def kernel_pid_process_attribute_tracker(self
):
320 # type: () -> Type[lttngctl.ProcessIDProcessAttributeTracker]
321 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.PID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
324 def kernel_vpid_process_attribute_tracker(self
):
325 # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
326 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VPID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
329 def user_vpid_process_attribute_tracker(self
):
330 # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
331 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VPID
, lttngctl
.TracingDomain
.User
, self
) # type: ignore
334 def kernel_gid_process_attribute_tracker(self
):
335 # type: () -> Type[lttngctl.GroupIDProcessAttributeTracker]
336 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.GID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
339 def kernel_vgid_process_attribute_tracker(self
):
340 # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
341 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VGID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
344 def user_vgid_process_attribute_tracker(self
):
345 # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
346 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VGID
, lttngctl
.TracingDomain
.User
, self
) # type: ignore
349 def kernel_uid_process_attribute_tracker(self
):
350 # type: () -> Type[lttngctl.UserIDProcessAttributeTracker]
351 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.UID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
354 def kernel_vuid_process_attribute_tracker(self
):
355 # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
356 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VUID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
359 def user_vuid_process_attribute_tracker(self
):
360 # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
361 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VUID
, lttngctl
.TracingDomain
.User
, self
) # type: ignore
364 class LTTngClientError(lttngctl
.ControlException
):
367 command_args
, # type: str
368 error_output
, # type: str
370 self
._command
_args
= command_args
# type: str
371 self
._output
= error_output
# type: str
374 class LTTngClient(logger
._Logger
, lttngctl
.Controller
):
376 Implementation of a LTTngCtl Controller that uses the `lttng` client as a back-end.
379 class CommandOutputFormat(enum
.Enum
):
383 _MI_NS
= "{https://lttng.org/xml/ns/lttng-mi}"
387 test_environment
, # type: environment._Environment
388 log
, # type: Optional[Callable[[str], None]]
390 logger
._Logger
.__init
__(self
, log
)
391 self
._environment
= test_environment
# type: environment._Environment
394 def _namespaced_mi_element(property):
396 return LTTngClient
._MI
_NS
+ property
398 def _run_cmd(self
, command_args
, output_format
=CommandOutputFormat
.MI_XML
):
399 # type: (str, CommandOutputFormat) -> str
401 Invoke the `lttng` client with a set of arguments. The command is
402 executed in the context of the client's test environment.
404 args
= [str(self
._environment
.lttng_client_path
)] # type: list[str]
405 if output_format
== LTTngClient
.CommandOutputFormat
.MI_XML
:
406 args
.extend(["--mi", "xml"])
408 args
.extend(shlex
.split(command_args
))
410 self
._log
("lttng {command_args}".format(command_args
=command_args
))
412 client_env
= os
.environ
.copy() # type: dict[str, str]
413 client_env
["LTTNG_HOME"] = str(self
._environment
.lttng_home_location
)
415 process
= subprocess
.Popen(
416 args
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.STDOUT
, env
=client_env
419 out
= process
.communicate()[0]
421 if process
.returncode
!= 0:
422 decoded_output
= out
.decode("utf-8")
423 for error_line
in decoded_output
.splitlines():
424 self
._log
(error_line
)
425 raise LTTngClientError(command_args
, decoded_output
)
427 return out
.decode("utf-8")
429 def create_session(self
, name
=None, output
=None):
430 # type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session
431 name
= name
if name
else lttngctl
.Session
._generate
_name
()
433 if isinstance(output
, lttngctl
.LocalSessionOutputLocation
):
434 output_option
= "--output {output_path}".format(output_path
=output
.path
)
436 output_option
= "--no-output"
438 raise TypeError("LTTngClient only supports local or no output")
441 "create '{session_name}' {output_option}".format(
442 session_name
=name
, output_option
=output_option
445 return _Session(self
, name
, output
)
447 def start_session_by_name(self
, name
):
448 # type: (str) -> None
449 self
._run
_cmd
("start '{session_name}'".format(session_name
=name
))
451 def start_session_by_glob_pattern(self
, pattern
):
452 # type: (str) -> None
453 self
._run
_cmd
("start --glob '{pattern}'".format(pattern
=pattern
))
455 def start_sessions_all(self
):
457 self
._run
_cmd
("start --all")
459 def stop_session_by_name(self
, name
):
460 # type: (str) -> None
461 self
._run
_cmd
("stop '{session_name}'".format(session_name
=name
))
463 def stop_session_by_glob_pattern(self
, pattern
):
464 # type: (str) -> None
465 self
._run
_cmd
("stop --glob '{pattern}'".format(pattern
=pattern
))
467 def stop_sessions_all(self
):
469 self
._run
_cmd
("stop --all")
471 def destroy_session_by_name(self
, name
):
472 # type: (str) -> None
473 self
._run
_cmd
("destroy '{session_name}'".format(session_name
=name
))
475 def destroy_session_by_glob_pattern(self
, pattern
):
476 # type: (str) -> None
477 self
._run
_cmd
("destroy --glob '{pattern}'".format(pattern
=pattern
))
479 def destroy_sessions_all(self
):
481 self
._run
_cmd
("destroy --all")
484 def _mi_find_in_element(element
, sub_element_name
):
485 # type: (xml.etree.ElementTree.Element, str) -> xml.etree.ElementTree.Element
486 result
= element
.find(LTTngClient
._namespaced
_mi
_element
(sub_element_name
))
489 "Failed to find element '{}' within command MI element '{}'".format(
490 element
.tag
, sub_element_name
496 def list_sessions(self
):
497 # type () -> List[Session]
498 list_sessions_xml
= self
._run
_cmd
(
499 "list", LTTngClient
.CommandOutputFormat
.MI_XML
502 root
= xml
.etree
.ElementTree
.fromstring(list_sessions_xml
)
503 command_output
= self
._mi
_find
_in
_element
(root
, "output")
504 sessions
= self
._mi
_find
_in
_element
(command_output
, "sessions")
506 ctl_sessions
= [] # type: list[lttngctl.Session]
508 for session_mi
in sessions
:
509 name
= self
._mi
_find
_in
_element
(session_mi
, "name").text
510 path
= self
._mi
_find
_in
_element
(session_mi
, "path").text
514 "Invalid empty 'name' element in '{}'".format(session_mi
.tag
)
518 "Invalid empty 'path' element in '{}'".format(session_mi
.tag
)
520 if not path
.startswith("/"):
522 "{} does not support non-local session outputs".format(type(self
))
526 _Session(self
, name
, lttngctl
.LocalSessionOutputLocation(path
))