import pwd
import random
import signal
+import socket
import subprocess
import shlex
import shutil
import threading
import contextlib
+import bt2
+
class TemporaryDirectory:
def __init__(self, prefix):
signal.signal(signal_number, original_handler)
+class _LiveViewer:
+ """
+ Create a babeltrace2 live viewer.
+ """
+
+ def __init__(
+ self,
+ environment, # type: Environment
+ session, # type: str
+ hostname=None, # type: Optional[str]
+ ):
+ self._environment = environment
+ self._session = session
+ self._hostname = hostname
+ if self._hostname is None:
+ self._hostname = socket.gethostname()
+ self._events = []
+
+ ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+ self._live_iterator = bt2.TraceCollectionMessageIterator(
+ bt2.ComponentSpec(
+ ctf_live_cc,
+ {
+ "inputs": [
+ "net://localhost:{}/host/{}/{}".format(
+ environment.lttng_relayd_live_port,
+ self._hostname,
+ session,
+ )
+ ],
+ "session-not-found-action": "end",
+ },
+ )
+ )
+
+ try:
+ # Cause the connection to be initiated since tests
+ # tend to wait for a viewer to be connected before proceeding.
+ msg = next(self._live_iterator)
+ self._events.append(msg)
+ except bt2.TryAgain:
+ pass
+
+ @property
+ def output(self):
+ return self._events
+
+ @property
+ def messages(self):
+ return [x for x in self._events if type(x) is bt2._EventMessageConst]
+
+ def _drain(self, retry=False):
+ while True:
+ try:
+ for msg in self._live_iterator:
+ self._events.append(msg)
+ break
+ except bt2.TryAgain as e:
+ if retry:
+ time.sleep(0.01)
+ continue
+ else:
+ break
+
+ def wait_until_connected(self, timeout=0):
+ ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+ self._environment._log(
+ "Checking for connected clients at 'net://localhost:{}'".format(
+ self._environment.lttng_relayd_live_port
+ )
+ )
+ query_executor = bt2.QueryExecutor(
+ ctf_live_cc,
+ "sessions",
+ params={
+ "url": "net://localhost:{}".format(
+ self._environment.lttng_relayd_live_port
+ )
+ },
+ )
+ connected = False
+ started = time.time()
+ while not connected:
+ try:
+ if timeout != 0 and (time.time() - started) > timeout:
+ raise RuntimeError(
+ "Timed out waiting for connected clients on session '{}' after {}s".format(
+ self._session, time.time() - started
+ )
+ )
+ query_result = query_executor.query()
+ except bt2._Error:
+ time.sleep(0.01)
+ continue
+ for live_session in query_result:
+ if (
+ live_session["session-name"] == self._session
+ and live_session["client-count"] >= 1
+ ):
+ connected = True
+ self._environment._log(
+ "Session '{}' has {} connected clients".format(
+ live_session["session-name"], live_session["client-count"]
+ )
+ )
+ break
+ return connected
+
+ def wait(self):
+ if self._live_iterator:
+ self._drain(retry=True)
+ del self._live_iterator
+ self._live_iterator = None
+
+ def __del__(self):
+ pass
+
+
class _WaitTraceTestApplication:
"""
Create an application that waits before tracing. This allows a test to
test_app_args,
env=test_app_env,
stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
+ stderr=subprocess.PIPE,
) # type: subprocess.Popen
# Wait for the application to create the file indicating it has fully
self._log("{prefix}: {line}".format(prefix=self._prefix, line=line))
+class SavingProcessOutputConsumer(ProcessOutputConsumer):
+ def __init__(self, process, name, log):
+ self._lines = []
+ super().__init__(process=process, name=name, log=log)
+
+ def run(self):
+ # type: () -> None
+ while self._process.poll() is None:
+ assert self._process.stdout
+ line = self._process.stdout.readline().decode("utf-8").replace("\n", "")
+ if len(line) != 0:
+ self._lines.append(line)
+ self._log("{prefix}: {line}".format(prefix=self._prefix, line=line))
+
+ @property
+ def output(self):
+ return self._lines
+
+
# Generate a temporary environment in which to execute a test.
class _Environment(logger._Logger):
def __init__(
)
self._cleanup()
+ def launch_live_viewer(self, session, hostname=None):
+ # Make sure the relayd is ready
+ ready = False
+ ctf_live_cc = bt2.find_plugin("ctf").source_component_classes["lttng-live"]
+ query_executor = bt2.QueryExecutor(
+ ctf_live_cc,
+ "sessions",
+ params={"url": "net://localhost:{}".format(self.lttng_relayd_live_port)},
+ )
+ while not ready:
+ try:
+ query_result = query_executor.query()
+ except bt2._Error:
+ time.sleep(0.1)
+ continue
+ for live_session in query_result:
+ if live_session["session-name"] == session:
+ ready = True
+ self._log(
+ "Session '{}' is available at net://localhost:{}".format(
+ session, self.lttng_relayd_live_port
+ )
+ )
+ break
+ return _LiveViewer(self, session, hostname)
+
def launch_wait_trace_test_application(
self,
event_count, # type: int