3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 # SPDX-License-Identifier: GPL-2.0-only
7 from concurrent
.futures
import process
8 from . import lttngctl
, logger
, environment
11 from typing
import Callable
, Optional
, Type
, Union
17 Implementation of the lttngctl interface based on the `lttng` command line client.
21 class Unsupported(lttngctl
.ControlException
):
22 def __init__(self
, msg
):
27 def _get_domain_option_name(domain
):
28 # type: (lttngctl.TracingDomain) -> str
29 if domain
== lttngctl
.TracingDomain
.User
:
31 elif domain
== lttngctl
.TracingDomain
.Kernel
:
33 elif domain
== lttngctl
.TracingDomain
.Log4j
:
35 elif domain
== lttngctl
.TracingDomain
.JUL
:
37 elif domain
== lttngctl
.TracingDomain
.Python
:
40 raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client")
43 def _get_context_type_name(context
):
44 # type: (lttngctl.ContextType) -> str
45 if isinstance(context
, lttngctl
.VgidContextType
):
47 elif isinstance(context
, lttngctl
.VuidContextType
):
49 elif isinstance(context
, lttngctl
.VpidContextType
):
51 elif isinstance(context
, lttngctl
.JavaApplicationContextType
):
52 return "$app.{retriever}:{field}".format(
53 retriever
=context
.retriever_name
, field
=context
.field_name
57 "Context `{context_name}` is not supported by the LTTng client".format(
58 type(context
).__name
__
63 class _Channel(lttngctl
.Channel
):
66 client
, # type: LTTngClient
68 domain
, # type: lttngctl.TracingDomain
69 session
, # type: _Session
71 self
._client
= client
# type: LTTngClient
72 self
._name
= name
# type: str
73 self
._domain
= domain
# type: lttngctl.TracingDomain
74 self
._session
= session
# type: _Session
76 def add_context(self
, context_type
):
77 # type: (lttngctl.ContextType) -> None
78 domain_option_name
= _get_domain_option_name(self
.domain
)
79 context_type_name
= _get_context_type_name(context_type
)
80 self
._client
._run
_cmd
(
81 "add-context --{domain_option_name} --type {context_type_name}".format(
82 domain_option_name
=domain_option_name
,
83 context_type_name
=context_type_name
,
87 def add_recording_rule(self
, rule
):
88 # type: (Type[lttngctl.EventRule]) -> None
90 "enable-event --session {session_name} --channel {channel_name}".format(
91 session_name
=self
._session
.name
, channel_name
=self
.name
94 if isinstance(rule
, lttngctl
.TracepointEventRule
):
95 domain_option_name
= (
97 if isinstance(rule
, lttngctl
.UserTracepointEventRule
)
100 client_args
= client_args
+ " --{domain_option_name}".format(
101 domain_option_name
=domain_option_name
104 if rule
.name_pattern
:
105 client_args
= client_args
+ " " + rule
.name_pattern
107 client_args
= client_args
+ " --all"
109 if rule
.filter_expression
:
110 client_args
= client_args
+ " " + rule
.filter_expression
112 if rule
.log_level_rule
:
113 if isinstance(rule
.log_level_rule
, lttngctl
.LogLevelRuleAsSevereAs
):
114 client_args
= client_args
+ " --loglevel {log_level}".format(
115 log_level
=rule
.log_level_rule
.level
117 elif isinstance(rule
.log_level_rule
, lttngctl
.LogLevelRuleExactly
):
118 client_args
= client_args
+ " --loglevel-only {log_level}".format(
119 log_level
=rule
.log_level_rule
.level
123 "Unsupported log level rule type `{log_level_rule_type}`".format(
124 log_level_rule_type
=type(rule
.log_level_rule
).__name
__
128 if rule
.name_pattern_exclusions
:
129 client_args
= client_args
+ " --exclude "
130 for idx
, pattern
in enumerate(rule
.name_pattern_exclusions
):
132 client_args
= client_args
+ ","
133 client_args
= client_args
+ pattern
136 "event rule type `{event_rule_type}` is unsupported by LTTng client".format(
137 event_rule_type
=type(rule
).__name
__
141 self
._client
._run
_cmd
(client_args
)
150 # type: () -> lttngctl.TracingDomain
155 class _ProcessAttribute(enum
.Enum
):
157 VPID
= "Virtual Process ID"
159 VUID
= "Virtual User ID"
161 VGID
= "Virtual Group ID"
164 return "<%s.%s>" % (self
.__class
__.__name
__, self
.name
)
167 def _get_process_attribute_option_name(attribute
):
168 # type: (_ProcessAttribute) -> str
170 _ProcessAttribute
.PID
: "pid",
171 _ProcessAttribute
.VPID
: "vpid",
172 _ProcessAttribute
.UID
: "uid",
173 _ProcessAttribute
.VUID
: "vuid",
174 _ProcessAttribute
.GID
: "gid",
175 _ProcessAttribute
.VGID
: "vgid",
179 class _ProcessAttributeTracker(lttngctl
.ProcessAttributeTracker
):
182 client
, # type: LTTngClient
183 attribute
, # type: _ProcessAttribute
184 domain
, # type: lttngctl.TracingDomain
185 session
, # type: _Session
187 self
._client
= client
# type: LTTngClient
188 self
._tracked
_attribute
= attribute
# type: _ProcessAttribute
189 self
._domain
= domain
# type: lttngctl.TracingDomain
190 self
._session
= session
# type: _Session
191 if attribute
== _ProcessAttribute
.PID
or attribute
== _ProcessAttribute
.VPID
:
192 self
._allowed
_value
_types
= [int, str] # type: list[type]
194 self
._allowed
_value
_types
= [int] # type: list[type]
196 def _call_client(self
, cmd_name
, value
):
197 # type: (str, Union[int, str]) -> None
198 if type(value
) not in self
._allowed
_value
_types
:
200 "Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
201 value_type
=type(value
).__name
__,
202 attribute_name
=self
._tracked
_attribute
.name
,
206 process_attribute_option_name
= _get_process_attribute_option_name(
207 self
._tracked
_attribute
209 domain_name
= _get_domain_option_name(self
._domain
)
210 self
._client
._run
_cmd
(
211 "{cmd_name} --session {session_name} --{domain_name} --{tracked_attribute_name} {value}".format(
213 session_name
=self
._session
.name
,
214 domain_name
=domain_name
,
215 tracked_attribute_name
=process_attribute_option_name
,
220 def track(self
, value
):
221 # type: (Union[int, str]) -> None
222 self
._call
_client
("track", value
)
224 def untrack(self
, value
):
225 # type: (Union[int, str]) -> None
226 self
._call
_client
("untrack", value
)
229 class _Session(lttngctl
.Session
):
232 client
, # type: LTTngClient
234 output
, # type: Optional[lttngctl.SessionOutputLocation]
236 self
._client
= client
# type: LTTngClient
237 self
._name
= name
# type: str
238 self
._output
= output
# type: Optional[lttngctl.SessionOutputLocation]
245 def add_channel(self
, domain
, channel_name
=None):
246 # type: (lttngctl.TracingDomain, Optional[str]) -> lttngctl.Channel
247 channel_name
= lttngctl
.Channel
._generate
_name
()
248 domain_option_name
= _get_domain_option_name(domain
)
249 self
._client
._run
_cmd
(
250 "enable-channel --{domain_name} {channel_name}".format(
251 domain_name
=domain_option_name
, channel_name
=channel_name
254 return _Channel(self
._client
, channel_name
, domain
, self
)
256 def add_context(self
, context_type
):
257 # type: (lttngctl.ContextType) -> None
262 # type: () -> "Optional[Type[lttngctl.SessionOutputLocation]]"
263 return self
._output
# type: ignore
267 self
._client
._run
_cmd
("start {session_name}".format(session_name
=self
.name
))
271 self
._client
._run
_cmd
("stop {session_name}".format(session_name
=self
.name
))
275 self
._client
._run
_cmd
("destroy {session_name}".format(session_name
=self
.name
))
278 def kernel_pid_process_attribute_tracker(self
):
279 # type: () -> Type[lttngctl.ProcessIDProcessAttributeTracker]
280 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.PID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
283 def kernel_vpid_process_attribute_tracker(self
):
284 # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
285 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VPID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
288 def user_vpid_process_attribute_tracker(self
):
289 # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
290 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VPID
, lttngctl
.TracingDomain
.User
, self
) # type: ignore
293 def kernel_gid_process_attribute_tracker(self
):
294 # type: () -> Type[lttngctl.GroupIDProcessAttributeTracker]
295 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.GID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
298 def kernel_vgid_process_attribute_tracker(self
):
299 # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
300 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VGID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
303 def user_vgid_process_attribute_tracker(self
):
304 # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
305 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VGID
, lttngctl
.TracingDomain
.User
, self
) # type: ignore
308 def kernel_uid_process_attribute_tracker(self
):
309 # type: () -> Type[lttngctl.UserIDProcessAttributeTracker]
310 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.UID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
313 def kernel_vuid_process_attribute_tracker(self
):
314 # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
315 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VUID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
318 def user_vuid_process_attribute_tracker(self
):
319 # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
320 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VUID
, lttngctl
.TracingDomain
.User
, self
) # type: ignore
323 class LTTngClientError(lttngctl
.ControlException
):
326 command_args
, # type: str
327 error_output
, # type: str
329 self
._command
_args
= command_args
# type: str
330 self
._output
= error_output
# type: str
333 class LTTngClient(logger
._Logger
, lttngctl
.Controller
):
335 Implementation of a LTTngCtl Controller that uses the `lttng` client as a back-end.
340 test_environment
, # type: environment._Environment
341 log
, # type: Optional[Callable[[str], None]]
343 logger
._Logger
.__init
__(self
, log
)
344 self
._environment
= test_environment
# type: environment._Environment
346 def _run_cmd(self
, command_args
):
347 # type: (str) -> None
349 Invoke the `lttng` client with a set of arguments. The command is
350 executed in the context of the client's test environment.
352 args
= [str(self
._environment
.lttng_client_path
)] # type: list[str]
353 args
.extend(shlex
.split(command_args
))
355 self
._log
("lttng {command_args}".format(command_args
=command_args
))
357 client_env
= os
.environ
.copy() # type: dict[str, str]
358 client_env
["LTTNG_HOME"] = str(self
._environment
.lttng_home_location
)
360 process
= subprocess
.Popen(
361 args
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.STDOUT
, env
=client_env
364 out
= process
.communicate()[0]
366 if process
.returncode
!= 0:
367 decoded_output
= out
.decode("utf-8")
368 for error_line
in decoded_output
.splitlines():
369 self
._log
(error_line
)
370 raise LTTngClientError(command_args
, decoded_output
)
372 def create_session(self
, name
=None, output
=None):
373 # type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session
374 name
= name
if name
else lttngctl
.Session
._generate
_name
()
376 if isinstance(output
, lttngctl
.LocalSessionOutputLocation
):
377 output_option
= "--output {output_path}".format(output_path
=output
.path
)
379 output_option
= "--no-output"
381 raise TypeError("LTTngClient only supports local or no output")
384 "create {session_name} {output_option}".format(
385 session_name
=name
, output_option
=output_option
388 return _Session(self
, name
, output
)