Tests: Add LiveViewer to lttngtest environment
authorKienan Stewart <kstewart@efficios.com>
Fri, 29 Mar 2024 20:58:03 +0000 (16:58 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 30 Aug 2024 21:06:33 +0000 (17:06 -0400)
Drawbacks
=========

With the current python bindings, the relayd seems to leak a file
descriptor; however, this doesn't stop the tests from working.

E.g.

```
ok 1 - BT2 live viewer exited successfully

Change-Id: I13994f7c8b0b6cffcee0d0ea0f8fca22538de651
---
  duration_ms: 1097.302968
...

 Killing session daemon (pid = 3340512)
Session daemon killed
lttng-relayd: Error: A file descriptor leak has been detected: 1
tracked file descriptors are still being tracked
```

Change-Id: Ie4294dd7238d4b6074af2d4cf193e1ca9949a741
Signed-off-by: Kienan Stewart <kstewart@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
tests/utils/lttngtest/environment.py

index 29bf4e44a5a9866fdd8fc23bac42be208977e1c2..6cb2ebaf019a2b051b4cae17cdcf72d9fa5d76dc 100644 (file)
@@ -12,6 +12,7 @@ import pathlib
 import pwd
 import random
 import signal
+import socket
 import subprocess
 import shlex
 import shutil
@@ -25,6 +26,8 @@ import time
 import threading
 import contextlib
 
+import bt2
+
 
 class TemporaryDirectory:
     def __init__(self, prefix):
@@ -82,6 +85,124 @@ class _SignalWaitQueue:
             signal.signal(signal_number, original_handler)
 
 
+class _LiveViewer:
+    """
+    Create a babeltrace2 live viewer.
+    """
+
+    def __init__(
+        self,
+        environment,  # type: Environment
+        session,  # type: str
+        hostname=None,  # type: Optional[str]
+    ):
+        self._environment = environment
+        self._session = session
+        self._hostname = hostname
+        if self._hostname is None:
+            self._hostname = socket.gethostname()
+        self._events = []
+
+        ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+        self._live_iterator = bt2.TraceCollectionMessageIterator(
+            bt2.ComponentSpec(
+                ctf_live_cc,
+                {
+                    "inputs": [
+                        "net://localhost:{}/host/{}/{}".format(
+                            environment.lttng_relayd_live_port,
+                            self._hostname,
+                            session,
+                        )
+                    ],
+                    "session-not-found-action": "end",
+                },
+            )
+        )
+
+        try:
+            # Cause the connection to be initiated since tests
+            # tend to wait for a viewer to be connected before proceeding.
+            msg = next(self._live_iterator)
+            self._events.append(msg)
+        except bt2.TryAgain:
+            pass
+
+    @property
+    def output(self):
+        return self._events
+
+    @property
+    def messages(self):
+        return [x for x in self._events if type(x) is bt2._EventMessageConst]
+
+    def _drain(self, retry=False):
+        while True:
+            try:
+                for msg in self._live_iterator:
+                    self._events.append(msg)
+                break
+            except bt2.TryAgain as e:
+                if retry:
+                    time.sleep(0.01)
+                    continue
+                else:
+                    break
+
+    def wait_until_connected(self, timeout=0):
+        ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+        self._environment._log(
+            "Checking for connected clients at 'net://localhost:{}'".format(
+                self._environment.lttng_relayd_live_port
+            )
+        )
+        query_executor = bt2.QueryExecutor(
+            ctf_live_cc,
+            "sessions",
+            params={
+                "url": "net://localhost:{}".format(
+                    self._environment.lttng_relayd_live_port
+                )
+            },
+        )
+        connected = False
+        started = time.time()
+        while not connected:
+            try:
+                if timeout != 0 and (time.time() - started) > timeout:
+                    raise RuntimeError(
+                        "Timed out waiting for connected clients on session '{}' after {}s".format(
+                            self._session, time.time() - started
+                        )
+                    )
+                query_result = query_executor.query()
+            except bt2._Error:
+                time.sleep(0.01)
+                continue
+            for live_session in query_result:
+                if (
+                    live_session["session-name"] == self._session
+                    and live_session["client-count"] >= 1
+                ):
+                    connected = True
+                    self._environment._log(
+                        "Session '{}' has {} connected clients".format(
+                            live_session["session-name"], live_session["client-count"]
+                        )
+                    )
+                    break
+        return connected
+
+    def wait(self):
+        if self._live_iterator:
+            self._drain(retry=True)
+            del self._live_iterator
+            self._live_iterator = None
+
+    def __del__(self):
+        pass
+
+
 class _WaitTraceTestApplication:
     """
     Create an application that waits before tracing. This allows a test to
@@ -233,7 +354,7 @@ class _WaitTraceTestApplication:
             test_app_args,
             env=test_app_env,
             stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT,
+            stderr=subprocess.PIPE,
         )  # type: subprocess.Popen
 
         # Wait for the application to create the file indicating it has fully
@@ -459,6 +580,25 @@ class ProcessOutputConsumer(threading.Thread, logger._Logger):
                 self._log("{prefix}: {line}".format(prefix=self._prefix, line=line))
 
 
+class SavingProcessOutputConsumer(ProcessOutputConsumer):
+    def __init__(self, process, name, log):
+        self._lines = []
+        super().__init__(process=process, name=name, log=log)
+
+    def run(self):
+        # type: () -> None
+        while self._process.poll() is None:
+            assert self._process.stdout
+            line = self._process.stdout.readline().decode("utf-8").replace("\n", "")
+            if len(line) != 0:
+                self._lines.append(line)
+                self._log("{prefix}: {line}".format(prefix=self._prefix, line=line))
+
+    @property
+    def output(self):
+        return self._lines
+
+
 # Generate a temporary environment in which to execute a test.
 class _Environment(logger._Logger):
     def __init__(
@@ -731,6 +871,32 @@ class _Environment(logger._Logger):
         )
         self._cleanup()
 
+    def launch_live_viewer(self, session, hostname=None):
+        # Make sure the relayd is ready
+        ready = False
+        ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+        query_executor = bt2.QueryExecutor(
+            ctf_live_cc,
+            "sessions",
+            params={"url": "net://localhost:{}".format(self.lttng_relayd_live_port)},
+        )
+        while not ready:
+            try:
+                query_result = query_executor.query()
+            except bt2._Error:
+                time.sleep(0.1)
+                continue
+            for live_session in query_result:
+                if live_session["session-name"] == session:
+                    ready = True
+                    self._log(
+                        "Session '{}' is available at net://localhost:{}".format(
+                            session, self.lttng_relayd_live_port
+                        )
+                    )
+                    break
+        return _LiveViewer(self, session, hostname)
+
     def launch_wait_trace_test_application(
         self,
         event_count,  # type: int
This page took 0.028576 seconds and 4 git commands to generate.