--- /dev/null
+#!/usr/bin/env python3
+#
+# Copyright (C) 2023 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+#
+# SPDX-License-Identifier: GPL-2.0-only
+
+from cgi import test
+import pathlib
+import sys
+import os
+from typing import Any, Callable, Type, Dict, Iterator
+import random
+import string
+from collections.abc import Mapping
+
+"""
+Test the session commands of the `lttng` CLI client.
+"""
+
+# Import in-tree test utils
+test_utils_import_path = pathlib.Path(__file__).absolute().parents[3] / "utils"
+sys.path.append(str(test_utils_import_path))
+
+import lttngtest
+import bt2
+
+
+class SessionSet(Mapping):
+ def __init__(self, client, name_prefixes):
+ self._sessions = {} # type dict[str, lttngtest.Session]
+ for prefix in name_prefixes:
+ new_session = client.create_session(
+ name=self._generate_session_name_from_prefix(prefix),
+ output=lttngtest.LocalSessionOutputLocation(
+ test_env.create_temporary_directory("trace")
+ ),
+ )
+ # Add a channel to all sessions to ensure the sessions can be started.
+ new_session.add_channel(lttngtest.TracingDomain.User)
+ self._sessions[prefix] = new_session
+
+ @staticmethod
+ def _generate_session_name_from_prefix(prefix):
+ # type: (str) -> str
+ return (
+ prefix
+ + "_"
+ + "".join(
+ random.choice(string.ascii_lowercase + string.digits) for _ in range(8)
+ )
+ )
+
+ def __getitem__(self, __key):
+ # type: (str) -> lttngtest.Session
+ return self._sessions[__key]
+
+ def __len__(self):
+ # type: () -> int
+ return len(self._sessions)
+
+ def __iter__(self):
+ # type: () -> Iterator[str]
+ return iter(self._sessions)
+
+
+def test_start_globbing(tap, test_env):
+ # type: (lttngtest.TapGenerator, lttngtest._Environment) -> None
+ tap.diagnostic("Test --glob match of start command")
+ name_prefixes = ["abba", "alakazou", "alakazam"]
+
+ client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)
+
+ tap.diagnostic("Create a set of sessions to test globbing")
+ sessions = None
+ with tap.case(
+ "Create sessions with prefixes [{}]".format(", ".join(name_prefixes))
+ ) as test_case:
+ sessions = SessionSet(client, name_prefixes)
+
+ tap.test(
+ all(not session.is_active for prefix, session in sessions.items()),
+ "All sessions created are in the inactive state",
+ )
+
+ start_pattern = "alak*"
+ with tap.case("Start sessions with --glob={}".format(start_pattern)) as test_case:
+ client.start_session_by_glob_pattern(start_pattern)
+
+ tap.test(
+ sessions["alakazou"].is_active
+ and sessions["alakazam"].is_active
+ and not sessions["abba"].is_active,
+ "Only sessions 'alakazou' and 'alakazam' are active",
+ )
+
+ with tap.case(
+ "Starting already started sessions with --glob={} doesn't produce an error".format(
+ start_pattern
+ )
+ ) as test_case:
+ client.start_session_by_glob_pattern(start_pattern)
+
+ start_pattern = "tintina*"
+ with tap.case(
+ "Starting with --glob={} that doesn't match any session doesn't produce an error".format(
+ start_pattern
+ )
+ ) as test_case:
+ client.start_session_by_glob_pattern(start_pattern)
+
+ for name, session in sessions.items():
+ session.destroy()
+
+ with tap.case(
+ "Starting with --glob={} when no sessions exist doesn't produce an error".format(
+ start_pattern
+ )
+ ) as test_case:
+ client.start_session_by_glob_pattern(start_pattern)
+
+
+def test_start_single(tap, test_env):
+ # type: (lttngtest.TapGenerator, lttngtest._Environment) -> None
+ tap.diagnostic("Test match of start command targeting a single session")
+ name_prefixes = ["un", "deux", "patate", "pouel"]
+
+ client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)
+
+ tap.diagnostic("Create a set of sessions to test single session start")
+ sessions = None
+ with tap.case(
+ "Create sessions with prefixes [{}]".format(", ".join(name_prefixes))
+ ) as test_case:
+ sessions = SessionSet(client, name_prefixes)
+
+ tap.test(
+ all(not session.is_active for prefix, session in sessions.items()),
+ "All sessions created are in the inactive state",
+ )
+
+ session_to_start_prefix = "patate"
+ full_session_name = sessions[session_to_start_prefix].name
+ with tap.case("Start session '{}'".format(session_to_start_prefix)) as test_case:
+ client.start_session_by_name(full_session_name)
+
+ tap.test(
+ any(
+ session.is_active and prefix != session_to_start_prefix
+ for prefix, session in sessions.items()
+ )
+ is False,
+ "Only session '{}' is active".format(session_to_start_prefix),
+ )
+
+ with tap.case(
+ "Starting already started session '{}' doesn't produce an error".format(
+ session_to_start_prefix
+ )
+ ) as test_case:
+ client.start_session_by_name(full_session_name)
+
+ for name, session in sessions.items():
+ session.destroy()
+
+
+def test_start_all(tap, test_env):
+ # type: (lttngtest.TapGenerator, lttngtest._Environment) -> None
+ tap.diagnostic("Test start command with the --all option")
+ name_prefixes = ["a", "b", "c", "d"]
+
+ client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)
+
+ tap.diagnostic("Create a set of sessions to test starting all sessions")
+ sessions = None
+ with tap.case(
+ "Create sessions with prefixes [{}]".format(", ".join(name_prefixes))
+ ) as test_case:
+ sessions = SessionSet(client, name_prefixes)
+
+ tap.test(
+ all(not session.is_active for prefix, session in sessions.items()),
+ "All sessions created are in the inactive state",
+ )
+
+ with tap.case("Start all sessions") as test_case:
+ client.start_sessions_all()
+
+ tap.test(
+ all(session.is_active for prefix, session in sessions.items()),
+ "All sessions are active",
+ )
+
+ with tap.case("Starting already started sessions") as test_case:
+ client.start_sessions_all()
+
+ for name, session in sessions.items():
+ session.destroy()
+
+ with tap.case(
+ "Starting all sessions when none exist doesn't produce an error"
+ ) as test_case:
+ client.start_sessions_all()
+
+
+def test_stop_globbing(tap, test_env):
+ # type: (lttngtest.TapGenerator, lttngtest._Environment) -> None
+ tap.diagnostic("Test --glob match of stop command")
+ name_prefixes = ["East Farnham", "Amqui", "Amos"]
+
+ client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)
+
+ tap.diagnostic("Create a set of sessions to test globbing")
+ sessions = None
+ with tap.case(
+ "Create sessions with prefixes [{}]".format(", ".join(name_prefixes))
+ ) as test_case:
+ sessions = SessionSet(client, name_prefixes)
+
+ client.start_sessions_all()
+ tap.test(
+ all(session.is_active for prefix, session in sessions.items()),
+ "All sessions are in the active state",
+ )
+
+ stop_pattern = "Am??i*"
+ with tap.case("Stop sessions with --glob={}".format(stop_pattern)) as test_case:
+ client.stop_session_by_glob_pattern(stop_pattern)
+
+ tap.test(
+ (
+ sessions["East Farnham"].is_active
+ and sessions["Amos"].is_active
+ and (not sessions["Amqui"].is_active)
+ ),
+ "Only session 'Amqui' is inactive",
+ )
+
+ stop_pattern = "Am*"
+ with tap.case(
+ "Stopping more sessions, including a stopped session, with --glob={} doesn't produce an error".format(
+ stop_pattern
+ )
+ ) as test_case:
+ client.stop_session_by_glob_pattern(stop_pattern)
+
+ tap.test(
+ sessions["East Farnham"].is_active
+ and (not sessions["Amqui"].is_active)
+ and (not sessions["Amos"].is_active),
+ "Only session 'East Farnham' is active",
+ )
+
+ stop_pattern = "Notre-Dame*"
+ with tap.case(
+ "Stopping with --glob={} that doesn't match any session doesn't produce an error".format(
+ stop_pattern
+ )
+ ) as test_case:
+ client.stop_session_by_glob_pattern(stop_pattern)
+
+ for name, session in sessions.items():
+ session.destroy()
+
+ with tap.case(
+ "Stopping with --glob={} when no sessions exist doesn't produce an error".format(
+ stop_pattern
+ )
+ ) as test_case:
+ client.stop_session_by_glob_pattern(stop_pattern)
+
+
+def test_stop_single(tap, test_env):
+ # type: (lttngtest.TapGenerator, lttngtest._Environment) -> None
+ tap.diagnostic("Test match of stop command targeting a single session")
+ name_prefixes = ["Grosses-Roches", "Kazabazua", "Laval", "Magog"]
+
+ client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)
+
+ tap.diagnostic("Create a set of sessions to test single session stop")
+ sessions = None
+ with tap.case(
+ "Create sessions with prefixes [{}]".format(", ".join(name_prefixes))
+ ) as test_case:
+ sessions = SessionSet(client, name_prefixes)
+
+ client.start_sessions_all()
+ tap.test(
+ all(session.is_active for prefix, session in sessions.items()),
+ "All sessions are in the active state",
+ )
+
+ session_to_stop_prefix = "Kazabazua"
+ full_session_name = sessions[session_to_stop_prefix].name
+ with tap.case("Stop session '{}'".format(session_to_stop_prefix)) as test_case:
+ client.stop_session_by_name(full_session_name)
+
+ inactive_session_prefixes = [
+ prefix for prefix, session in sessions.items() if not session.is_active
+ ]
+ tap.test(
+ len(inactive_session_prefixes) == 1
+ and inactive_session_prefixes[0] == session_to_stop_prefix,
+ "Only session '{}' is inactive".format(session_to_stop_prefix),
+ )
+
+ with tap.case(
+ "Stopping already stopped session '{}' doesn't produce an error".format(
+ session_to_stop_prefix
+ )
+ ) as test_case:
+ client.stop_session_by_name(full_session_name)
+
+ for name, session in sessions.items():
+ session.destroy()
+
+
+def test_stop_all(tap, test_env):
+ # type: (lttngtest.TapGenerator, lttngtest._Environment) -> None
+ tap.diagnostic("Test stop command with the --all option")
+ name_prefixes = ["a", "b", "c", "d"]
+
+ client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)
+
+ tap.diagnostic("Create a set of sessions to test stopping all sessions")
+ sessions = None
+ with tap.case(
+ "Create sessions with prefixes [{}]".format(", ".join(name_prefixes))
+ ) as test_case:
+ sessions = SessionSet(client, name_prefixes)
+
+ client.start_sessions_all()
+ tap.test(
+ all(session.is_active for prefix, session in sessions.items()),
+ "All sessions are in the active state",
+ )
+
+ with tap.case("Stop all sessions") as test_case:
+ client.stop_sessions_all()
+
+ tap.test(
+ all(not session.is_active for prefix, session in sessions.items()),
+ "All sessions are inactive",
+ )
+
+ with tap.case("Stopping already stopped sessions") as test_case:
+ client.stop_sessions_all()
+
+ for name, session in sessions.items():
+ session.destroy()
+
+ with tap.case(
+ "Stopping all sessions when none exist doesn't produce an error"
+ ) as test_case:
+ client.stop_sessions_all()
+
+
+def test_destroy_globbing(tap, test_env):
+ # type: (lttngtest.TapGenerator, lttngtest._Environment) -> None
+ tap.diagnostic("Test --glob match of destroy command")
+ name_prefixes = ["Mont-Laurier", "Montreal", "Montmagny", "Neuville"]
+
+ client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)
+
+ tap.diagnostic("Create a set of sessions to test globbing")
+ sessions = None
+ with tap.case(
+ "Create sessions with prefixes [{}]".format(", ".join(name_prefixes))
+ ) as test_case:
+ sessions = SessionSet(client, name_prefixes)
+
+ destroy_pattern = "Mont*"
+ with tap.case(
+ "Destroy sessions with --glob={}".format(destroy_pattern)
+ ) as test_case:
+ client.destroy_session_by_glob_pattern(destroy_pattern)
+
+ listed_sessions = client.list_sessions()
+ tap.test(
+ len(listed_sessions) == 1
+ and listed_sessions[0].name == sessions["Neuville"].name,
+ "Neuville is the only remaining session",
+ )
+
+ for session in listed_sessions:
+ session.destroy()
+
+ with tap.case(
+ "Destroying with --glob={} when no sessions exist doesn't produce an error".format(
+ destroy_pattern
+ )
+ ) as test_case:
+ client.destroy_session_by_glob_pattern(destroy_pattern)
+
+
+def test_destroy_single(tap, test_env):
+ # type: (lttngtest.TapGenerator, lttngtest._Environment) -> None
+ tap.diagnostic("Test match of destroy command targeting a single session")
+ name_prefixes = ["Natashquan", "Normetal", "Notre-Dame-des-Sept-Douleurs"]
+
+ client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)
+
+ tap.diagnostic("Create a set of sessions to test single session destruction")
+ sessions = None
+ with tap.case(
+ "Create sessions with prefixes [{}]".format(", ".join(name_prefixes))
+ ) as test_case:
+ sessions = SessionSet(client, name_prefixes)
+
+ session_to_destroy_prefix = "Normetal"
+ full_session_name = sessions[session_to_destroy_prefix].name
+ with tap.case(
+ "Destroy session '{}'".format(session_to_destroy_prefix)
+ ) as test_case:
+ client.destroy_session_by_name(full_session_name)
+
+ listed_sessions = client.list_sessions()
+ tap.test(
+ len(listed_sessions) == 2
+ and full_session_name not in [session.name for session in listed_sessions],
+ "Session '{}' no longer exists".format(session_to_destroy_prefix),
+ )
+
+ for session in listed_sessions:
+ session.destroy()
+
+
+def test_destroy_all(tap, test_env):
+ # type: (lttngtest.TapGenerator, lttngtest._Environment) -> None
+ tap.diagnostic("Test destroy command with the --all option")
+ name_prefixes = ["a", "b", "c", "d"]
+
+ client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)
+
+ tap.diagnostic("Create a set of sessions to test destroying all sessions")
+ sessions = None
+ with tap.case(
+ "Create sessions with prefixes [{}]".format(", ".join(name_prefixes))
+ ) as test_case:
+ sessions = SessionSet(client, name_prefixes)
+
+ with tap.case("Destroy all sessions") as test_case:
+ client.destroy_sessions_all()
+
+ tap.test(
+ len(client.list_sessions()) == 0,
+ "No sessions exist after destroying all sessions",
+ )
+
+ with tap.case(
+ "Destroy all sessions when none exist doesn't produce an error"
+ ) as test_case:
+ client.destroy_sessions_all()
+
+
+tap = lttngtest.TapGenerator(48)
+tap.diagnostic("Test client session command --glob and --all options")
+
+with lttngtest.test_environment(with_sessiond=True, log=tap.diagnostic) as test_env:
+ test_start_globbing(tap, test_env)
+
+with lttngtest.test_environment(with_sessiond=True, log=tap.diagnostic) as test_env:
+ test_start_single(tap, test_env)
+
+with lttngtest.test_environment(with_sessiond=True, log=tap.diagnostic) as test_env:
+ test_start_all(tap, test_env)
+
+with lttngtest.test_environment(with_sessiond=True, log=tap.diagnostic) as test_env:
+ test_stop_globbing(tap, test_env)
+
+with lttngtest.test_environment(with_sessiond=True, log=tap.diagnostic) as test_env:
+ test_stop_single(tap, test_env)
+
+with lttngtest.test_environment(with_sessiond=True, log=tap.diagnostic) as test_env:
+ test_stop_all(tap, test_env)
+
+with lttngtest.test_environment(with_sessiond=True, log=tap.diagnostic) as test_env:
+ test_destroy_globbing(tap, test_env)
+
+with lttngtest.test_environment(with_sessiond=True, log=tap.diagnostic) as test_env:
+ test_destroy_single(tap, test_env)
+
+with lttngtest.test_environment(with_sessiond=True, log=tap.diagnostic) as test_env:
+ test_destroy_all(tap, test_env)
+
+sys.exit(0 if tap.is_successful else 1)
#
# SPDX-License-Identifier: GPL-2.0-only
-from concurrent.futures import process
+
from . import lttngctl, logger, environment
import pathlib
import os
import shlex
import subprocess
import enum
+import xml.etree.ElementTree
"""
Implementation of the lttngctl interface based on the `lttng` command line client.
super().__init__(msg)
+class InvalidMI(lttngctl.ControlException):
+ def __init__(self, msg):
+ # type: (str) -> None
+ super().__init__(msg)
+
+
def _get_domain_option_name(domain):
# type: (lttngctl.TracingDomain) -> str
if domain == lttngctl.TracingDomain.User:
domain_option_name = _get_domain_option_name(self.domain)
context_type_name = _get_context_type_name(context_type)
self._client._run_cmd(
- "add-context --{domain_option_name} --type {context_type_name}".format(
+ "add-context --{domain_option_name} --channel '{channel_name}' --type {context_type_name}".format(
domain_option_name=domain_option_name,
+ channel_name=self.name,
context_type_name=context_type_name,
)
)
)
domain_name = _get_domain_option_name(self._domain)
self._client._run_cmd(
- "{cmd_name} --session {session_name} --{domain_name} --{tracked_attribute_name} {value}".format(
+ "{cmd_name} --session '{session_name}' --{domain_name} --{tracked_attribute_name} {value}".format(
cmd_name=cmd_name,
session_name=self._session.name,
domain_name=domain_name,
channel_name = lttngctl.Channel._generate_name()
domain_option_name = _get_domain_option_name(domain)
self._client._run_cmd(
- "enable-channel --{domain_name} {channel_name}".format(
- domain_name=domain_option_name, channel_name=channel_name
+ "enable-channel --session '{session_name}' --{domain_name} '{channel_name}'".format(
+ session_name=self.name,
+ domain_name=domain_option_name,
+ channel_name=channel_name,
)
)
return _Channel(self._client, channel_name, domain, self)
def start(self):
# type: () -> None
- self._client._run_cmd("start {session_name}".format(session_name=self.name))
+ self._client._run_cmd("start '{session_name}'".format(session_name=self.name))
def stop(self):
# type: () -> None
- self._client._run_cmd("stop {session_name}".format(session_name=self.name))
+ self._client._run_cmd("stop '{session_name}'".format(session_name=self.name))
def destroy(self):
# type: () -> None
- self._client._run_cmd("destroy {session_name}".format(session_name=self.name))
+ self._client._run_cmd("destroy '{session_name}'".format(session_name=self.name))
+
+ @property
+ def is_active(self):
+ # type: () -> bool
+ list_session_xml = self._client._run_cmd(
+ "list '{session_name}'".format(session_name=self.name),
+ LTTngClient.CommandOutputFormat.MI_XML,
+ )
+
+ root = xml.etree.ElementTree.fromstring(list_session_xml)
+ command_output = LTTngClient._mi_find_in_element(root, "output")
+ sessions = LTTngClient._mi_find_in_element(command_output, "sessions")
+ session_mi = LTTngClient._mi_find_in_element(sessions, "session")
+
+ enabled_text = LTTngClient._mi_find_in_element(session_mi, "enabled").text
+ if enabled_text not in ["true", "false"]:
+ raise InvalidMI(
+ "Expected boolean value in element '{}': value='{}'".format(
+ session_mi.tag, enabled_text
+ )
+ )
+
+ return enabled_text == "true"
@property
def kernel_pid_process_attribute_tracker(self):
Implementation of a LTTngCtl Controller that uses the `lttng` client as a back-end.
"""
+ class CommandOutputFormat(enum.Enum):
+ MI_XML = 0
+ HUMAN = 1
+
+ _MI_NS = "{https://lttng.org/xml/ns/lttng-mi}"
+
def __init__(
self,
test_environment, # type: environment._Environment
logger._Logger.__init__(self, log)
self._environment = test_environment # type: environment._Environment
- def _run_cmd(self, command_args):
- # type: (str) -> None
+ @staticmethod
+ def _namespaced_mi_element(property):
+ # type: (str) -> str
+ return LTTngClient._MI_NS + property
+
+ def _run_cmd(self, command_args, output_format=CommandOutputFormat.MI_XML):
+ # type: (str, CommandOutputFormat) -> str
"""
Invoke the `lttng` client with a set of arguments. The command is
executed in the context of the client's test environment.
"""
args = [str(self._environment.lttng_client_path)] # type: list[str]
+ if output_format == LTTngClient.CommandOutputFormat.MI_XML:
+ args.extend(["--mi", "xml"])
+
args.extend(shlex.split(command_args))
self._log("lttng {command_args}".format(command_args=command_args))
for error_line in decoded_output.splitlines():
self._log(error_line)
raise LTTngClientError(command_args, decoded_output)
+ else:
+ return out.decode("utf-8")
def create_session(self, name=None, output=None):
# type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session
raise TypeError("LTTngClient only supports local or no output")
self._run_cmd(
- "create {session_name} {output_option}".format(
+ "create '{session_name}' {output_option}".format(
session_name=name, output_option=output_option
)
)
return _Session(self, name, output)
+
+ def start_session_by_name(self, name):
+ # type: (str) -> None
+ self._run_cmd("start '{session_name}'".format(session_name=name))
+
+ def start_session_by_glob_pattern(self, pattern):
+ # type: (str) -> None
+ self._run_cmd("start --glob '{pattern}'".format(pattern=pattern))
+
+ def start_sessions_all(self):
+ # type: () -> None
+ self._run_cmd("start --all")
+
+ def stop_session_by_name(self, name):
+ # type: (str) -> None
+ self._run_cmd("stop '{session_name}'".format(session_name=name))
+
+ def stop_session_by_glob_pattern(self, pattern):
+ # type: (str) -> None
+ self._run_cmd("stop --glob '{pattern}'".format(pattern=pattern))
+
+ def stop_sessions_all(self):
+ # type: () -> None
+ self._run_cmd("stop --all")
+
+ def destroy_session_by_name(self, name):
+ # type: (str) -> None
+ self._run_cmd("destroy '{session_name}'".format(session_name=name))
+
+ def destroy_session_by_glob_pattern(self, pattern):
+ # type: (str) -> None
+ self._run_cmd("destroy --glob '{pattern}'".format(pattern=pattern))
+
+ def destroy_sessions_all(self):
+ # type: () -> None
+ self._run_cmd("destroy --all")
+
+ @staticmethod
+ def _mi_find_in_element(element, sub_element_name):
+ # type: (xml.etree.ElementTree.Element, str) -> xml.etree.ElementTree.Element
+ result = element.find(LTTngClient._namespaced_mi_element(sub_element_name))
+ if result is None:
+ raise InvalidMI(
+ "Failed to find element '{}' within command MI element '{}'".format(
+ element.tag, sub_element_name
+ )
+ )
+
+ return result
+
+ def list_sessions(self):
+ # type () -> List[Session]
+ list_sessions_xml = self._run_cmd(
+ "list", LTTngClient.CommandOutputFormat.MI_XML
+ )
+
+ root = xml.etree.ElementTree.fromstring(list_sessions_xml)
+ command_output = self._mi_find_in_element(root, "output")
+ sessions = self._mi_find_in_element(command_output, "sessions")
+
+ ctl_sessions = [] # type: list[lttngctl.Session]
+
+ for session_mi in sessions:
+ name = self._mi_find_in_element(session_mi, "name").text
+ path = self._mi_find_in_element(session_mi, "path").text
+
+ if name is None:
+ raise InvalidMI(
+ "Invalid empty 'name' element in '{}'".format(session_mi.tag)
+ )
+ if path is None:
+ raise InvalidMI(
+ "Invalid empty 'path' element in '{}'".format(session_mi.tag)
+ )
+ if not path.startswith("/"):
+ raise Unsupported(
+ "{} does not support non-local session outputs".format(type(self))
+ )
+
+ ctl_sessions.append(
+ _Session(self, name, lttngctl.LocalSessionOutputLocation(path))
+ )
+
+ return ctl_sessions