Tests: Add wait_until_disconnected() to python test LiveViewer
authorKienan Stewart <kstewart@efficios.com>
Thu, 10 Oct 2024 19:36:17 +0000 (19:36 +0000)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 24 Oct 2024 18:06:19 +0000 (14:06 -0400)
Change-Id: I70cf5e46a08c658b3fecbcea9317e1b2d9f769fb
Signed-off-by: Kienan Stewart <kstewart@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
tests/utils/lttngtest/environment.py

index 65d74a41dbfaff2f464b7bfa463d37a54a7bef89..ec64c0a533b89956edcc7a4d524af354502dcae4 100644 (file)
@@ -181,10 +181,10 @@ class _LiveViewer:
                 return True
         return False
 
-    def wait_until_connected(self, timeout=0):
-        connected = False
+    def _wait_until(self, desired_state: bool, timeout=0):
+        connected_state = not desired_state
         started = time.time()
-        while not connected:
+        while connected_state != desired_state:
             try:
                 if timeout != 0 and (time.time() - started) > timeout:
                     raise RuntimeError(
@@ -193,11 +193,17 @@ class _LiveViewer:
                         )
                     )
 
-                connected = self.is_connected()
+                connected_state = self.is_connected()
             except bt2._Error:
                 time.sleep(0.01)
                 continue
-        return connected
+        return connected_state
+
+    def wait_until_disconnected(self, timeout=0):
+        return self._wait_until(False, timeout)
+
+    def wait_until_connected(self, timeout=0):
+        return self._wait_until(True, timeout)
 
     def wait(self):
         if self._live_iterator:
This page took 0.026908 seconds and 4 git commands to generate.