From 91118dcccbed2133001c81f0a9715c9e85708585 Mon Sep 17 00:00:00 2001 From: Kienan Stewart Date: Fri, 29 Mar 2024 16:58:03 -0400 Subject: [PATCH] Tests: Add LiveViewer to lttngtest environment MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Drawbacks ========= With the current python bindings, the relayd seems to leak a file descriptor; however, this doesn't stop the tests from working. E.g. ``` ok 1 - BT2 live viewer exited successfully Change-Id: I13994f7c8b0b6cffcee0d0ea0f8fca22538de651 --- duration_ms: 1097.302968 ... Killing session daemon (pid = 3340512) Session daemon killed lttng-relayd: Error: A file descriptor leak has been detected: 1 tracked file descriptors are still being tracked ``` Change-Id: Ie4294dd7238d4b6074af2d4cf193e1ca9949a741 Signed-off-by: Kienan Stewart Signed-off-by: Jérémie Galarneau --- tests/utils/lttngtest/environment.py | 168 ++++++++++++++++++++++++++- 1 file changed, 167 insertions(+), 1 deletion(-) diff --git a/tests/utils/lttngtest/environment.py b/tests/utils/lttngtest/environment.py index 29bf4e44a..6cb2ebaf0 100644 --- a/tests/utils/lttngtest/environment.py +++ b/tests/utils/lttngtest/environment.py @@ -12,6 +12,7 @@ import pathlib import pwd import random import signal +import socket import subprocess import shlex import shutil @@ -25,6 +26,8 @@ import time import threading import contextlib +import bt2 + class TemporaryDirectory: def __init__(self, prefix): @@ -82,6 +85,124 @@ class _SignalWaitQueue: signal.signal(signal_number, original_handler) +class _LiveViewer: + """ + Create a babeltrace2 live viewer. + """ + + def __init__( + self, + environment, # type: Environment + session, # type: str + hostname=None, # type: Optional[str] + ): + self._environment = environment + self._session = session + self._hostname = hostname + if self._hostname is None: + self._hostname = socket.gethostname() + self._events = [] + + ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"] + self._live_iterator = bt2.TraceCollectionMessageIterator( + bt2.ComponentSpec( + ctf_live_cc, + { + "inputs": [ + "net://localhost:{}/host/{}/{}".format( + environment.lttng_relayd_live_port, + self._hostname, + session, + ) + ], + "session-not-found-action": "end", + }, + ) + ) + + try: + # Cause the connection to be initiated since tests + # tend to wait for a viewer to be connected before proceeding. + msg = next(self._live_iterator) + self._events.append(msg) + except bt2.TryAgain: + pass + + @property + def output(self): + return self._events + + @property + def messages(self): + return [x for x in self._events if type(x) is bt2._EventMessageConst] + + def _drain(self, retry=False): + while True: + try: + for msg in self._live_iterator: + self._events.append(msg) + break + except bt2.TryAgain as e: + if retry: + time.sleep(0.01) + continue + else: + break + + def wait_until_connected(self, timeout=0): + ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"] + self._environment._log( + "Checking for connected clients at 'net://localhost:{}'".format( + self._environment.lttng_relayd_live_port + ) + ) + query_executor = bt2.QueryExecutor( + ctf_live_cc, + "sessions", + params={ + "url": "net://localhost:{}".format( + self._environment.lttng_relayd_live_port + ) + }, + ) + connected = False + started = time.time() + while not connected: + try: + if timeout != 0 and (time.time() - started) > timeout: + raise RuntimeError( + "Timed out waiting for connected clients on session '{}' after {}s".format( + self._session, time.time() - started + ) + ) + query_result = query_executor.query() + except bt2._Error: + time.sleep(0.01) + continue + for live_session in query_result: + if ( + live_session["session-name"] == self._session + and live_session["client-count"] >= 1 + ): + connected = True + self._environment._log( + "Session '{}' has {} connected clients".format( + live_session["session-name"], live_session["client-count"] + ) + ) + break + return connected + + def wait(self): + if self._live_iterator: + self._drain(retry=True) + del self._live_iterator + self._live_iterator = None + + def __del__(self): + pass + + class _WaitTraceTestApplication: """ Create an application that waits before tracing. This allows a test to @@ -233,7 +354,7 @@ class _WaitTraceTestApplication: test_app_args, env=test_app_env, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, + stderr=subprocess.PIPE, ) # type: subprocess.Popen # Wait for the application to create the file indicating it has fully @@ -459,6 +580,25 @@ class ProcessOutputConsumer(threading.Thread, logger._Logger): self._log("{prefix}: {line}".format(prefix=self._prefix, line=line)) +class SavingProcessOutputConsumer(ProcessOutputConsumer): + def __init__(self, process, name, log): + self._lines = [] + super().__init__(process=process, name=name, log=log) + + def run(self): + # type: () -> None + while self._process.poll() is None: + assert self._process.stdout + line = self._process.stdout.readline().decode("utf-8").replace("\n", "") + if len(line) != 0: + self._lines.append(line) + self._log("{prefix}: {line}".format(prefix=self._prefix, line=line)) + + @property + def output(self): + return self._lines + + # Generate a temporary environment in which to execute a test. class _Environment(logger._Logger): def __init__( @@ -731,6 +871,32 @@ class _Environment(logger._Logger): ) self._cleanup() + def launch_live_viewer(self, session, hostname=None): + # Make sure the relayd is ready + ready = False + ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"] + query_executor = bt2.QueryExecutor( + ctf_live_cc, + "sessions", + params={"url": "net://localhost:{}".format(self.lttng_relayd_live_port)}, + ) + while not ready: + try: + query_result = query_executor.query() + except bt2._Error: + time.sleep(0.1) + continue + for live_session in query_result: + if live_session["session-name"] == session: + ready = True + self._log( + "Session '{}' is available at net://localhost:{}".format( + session, self.lttng_relayd_live_port + ) + ) + break + return _LiveViewer(self, session, hostname) + def launch_wait_trace_test_application( self, event_count, # type: int -- 2.34.1