3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 # SPDX-License-Identifier: GPL-2.0-only
8 from . import lttngctl
, logger
, environment
11 from typing
import Callable
, Optional
, Type
, Union
15 import xml
.etree
.ElementTree
18 Implementation of the lttngctl interface based on the `lttng` command line client.
22 class Unsupported(lttngctl
.ControlException
):
23 def __init__(self
, msg
):
28 class InvalidMI(lttngctl
.ControlException
):
29 def __init__(self
, msg
):
34 def _get_domain_option_name(domain
):
35 # type: (lttngctl.TracingDomain) -> str
36 if domain
== lttngctl
.TracingDomain
.User
:
38 elif domain
== lttngctl
.TracingDomain
.Kernel
:
40 elif domain
== lttngctl
.TracingDomain
.Log4j
:
42 elif domain
== lttngctl
.TracingDomain
.JUL
:
44 elif domain
== lttngctl
.TracingDomain
.Python
:
47 raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client")
50 def _get_context_type_name(context
):
51 # type: (lttngctl.ContextType) -> str
52 if isinstance(context
, lttngctl
.VgidContextType
):
54 elif isinstance(context
, lttngctl
.VuidContextType
):
56 elif isinstance(context
, lttngctl
.VpidContextType
):
58 elif isinstance(context
, lttngctl
.JavaApplicationContextType
):
59 return "$app.{retriever}:{field}".format(
60 retriever
=context
.retriever_name
, field
=context
.field_name
64 "Context `{context_name}` is not supported by the LTTng client".format(
65 type(context
).__name
__
70 class _Channel(lttngctl
.Channel
):
73 client
, # type: LTTngClient
75 domain
, # type: lttngctl.TracingDomain
76 session
, # type: _Session
78 self
._client
= client
# type: LTTngClient
79 self
._name
= name
# type: str
80 self
._domain
= domain
# type: lttngctl.TracingDomain
81 self
._session
= session
# type: _Session
83 def add_context(self
, context_type
):
84 # type: (lttngctl.ContextType) -> None
85 domain_option_name
= _get_domain_option_name(self
.domain
)
86 context_type_name
= _get_context_type_name(context_type
)
87 self
._client
._run
_cmd
(
88 "add-context --{domain_option_name} --channel '{channel_name}' --type {context_type_name}".format(
89 domain_option_name
=domain_option_name
,
90 channel_name
=self
.name
,
91 context_type_name
=context_type_name
,
95 def add_recording_rule(self
, rule
):
96 # type: (Type[lttngctl.EventRule]) -> None
98 "enable-event --session {session_name} --channel {channel_name}".format(
99 session_name
=self
._session
.name
, channel_name
=self
.name
102 if isinstance(rule
, lttngctl
.TracepointEventRule
):
103 domain_option_name
= (
105 if isinstance(rule
, lttngctl
.UserTracepointEventRule
)
108 client_args
= client_args
+ " --{domain_option_name}".format(
109 domain_option_name
=domain_option_name
112 if rule
.name_pattern
:
113 client_args
= client_args
+ " " + rule
.name_pattern
115 client_args
= client_args
+ " --all"
117 if rule
.filter_expression
:
118 client_args
= client_args
+ " " + rule
.filter_expression
120 if rule
.log_level_rule
:
121 if isinstance(rule
.log_level_rule
, lttngctl
.LogLevelRuleAsSevereAs
):
122 client_args
= client_args
+ " --loglevel {log_level}".format(
123 log_level
=rule
.log_level_rule
.level
125 elif isinstance(rule
.log_level_rule
, lttngctl
.LogLevelRuleExactly
):
126 client_args
= client_args
+ " --loglevel-only {log_level}".format(
127 log_level
=rule
.log_level_rule
.level
131 "Unsupported log level rule type `{log_level_rule_type}`".format(
132 log_level_rule_type
=type(rule
.log_level_rule
).__name
__
136 if rule
.name_pattern_exclusions
:
137 client_args
= client_args
+ " --exclude "
138 for idx
, pattern
in enumerate(rule
.name_pattern_exclusions
):
140 client_args
= client_args
+ ","
141 client_args
= client_args
+ pattern
144 "event rule type `{event_rule_type}` is unsupported by LTTng client".format(
145 event_rule_type
=type(rule
).__name
__
149 self
._client
._run
_cmd
(client_args
)
158 # type: () -> lttngctl.TracingDomain
163 class _ProcessAttribute(enum
.Enum
):
165 VPID
= "Virtual Process ID"
167 VUID
= "Virtual User ID"
169 VGID
= "Virtual Group ID"
172 return "<%s.%s>" % (self
.__class
__.__name
__, self
.name
)
175 def _get_process_attribute_option_name(attribute
):
176 # type: (_ProcessAttribute) -> str
178 _ProcessAttribute
.PID
: "pid",
179 _ProcessAttribute
.VPID
: "vpid",
180 _ProcessAttribute
.UID
: "uid",
181 _ProcessAttribute
.VUID
: "vuid",
182 _ProcessAttribute
.GID
: "gid",
183 _ProcessAttribute
.VGID
: "vgid",
187 class _ProcessAttributeTracker(lttngctl
.ProcessAttributeTracker
):
190 client
, # type: LTTngClient
191 attribute
, # type: _ProcessAttribute
192 domain
, # type: lttngctl.TracingDomain
193 session
, # type: _Session
195 self
._client
= client
# type: LTTngClient
196 self
._tracked
_attribute
= attribute
# type: _ProcessAttribute
197 self
._domain
= domain
# type: lttngctl.TracingDomain
198 self
._session
= session
# type: _Session
199 if attribute
== _ProcessAttribute
.PID
or attribute
== _ProcessAttribute
.VPID
:
200 self
._allowed
_value
_types
= [int, str] # type: list[type]
202 self
._allowed
_value
_types
= [int] # type: list[type]
204 def _call_client(self
, cmd_name
, value
):
205 # type: (str, Union[int, str]) -> None
206 if type(value
) not in self
._allowed
_value
_types
:
208 "Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
209 value_type
=type(value
).__name
__,
210 attribute_name
=self
._tracked
_attribute
.name
,
214 process_attribute_option_name
= _get_process_attribute_option_name(
215 self
._tracked
_attribute
217 domain_name
= _get_domain_option_name(self
._domain
)
218 self
._client
._run
_cmd
(
219 "{cmd_name} --session '{session_name}' --{domain_name} --{tracked_attribute_name} {value}".format(
221 session_name
=self
._session
.name
,
222 domain_name
=domain_name
,
223 tracked_attribute_name
=process_attribute_option_name
,
228 def track(self
, value
):
229 # type: (Union[int, str]) -> None
230 self
._call
_client
("track", value
)
232 def untrack(self
, value
):
233 # type: (Union[int, str]) -> None
234 self
._call
_client
("untrack", value
)
237 class _Session(lttngctl
.Session
):
240 client
, # type: LTTngClient
242 output
, # type: Optional[lttngctl.SessionOutputLocation]
244 self
._client
= client
# type: LTTngClient
245 self
._name
= name
# type: str
246 self
._output
= output
# type: Optional[lttngctl.SessionOutputLocation]
253 def add_channel(self
, domain
, channel_name
=None):
254 # type: (lttngctl.TracingDomain, Optional[str]) -> lttngctl.Channel
255 channel_name
= lttngctl
.Channel
._generate
_name
()
256 domain_option_name
= _get_domain_option_name(domain
)
257 self
._client
._run
_cmd
(
258 "enable-channel --session '{session_name}' --{domain_name} '{channel_name}'".format(
259 session_name
=self
.name
,
260 domain_name
=domain_option_name
,
261 channel_name
=channel_name
,
264 return _Channel(self
._client
, channel_name
, domain
, self
)
266 def add_context(self
, context_type
):
267 # type: (lttngctl.ContextType) -> None
272 # type: () -> "Optional[Type[lttngctl.SessionOutputLocation]]"
273 return self
._output
# type: ignore
277 self
._client
._run
_cmd
("start '{session_name}'".format(session_name
=self
.name
))
281 self
._client
._run
_cmd
("stop '{session_name}'".format(session_name
=self
.name
))
285 self
._client
._run
_cmd
("destroy '{session_name}'".format(session_name
=self
.name
))
290 list_session_xml
= self
._client
._run
_cmd
(
291 "list '{session_name}'".format(session_name
=self
.name
),
292 LTTngClient
.CommandOutputFormat
.MI_XML
,
295 root
= xml
.etree
.ElementTree
.fromstring(list_session_xml
)
296 command_output
= LTTngClient
._mi
_find
_in
_element
(root
, "output")
297 sessions
= LTTngClient
._mi
_find
_in
_element
(command_output
, "sessions")
298 session_mi
= LTTngClient
._mi
_find
_in
_element
(sessions
, "session")
300 enabled_text
= LTTngClient
._mi
_find
_in
_element
(session_mi
, "enabled").text
301 if enabled_text
not in ["true", "false"]:
303 "Expected boolean value in element '{}': value='{}'".format(
304 session_mi
.tag
, enabled_text
308 return enabled_text
== "true"
311 def kernel_pid_process_attribute_tracker(self
):
312 # type: () -> Type[lttngctl.ProcessIDProcessAttributeTracker]
313 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.PID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
316 def kernel_vpid_process_attribute_tracker(self
):
317 # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
318 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VPID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
321 def user_vpid_process_attribute_tracker(self
):
322 # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
323 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VPID
, lttngctl
.TracingDomain
.User
, self
) # type: ignore
326 def kernel_gid_process_attribute_tracker(self
):
327 # type: () -> Type[lttngctl.GroupIDProcessAttributeTracker]
328 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.GID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
331 def kernel_vgid_process_attribute_tracker(self
):
332 # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
333 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VGID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
336 def user_vgid_process_attribute_tracker(self
):
337 # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
338 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VGID
, lttngctl
.TracingDomain
.User
, self
) # type: ignore
341 def kernel_uid_process_attribute_tracker(self
):
342 # type: () -> Type[lttngctl.UserIDProcessAttributeTracker]
343 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.UID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
346 def kernel_vuid_process_attribute_tracker(self
):
347 # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
348 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VUID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
351 def user_vuid_process_attribute_tracker(self
):
352 # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
353 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VUID
, lttngctl
.TracingDomain
.User
, self
) # type: ignore
356 class LTTngClientError(lttngctl
.ControlException
):
359 command_args
, # type: str
360 error_output
, # type: str
362 self
._command
_args
= command_args
# type: str
363 self
._output
= error_output
# type: str
366 class LTTngClient(logger
._Logger
, lttngctl
.Controller
):
368 Implementation of a LTTngCtl Controller that uses the `lttng` client as a back-end.
371 class CommandOutputFormat(enum
.Enum
):
375 _MI_NS
= "{https://lttng.org/xml/ns/lttng-mi}"
379 test_environment
, # type: environment._Environment
380 log
, # type: Optional[Callable[[str], None]]
382 logger
._Logger
.__init
__(self
, log
)
383 self
._environment
= test_environment
# type: environment._Environment
386 def _namespaced_mi_element(property):
388 return LTTngClient
._MI
_NS
+ property
390 def _run_cmd(self
, command_args
, output_format
=CommandOutputFormat
.MI_XML
):
391 # type: (str, CommandOutputFormat) -> str
393 Invoke the `lttng` client with a set of arguments. The command is
394 executed in the context of the client's test environment.
396 args
= [str(self
._environment
.lttng_client_path
)] # type: list[str]
397 if output_format
== LTTngClient
.CommandOutputFormat
.MI_XML
:
398 args
.extend(["--mi", "xml"])
400 args
.extend(shlex
.split(command_args
))
402 self
._log
("lttng {command_args}".format(command_args
=command_args
))
404 client_env
= os
.environ
.copy() # type: dict[str, str]
405 client_env
["LTTNG_HOME"] = str(self
._environment
.lttng_home_location
)
407 process
= subprocess
.Popen(
408 args
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.STDOUT
, env
=client_env
411 out
= process
.communicate()[0]
413 if process
.returncode
!= 0:
414 decoded_output
= out
.decode("utf-8")
415 for error_line
in decoded_output
.splitlines():
416 self
._log
(error_line
)
417 raise LTTngClientError(command_args
, decoded_output
)
419 return out
.decode("utf-8")
421 def create_session(self
, name
=None, output
=None):
422 # type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session
423 name
= name
if name
else lttngctl
.Session
._generate
_name
()
425 if isinstance(output
, lttngctl
.LocalSessionOutputLocation
):
426 output_option
= "--output {output_path}".format(output_path
=output
.path
)
428 output_option
= "--no-output"
430 raise TypeError("LTTngClient only supports local or no output")
433 "create '{session_name}' {output_option}".format(
434 session_name
=name
, output_option
=output_option
437 return _Session(self
, name
, output
)
439 def start_session_by_name(self
, name
):
440 # type: (str) -> None
441 self
._run
_cmd
("start '{session_name}'".format(session_name
=name
))
443 def start_session_by_glob_pattern(self
, pattern
):
444 # type: (str) -> None
445 self
._run
_cmd
("start --glob '{pattern}'".format(pattern
=pattern
))
447 def start_sessions_all(self
):
449 self
._run
_cmd
("start --all")
451 def stop_session_by_name(self
, name
):
452 # type: (str) -> None
453 self
._run
_cmd
("stop '{session_name}'".format(session_name
=name
))
455 def stop_session_by_glob_pattern(self
, pattern
):
456 # type: (str) -> None
457 self
._run
_cmd
("stop --glob '{pattern}'".format(pattern
=pattern
))
459 def stop_sessions_all(self
):
461 self
._run
_cmd
("stop --all")
463 def destroy_session_by_name(self
, name
):
464 # type: (str) -> None
465 self
._run
_cmd
("destroy '{session_name}'".format(session_name
=name
))
467 def destroy_session_by_glob_pattern(self
, pattern
):
468 # type: (str) -> None
469 self
._run
_cmd
("destroy --glob '{pattern}'".format(pattern
=pattern
))
471 def destroy_sessions_all(self
):
473 self
._run
_cmd
("destroy --all")
476 def _mi_find_in_element(element
, sub_element_name
):
477 # type: (xml.etree.ElementTree.Element, str) -> xml.etree.ElementTree.Element
478 result
= element
.find(LTTngClient
._namespaced
_mi
_element
(sub_element_name
))
481 "Failed to find element '{}' within command MI element '{}'".format(
482 element
.tag
, sub_element_name
488 def list_sessions(self
):
489 # type () -> List[Session]
490 list_sessions_xml
= self
._run
_cmd
(
491 "list", LTTngClient
.CommandOutputFormat
.MI_XML
494 root
= xml
.etree
.ElementTree
.fromstring(list_sessions_xml
)
495 command_output
= self
._mi
_find
_in
_element
(root
, "output")
496 sessions
= self
._mi
_find
_in
_element
(command_output
, "sessions")
498 ctl_sessions
= [] # type: list[lttngctl.Session]
500 for session_mi
in sessions
:
501 name
= self
._mi
_find
_in
_element
(session_mi
, "name").text
502 path
= self
._mi
_find
_in
_element
(session_mi
, "path").text
506 "Invalid empty 'name' element in '{}'".format(session_mi
.tag
)
510 "Invalid empty 'path' element in '{}'".format(session_mi
.tag
)
512 if not path
.startswith("/"):
514 "{} does not support non-local session outputs".format(type(self
))
518 _Session(self
, name
, lttngctl
.LocalSessionOutputLocation(path
))