import bt2
-def context_trace_field_name(context_type: Type[lttngtest.ContextType]) -> str:
+def context_trace_field_name(context_type):
+ # type: (Type[lttngtest.ContextType]) -> str
if isinstance(context_type, lttngtest.VpidContextType):
return "vpid"
elif isinstance(context_type, lttngtest.VuidContextType):
def trace_stream_class_has_context_field_in_event_context(
- trace_location: pathlib.Path, context_field_name: str
-) -> bool:
+ trace_location, context_field_name
+):
+ # type: (pathlib.Path, str) -> bool
iterator = bt2.TraceCollectionMessageIterator(str(trace_location))
# A bt2 message sequence is guaranteed to begin with a StreamBeginningMessage.
return context_field_name in event_common_context_field_class
-def trace_events_have_context_value(
- trace_location: pathlib.Path, context_field_name: str, value: Any
-) -> bool:
+def trace_events_have_context_value(trace_location, context_field_name, value):
+ # type: (pathlib.Path, str, Any) -> bool
for msg in bt2.TraceCollectionMessageIterator(str(trace_location)):
if type(msg) is not bt2._EventMessageConst:
continue
return True
-def test_static_context(
- tap: lttngtest.TapGenerator,
- test_env: lttngtest._Environment,
- context_type: lttngtest.ContextType,
- context_value_retriever: Callable[[lttngtest.WaitTraceTestApplication], Any],
-) -> None:
+def test_static_context(tap, test_env, context_type, context_value_retriever):
+ # type: (lttngtest.TapGenerator, lttngtest._Environment, lttngtest.ContextType, Callable[[lttngtest.WaitTraceTestApplication], Any]) -> None
tap.diagnostic(
"Test presence and expected value of context `{context_name}`".format(
context_name=type(context_type).__name__
#
from types import FrameType
-from typing import Callable, Optional, Tuple, List
+from typing import Callable, Iterator, Optional, Tuple, List
import sys
import pathlib
import signal
class TemporaryDirectory:
- def __init__(self, prefix: str):
+ def __init__(self, prefix):
+ # type: (str) -> None
self._directory_path = tempfile.mkdtemp(prefix=prefix)
def __del__(self):
shutil.rmtree(self._directory_path, ignore_errors=True)
@property
- def path(self) -> pathlib.Path:
+ def path(self):
+ # type: () -> pathlib.Path
return pathlib.Path(self._directory_path)
"""
def __init__(self):
- self._queue: queue.Queue = queue.Queue()
+ self._queue = queue.Queue() # type: queue.Queue
- def signal(self, signal_number, frame: Optional[FrameType]):
+ def signal(
+ self,
+ signal_number,
+ frame, # type: Optional[FrameType]
+ ):
self._queue.put_nowait(signal_number)
def wait_for_signal(self):
def __init__(
self,
- binary_path: pathlib.Path,
- event_count: int,
- environment: "Environment",
- wait_time_between_events_us: int = 0,
+ binary_path, # type: pathlib.Path
+ event_count, # type: int
+ environment, # type: Environment
+ wait_time_between_events_us=0, # type: int
):
- self._environment: Environment = environment
+ self._environment = environment # type: Environment
if event_count % 5:
# The test application currently produces 5 different events per iteration.
raise ValueError("event count must be a multiple of 5")
- self._iteration_count: int = int(event_count / 5)
+ self._iteration_count = int(event_count / 5) # type: int
# File that the application will wait to see before tracing its events.
- self._app_start_tracing_file_path: pathlib.Path = pathlib.Path(
+ self._app_start_tracing_file_path = pathlib.Path(
tempfile.mktemp(
prefix="app_",
suffix="_start_tracing",
prefix="app_",
suffix="_ready",
dir=self._compat_open_path(environment.lttng_home_location),
- )
+ ) # type: str
test_app_args = [str(binary_path)]
test_app_args.extend(
)
)
- self._process: subprocess.Popen = subprocess.Popen(
+ self._process = subprocess.Popen(
test_app_args,
env=test_app_env,
- )
+ ) # type: subprocess.Popen
# Wait for the application to create the file indicating it has fully
# initialized. Make sure the app hasn't crashed in order to not wait
time.sleep(0.1)
- def trace(self) -> None:
+ def trace(self):
+ # type: () -> None
if self._process.poll() is not None:
# Application has unexepectedly returned.
raise RuntimeError(
)
open(self._compat_open_path(self._app_start_tracing_file_path), mode="x")
- def wait_for_exit(self) -> None:
+ def wait_for_exit(self):
+ # type: () -> None
if self._process.wait() != 0:
raise RuntimeError(
"Test application has exit with return code `{return_code}`".format(
self._has_returned = True
@property
- def vpid(self) -> int:
+ def vpid(self):
+ # type: () -> int
return self._process.pid
@staticmethod
def _compat_open_path(path):
- # type: (pathlib.Path)
+ # type: (pathlib.Path) -> pathlib.Path | str
"""
The builtin open() in python >= 3.6 expects a path-like object while
prior versions expect a string or bytes object. Return the correct type
class ProcessOutputConsumer(threading.Thread, logger._Logger):
def __init__(
- self, process: subprocess.Popen, name: str, log: Callable[[str], None]
+ self,
+ process, # type: subprocess.Popen
+ name, # type: str
+ log, # type: Callable[[str], None]
):
threading.Thread.__init__(self)
self._prefix = name
logger._Logger.__init__(self, log)
self._process = process
- def run(self) -> None:
+ def run(self):
+ # type: () -> None
while self._process.poll() is None:
assert self._process.stdout
line = self._process.stdout.readline().decode("utf-8").replace("\n", "")
# Generate a temporary environment in which to execute a test.
class _Environment(logger._Logger):
def __init__(
- self, with_sessiond: bool, log: Optional[Callable[[str], None]] = None
+ self,
+ with_sessiond, # type: bool
+ log=None, # type: Optional[Callable[[str], None]]
):
super().__init__(log)
signal.signal(signal.SIGTERM, self._handle_termination_signal)
# Assumes the project's hierarchy to this file is:
# tests/utils/python/this_file
- self._project_root: pathlib.Path = pathlib.Path(__file__).absolute().parents[3]
- self._lttng_home: Optional[TemporaryDirectory] = TemporaryDirectory(
+ self._project_root = (
+ pathlib.Path(__file__).absolute().parents[3]
+ ) # type: pathlib.Path
+ self._lttng_home = TemporaryDirectory(
"lttng_test_env_home"
- )
+ ) # type: Optional[TemporaryDirectory]
- self._sessiond: Optional[subprocess.Popen[bytes]] = (
+ self._sessiond = (
self._launch_lttng_sessiond() if with_sessiond else None
- )
+ ) # type: Optional[subprocess.Popen[bytes]]
@property
- def lttng_home_location(self) -> pathlib.Path:
+ def lttng_home_location(self):
+ # type: () -> pathlib.Path
if self._lttng_home is None:
raise RuntimeError("Attempt to access LTTng home after clean-up")
return self._lttng_home.path
@property
- def lttng_client_path(self) -> pathlib.Path:
+ def lttng_client_path(self):
+ # type: () -> pathlib.Path
return self._project_root / "src" / "bin" / "lttng" / "lttng"
- def create_temporary_directory(self, prefix: Optional[str] = None) -> pathlib.Path:
+ def create_temporary_directory(self, prefix=None):
+ # type: (Optional[str]) -> pathlib.Path
# Simply return a path that is contained within LTTNG_HOME; it will
# be destroyed when the temporary home goes out of scope.
assert self._lttng_home
# Unpack a list of environment variables from a string
# such as "HELLO=is_it ME='/you/are/looking/for'"
@staticmethod
- def _unpack_env_vars(env_vars_string: str) -> List[Tuple[str, str]]:
+ def _unpack_env_vars(env_vars_string):
+ # type: (str) -> List[Tuple[str, str]]
unpacked_vars = []
for var in shlex.split(env_vars_string):
equal_position = var.find("=")
return unpacked_vars
- def _launch_lttng_sessiond(self) -> Optional[subprocess.Popen]:
+ def _launch_lttng_sessiond(self):
+ # type: () -> Optional[subprocess.Popen]
is_64bits_host = sys.maxsize > 2**32
sessiond_path = (
)
if self._logging_function:
- self._sessiond_output_consumer: Optional[
- ProcessOutputConsumer
- ] = ProcessOutputConsumer(process, "lttng-sessiond", self._logging_function)
+ self._sessiond_output_consumer = ProcessOutputConsumer(
+ process, "lttng-sessiond", self._logging_function
+ ) # type: Optional[ProcessOutputConsumer]
self._sessiond_output_consumer.daemon = True
self._sessiond_output_consumer.start()
return process
- def _handle_termination_signal(
- self, signal_number: int, frame: Optional[FrameType]
- ) -> None:
+ def _handle_termination_signal(self, signal_number, frame):
+ # type: (int, Optional[FrameType]) -> None
self._log(
"Killed by {signal_name} signal, cleaning-up".format(
signal_name=signal.strsignal(signal_number)
)
self._cleanup()
- def launch_wait_trace_test_application(
- self, event_count: int
- ) -> WaitTraceTestApplication:
+ def launch_wait_trace_test_application(self, event_count):
+ # type: (int) -> WaitTraceTestApplication
"""
Launch an application that will wait before tracing `event_count` events.
"""
)
# Clean-up managed processes
- def _cleanup(self) -> None:
+ def _cleanup(self):
+ # type: () -> None
if self._sessiond and self._sessiond.poll() is None:
# The session daemon is alive; kill it.
self._log(
@contextlib.contextmanager
-def test_environment(with_sessiond: bool, log: Optional[Callable[[str], None]] = None):
+def test_environment(with_sessiond, log=None):
+ # type: (bool, Optional[Callable[[str], None]]) -> Iterator[_Environment]
env = _Environment(with_sessiond, log)
try:
yield env
class _Logger:
- def __init__(self, log: Optional[Callable[[str], None]]):
- self._logging_function: Optional[Callable[[str], None]] = log
+ def __init__(self, log):
+ # type: (Optional[Callable[[str], None]]) -> None
+ self._logging_function = log # type: Optional[Callable[[str], None]]
- def _log(self, msg: str) -> None:
+ def _log(self, msg):
+ # type: (str) -> None
if self._logging_function:
self._logging_function(msg)
@property
- def logger(self) -> Optional[Callable[[str], None]]:
+ def logger(self):
+ # type: () -> Optional[Callable[[str], None]]
return self._logging_function
class Unsupported(lttngctl.ControlException):
- def __init__(self, msg: str):
+ def __init__(self, msg):
+ # type: (str) -> None
super().__init__(msg)
-def _get_domain_option_name(domain: lttngctl.TracingDomain) -> str:
+def _get_domain_option_name(domain):
+ # type: (lttngctl.TracingDomain) -> str
if domain == lttngctl.TracingDomain.User:
return "userspace"
elif domain == lttngctl.TracingDomain.Kernel:
raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client")
-def _get_context_type_name(context: lttngctl.ContextType) -> str:
+def _get_context_type_name(context):
+ # type: (lttngctl.ContextType) -> str
if isinstance(context, lttngctl.VgidContextType):
return "vgid"
elif isinstance(context, lttngctl.VuidContextType):
class _Channel(lttngctl.Channel):
def __init__(
self,
- client: "LTTngClient",
- name: str,
- domain: lttngctl.TracingDomain,
- session: "_Session",
+ client, # type: LTTngClient
+ name, # type: str
+ domain, # type: lttngctl.TracingDomain
+ session, # type: _Session
):
- self._client: LTTngClient = client
- self._name: str = name
- self._domain: lttngctl.TracingDomain = domain
- self._session: _Session = session
+ self._client = client # type: LTTngClient
+ self._name = name # type: str
+ self._domain = domain # type: lttngctl.TracingDomain
+ self._session = session # type: _Session
- def add_context(self, context_type: lttngctl.ContextType) -> None:
+ def add_context(self, context_type):
+ # type: (lttngctl.ContextType) -> None
domain_option_name = _get_domain_option_name(self.domain)
context_type_name = _get_context_type_name(context_type)
self._client._run_cmd(
)
)
- def add_recording_rule(self, rule: Type[lttngctl.EventRule]) -> None:
+ def add_recording_rule(self, rule):
+ # type: (Type[lttngctl.EventRule]) -> None
client_args = (
"enable-event --session {session_name} --channel {channel_name}".format(
session_name=self._session.name, channel_name=self.name
self._client._run_cmd(client_args)
@property
- def name(self) -> str:
+ def name(self):
+ # type: () -> str
return self._name
@property
- def domain(self) -> lttngctl.TracingDomain:
+ def domain(self):
+ # type: () -> lttngctl.TracingDomain
return self._domain
return "<%s.%s>" % (self.__class__.__name__, self.name)
-def _get_process_attribute_option_name(attribute: _ProcessAttribute) -> str:
+def _get_process_attribute_option_name(attribute):
+ # type: (_ProcessAttribute) -> str
return {
_ProcessAttribute.PID: "pid",
_ProcessAttribute.VPID: "vpid",
class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
def __init__(
self,
- client: "LTTngClient",
- attribute: _ProcessAttribute,
- domain: lttngctl.TracingDomain,
- session: "_Session",
+ client, # type: LTTngClient
+ attribute, # type: _ProcessAttribute
+ domain, # type: lttngctl.TracingDomain
+ session, # type: _Session
):
- self._client: LTTngClient = client
- self._tracked_attribute: _ProcessAttribute = attribute
- self._domain: lttngctl.TracingDomain = domain
- self._session: "_Session" = session
+ self._client = client # type: LTTngClient
+ self._tracked_attribute = attribute # type: _ProcessAttribute
+ self._domain = domain # type: lttngctl.TracingDomain
+ self._session = session # type: _Session
if attribute == _ProcessAttribute.PID or attribute == _ProcessAttribute.VPID:
- self._allowed_value_types: list[type] = [int, str]
+ self._allowed_value_types = [int, str] # type: list[type]
else:
- self._allowed_value_types: list[type] = [int]
+ self._allowed_value_types = [int] # type: list[type]
- def _call_client(self, cmd_name: str, value: Union[int, str]) -> None:
+ def _call_client(self, cmd_name, value):
+ # type: (str, Union[int, str]) -> None
if type(value) not in self._allowed_value_types:
raise TypeError(
"Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
)
)
- def track(self, value: Union[int, str]) -> None:
+ def track(self, value):
+ # type: (Union[int, str]) -> None
self._call_client("track", value)
- def untrack(self, value: Union[int, str]) -> None:
+ def untrack(self, value):
+ # type: (Union[int, str]) -> None
self._call_client("untrack", value)
class _Session(lttngctl.Session):
def __init__(
self,
- client: "LTTngClient",
- name: str,
- output: Optional[Type[lttngctl.SessionOutputLocation]],
+ client, # type: LTTngClient
+ name, # type: str
+ output, # type: Optional[lttngctl.SessionOutputLocation]
):
- self._client: LTTngClient = client
- self._name: str = name
- self._output: Optional[Type[lttngctl.SessionOutputLocation]] = output
+ self._client = client # type: LTTngClient
+ self._name = name # type: str
+ self._output = output # type: Optional[lttngctl.SessionOutputLocation]
@property
- def name(self) -> str:
+ def name(self):
+ # type: () -> str
return self._name
- def add_channel(
- self, domain: lttngctl.TracingDomain, channel_name: Optional[str] = None
- ) -> lttngctl.Channel:
+ def add_channel(self, domain, channel_name=None):
+ # type: (lttngctl.TracingDomain, Optional[str]) -> lttngctl.Channel
channel_name = lttngctl.Channel._generate_name()
domain_option_name = _get_domain_option_name(domain)
self._client._run_cmd(
)
return _Channel(self._client, channel_name, domain, self)
- def add_context(self, context_type: lttngctl.ContextType) -> None:
+ def add_context(self, context_type):
+ # type: (lttngctl.ContextType) -> None
pass
@property
- def output(self) -> Optional[Type[lttngctl.SessionOutputLocation]]:
- return self._output
+ def output(self):
+ # type: () -> "Optional[Type[lttngctl.SessionOutputLocation]]"
+ return self._output # type: ignore
- def start(self) -> None:
+ def start(self):
+ # type: () -> None
self._client._run_cmd("start {session_name}".format(session_name=self.name))
- def stop(self) -> None:
+ def stop(self):
+ # type: () -> None
self._client._run_cmd("stop {session_name}".format(session_name=self.name))
- def destroy(self) -> None:
+ def destroy(self):
+ # type: () -> None
self._client._run_cmd("destroy {session_name}".format(session_name=self.name))
@property
- def kernel_pid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.ProcessIDProcessAttributeTracker]:
+ def kernel_pid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.ProcessIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.PID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def kernel_vpid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
+ def kernel_vpid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def user_vpid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
+ def user_vpid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.User, self) # type: ignore
@property
- def kernel_gid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.GroupIDProcessAttributeTracker]:
+ def kernel_gid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.GroupIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.GID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def kernel_vgid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
+ def kernel_vgid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def user_vgid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
+ def user_vgid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.User, self) # type: ignore
@property
- def kernel_uid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.UserIDProcessAttributeTracker]:
+ def kernel_uid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.UserIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.UID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def kernel_vuid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
+ def kernel_vuid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def user_vuid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
+ def user_vuid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.User, self) # type: ignore
class LTTngClientError(lttngctl.ControlException):
- def __init__(self, command_args: str, error_output: str):
- self._command_args: str = command_args
- self._output: str = error_output
+ def __init__(
+ self,
+ command_args, # type: str
+ error_output, # type: str
+ ):
+ self._command_args = command_args # type: str
+ self._output = error_output # type: str
class LTTngClient(logger._Logger, lttngctl.Controller):
def __init__(
self,
- test_environment: environment._Environment,
- log: Optional[Callable[[str], None]],
+ test_environment, # type: environment._Environment
+ log, # type: Optional[Callable[[str], None]]
):
logger._Logger.__init__(self, log)
- self._environment: environment._Environment = test_environment
+ self._environment = test_environment # type: environment._Environment
- def _run_cmd(self, command_args: str) -> None:
+ def _run_cmd(self, command_args):
+ # type: (str) -> None
"""
Invoke the `lttng` client with a set of arguments. The command is
executed in the context of the client's test environment.
"""
- args: list[str] = [str(self._environment.lttng_client_path)]
+ args = [str(self._environment.lttng_client_path)] # type: list[str]
args.extend(shlex.split(command_args))
self._log("lttng {command_args}".format(command_args=command_args))
- client_env: dict[str, str] = os.environ.copy()
+ client_env = os.environ.copy() # type: dict[str, str]
client_env["LTTNG_HOME"] = str(self._environment.lttng_home_location)
process = subprocess.Popen(
self._log(error_line)
raise LTTngClientError(command_args, decoded_output)
- def create_session(
- self,
- name: Optional[str] = None,
- output: Optional[lttngctl.SessionOutputLocation] = None,
- ) -> lttngctl.Session:
+ def create_session(self, name=None, output=None):
+ # type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session
name = name if name else lttngctl.Session._generate_name()
if isinstance(output, lttngctl.LocalSessionOutputLocation):
"""
-def _generate_random_string(length: int) -> str:
+def _generate_random_string(length):
+ # type: (int) -> str
return "".join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(length)
)
class JavaApplicationContextType(ContextType):
"""A java application-specific context field is a piece of state which the application provides."""
- def __init__(self, retriever_name: str, field_name: str):
- self._retriever_name: str = retriever_name
- self._field_name: str = field_name
+ def __init__(
+ self,
+ retriever_name, # type: str
+ field_name, # type: str
+ ):
+ self._retriever_name = retriever_name # type: str
+ self._field_name = field_name # type: str
@property
- def retriever_name(self) -> str:
+ def retriever_name(self):
+ # type: () -> str
return self._retriever_name
@property
- def field_name(self) -> str:
+ def field_name(self):
+ # type: () -> str
return self._field_name
class LogLevelRuleAsSevereAs(LogLevelRule):
- def __init__(self, level: int):
+ def __init__(self, level):
+ # type: (int)
self._level = level
@property
- def level(self) -> int:
+ def level(self):
+ # type: () -> int
return self._level
class LogLevelRuleExactly(LogLevelRule):
- def __init__(self, level: int):
+ def __init__(self, level):
+ # type: (int)
self._level = level
@property
- def level(self) -> int:
+ def level(self):
+ # type: () -> int
return self._level
class TracepointEventRule(EventRule):
def __init__(
self,
- name_pattern: Optional[str] = None,
- filter_expression: Optional[str] = None,
- log_level_rule: Optional[LogLevelRule] = None,
- name_pattern_exclusions: Optional[List[str]] = None,
+ name_pattern=None, # type: Optional[str]
+ filter_expression=None, # type: Optional[str]
+ log_level_rule=None, # type: Optional[LogLevelRule]
+ name_pattern_exclusions=None, # type: Optional[List[str]]
):
- self._name_pattern: Optional[str] = name_pattern
- self._filter_expression: Optional[str] = filter_expression
- self._log_level_rule: Optional[LogLevelRule] = log_level_rule
- self._name_pattern_exclusions: Optional[List[str]] = name_pattern_exclusions
+ self._name_pattern = name_pattern # type: Optional[str]
+ self._filter_expression = filter_expression # type: Optional[str]
+ self._log_level_rule = log_level_rule # type: Optional[LogLevelRule]
+ self._name_pattern_exclusions = (
+ name_pattern_exclusions
+ ) # type: Optional[List[str]]
@property
- def name_pattern(self) -> Optional[str]:
+ def name_pattern(self):
+ # type: () -> Optional[str]
return self._name_pattern
@property
- def filter_expression(self) -> Optional[str]:
+ def filter_expression(self):
+ # type: () -> Optional[str]
return self._filter_expression
@property
- def log_level_rule(self) -> Optional[LogLevelRule]:
+ def log_level_rule(self):
+ # type: () -> Optional[LogLevelRule]
return self._log_level_rule
@property
- def name_pattern_exclusions(self) -> Optional[List[str]]:
+ def name_pattern_exclusions(self):
+ # type: () -> Optional[List[str]]
return self._name_pattern_exclusions
class UserTracepointEventRule(TracepointEventRule):
def __init__(
self,
- name_pattern: Optional[str] = None,
- filter_expression: Optional[str] = None,
- log_level_rule: Optional[LogLevelRule] = None,
- name_pattern_exclusions: Optional[List[str]] = None,
+ name_pattern=None, # type: Optional[str]
+ filter_expression=None, # type: Optional[str]
+ log_level_rule=None, # type: Optional[LogLevelRule]
+ name_pattern_exclusions=None, # type: Optional[List[str]]
):
TracepointEventRule.__init__(**locals())
class KernelTracepointEventRule(TracepointEventRule):
def __init__(
self,
- name_pattern: Optional[str] = None,
- filter_expression: Optional[str] = None,
- log_level_rule: Optional[LogLevelRule] = None,
- name_pattern_exclusions: Optional[List[str]] = None,
+ name_pattern=None, # type: Optional[str]
+ filter_expression=None, # type: Optional[str]
+ log_level_rule=None, # type: Optional[LogLevelRule]
+ name_pattern_exclusions=None, # type: Optional[List[str]]
):
TracepointEventRule.__init__(**locals())
"""
@staticmethod
- def _generate_name() -> str:
+ def _generate_name():
+ # type: () -> str
return "channel_{random_id}".format(random_id=_generate_random_string(8))
@abc.abstractmethod
- def add_context(self, context_type: ContextType) -> None:
+ def add_context(self, context_type):
+ # type: (ContextType) -> None
pass
@property
@abc.abstractmethod
- def domain(self) -> TracingDomain:
+ def domain(self):
+ # type: () -> TracingDomain
pass
@property
@abc.abstractmethod
- def name(self) -> str:
+ def name(self):
+ # type: () -> str
pass
@abc.abstractmethod
- def add_recording_rule(self, rule: Type[EventRule]) -> None:
+ def add_recording_rule(self, rule) -> None:
+ # type: (Type[EventRule]) -> None
pass
class LocalSessionOutputLocation(SessionOutputLocation):
- def __init__(self, trace_path: pathlib.Path):
+ def __init__(self, trace_path):
+ # type: (pathlib.Path)
self._path = trace_path
@property
- def path(self) -> pathlib.Path:
+ def path(self):
+ # type: () -> pathlib.Path
return self._path
def __repr__(self):
return "<%s.%s>" % (self.__class__.__name__, self.name)
- def __init__(self, policy: TrackingPolicy):
+ def __init__(self, policy):
+ # type: (TrackingPolicy)
self._policy = policy
@property
- def tracking_policy(self) -> TrackingPolicy:
+ def tracking_policy(self):
+ # type: () -> TrackingPolicy
return self._policy
class ProcessIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
- def track(self, pid: int) -> None:
+ def track(self, pid):
+ # type: (int) -> None
pass
@abc.abstractmethod
- def untrack(self, pid: int) -> None:
+ def untrack(self, pid):
+ # type: (int) -> None
pass
class VirtualProcessIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
- def track(self, vpid: int) -> None:
+ def track(self, vpid):
+ # type: (int) -> None
pass
@abc.abstractmethod
- def untrack(self, vpid: int) -> None:
+ def untrack(self, vpid):
+ # type: (int) -> None
pass
class UserIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
- def track(self, uid: Union[int, str]) -> None:
+ def track(self, uid):
+ # type: (Union[int, str]) -> None
pass
@abc.abstractmethod
- def untrack(self, uid: Union[int, str]) -> None:
+ def untrack(self, uid):
+ # type: (Union[int, str]) -> None
pass
class VirtualUserIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
- def track(self, vuid: Union[int, str]) -> None:
+ def track(self, vuid):
+ # type: (Union[int, str]) -> None
pass
@abc.abstractmethod
- def untrack(self, vuid: Union[int, str]) -> None:
+ def untrack(self, vuid):
+ # type: (Union[int, str]) -> None
pass
class GroupIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
- def track(self, gid: Union[int, str]) -> None:
+ def track(self, gid):
+ # type: (Union[int, str]) -> None
pass
@abc.abstractmethod
- def untrack(self, gid: Union[int, str]) -> None:
+ def untrack(self, gid):
+ # type: (Union[int, str]) -> None
pass
class VirtualGroupIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
- def track(self, vgid: Union[int, str]) -> None:
+ def track(self, vgid):
+ # type: (Union[int, str]) -> None
pass
@abc.abstractmethod
- def untrack(self, vgid: Union[int, str]) -> None:
+ def untrack(self, vgid):
+ # type: (Union[int, str]) -> None
pass
class Session(abc.ABC):
@staticmethod
- def _generate_name() -> str:
+ def _generate_name():
+ # type: () -> str
return "session_{random_id}".format(random_id=_generate_random_string(8))
@property
@abc.abstractmethod
- def name(self) -> str:
+ def name(self):
+ # type: () -> str
pass
@property
@abc.abstractmethod
- def output(self) -> Optional[Type[SessionOutputLocation]]:
+ def output(self):
+ # type: () -> Optional[Type[SessionOutputLocation]]
pass
@abc.abstractmethod
- def add_channel(
- self, domain: TracingDomain, channel_name: Optional[str] = None
- ) -> Channel:
+ def add_channel(self, domain, channel_name=None):
+ # type: (TracingDomain, Optional[str]) -> Channel
"""Add a channel with default attributes to the session."""
pass
@abc.abstractmethod
- def start(self) -> None:
+ def start(self):
+ # type: () -> None
pass
@abc.abstractmethod
- def stop(self) -> None:
+ def stop(self):
+ # type: () -> None
pass
@abc.abstractmethod
- def destroy(self) -> None:
+ def destroy(self):
+ # type: () -> None
pass
@abc.abstractproperty
- def kernel_pid_process_attribute_tracker(
- self,
- ) -> Type[ProcessIDProcessAttributeTracker]:
+ def kernel_pid_process_attribute_tracker(self):
+ # type: () -> Type[ProcessIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
- def kernel_vpid_process_attribute_tracker(
- self,
- ) -> Type[VirtualProcessIDProcessAttributeTracker]:
+ def kernel_vpid_process_attribute_tracker(self):
+ # type: () -> Type[VirtualProcessIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
def user_vpid_process_attribute_tracker(
self,
) -> Type[VirtualProcessIDProcessAttributeTracker]:
+ # type: () -> Type[VirtualProcessIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
- def kernel_gid_process_attribute_tracker(
- self,
- ) -> Type[GroupIDProcessAttributeTracker]:
+ def kernel_gid_process_attribute_tracker(self):
+ # type: () -> Type[GroupIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
- def kernel_vgid_process_attribute_tracker(
- self,
- ) -> Type[VirtualGroupIDProcessAttributeTracker]:
+ def kernel_vgid_process_attribute_tracker(self):
+ # type: () -> Type[VirtualGroupIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
- def user_vgid_process_attribute_tracker(
- self,
- ) -> Type[VirtualGroupIDProcessAttributeTracker]:
+ def user_vgid_process_attribute_tracker(self):
+ # type: () -> Type[VirtualGroupIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
- def kernel_uid_process_attribute_tracker(
- self,
- ) -> Type[UserIDProcessAttributeTracker]:
+ def kernel_uid_process_attribute_tracker(self):
+ # type: () -> Type[UserIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
- def kernel_vuid_process_attribute_tracker(
- self,
- ) -> Type[VirtualUserIDProcessAttributeTracker]:
+ def kernel_vuid_process_attribute_tracker(self):
+ # type: () -> Type[VirtualUserIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
- def user_vuid_process_attribute_tracker(
- self,
- ) -> Type[VirtualUserIDProcessAttributeTracker]:
+ def user_vuid_process_attribute_tracker(self):
+ # type: () -> Type[VirtualUserIDProcessAttributeTracker]
raise NotImplementedError
class ControlException(RuntimeError):
"""Base type for exceptions thrown by a controller."""
- def __init__(self, msg: str):
+ def __init__(self, msg):
+ # type: (str)
super().__init__(msg)
"""
@abc.abstractmethod
- def create_session(
- self, name: Optional[str] = None, output: Optional[SessionOutputLocation] = None
- ) -> Session:
+ def create_session(self, name=None, output=None):
+ # type: (Optional[str], Optional[SessionOutputLocation]) -> Session
"""
Create a session with an output. Don't specify an output
to create a session without an output.
import contextlib
import sys
-from typing import Optional
+from typing import Iterator, Optional
class InvalidTestPlan(RuntimeError):
- def __init__(self, msg: str):
+ def __init__(self, msg):
+ # type: (str) -> None
super().__init__(msg)
class BailOut(RuntimeError):
- def __init__(self, msg: str):
+ def __init__(self, msg):
+ # type: (str) -> None
super().__init__(msg)
class TestCase:
- def __init__(self, tap_generator: "TapGenerator", description: str):
- self._tap_generator = tap_generator
- self._result: Optional[bool] = None
- self._description = description
+ def __init__(
+ self,
+ tap_generator, # type: "TapGenerator"
+ description, # type: str
+ ):
+ self._tap_generator = tap_generator # type: "TapGenerator"
+ self._result = None # type: Optional[bool]
+ self._description = description # type: str
@property
- def result(self) -> Optional[bool]:
+ def result(self):
+ # type: () -> Optional[bool]
return self._result
@property
- def description(self) -> str:
+ def description(self):
+ # type: () -> str
return self._description
- def _set_result(self, result: bool) -> None:
+ def _set_result(self, result):
+ # type: (bool) -> None
if self._result is not None:
raise RuntimeError("Can't set test case result twice")
self._result = result
self._tap_generator.test(result, self._description)
- def success(self) -> None:
+ def success(self):
+ # type: () -> None
self._set_result(True)
- def fail(self) -> None:
+ def fail(self):
+ # type: () -> None
self._set_result(False)
# Produces a test execution report in the TAP format.
class TapGenerator:
- def __init__(self, total_test_count: int):
+ def __init__(self, total_test_count):
+ # type: (int) -> None
if total_test_count <= 0:
raise ValueError("Test count must be greater than zero")
- self._total_test_count: int = total_test_count
- self._last_test_case_id: int = 0
- self._printed_plan: bool = False
- self._has_failure: bool = False
+ self._total_test_count = total_test_count # type: int
+ self._last_test_case_id = 0 # type: int
+ self._printed_plan = False # type: bool
+ self._has_failure = False # type: bool
def __del__(self):
if self.remaining_test_cases > 0:
)
@property
- def remaining_test_cases(self) -> int:
+ def remaining_test_cases(self):
+ # type: () -> int
return self._total_test_count - self._last_test_case_id
- def _print(self, msg: str) -> None:
+ def _print(self, msg):
+ # type: (str) -> None
if not self._printed_plan:
print(
"1..{total_test_count}".format(total_test_count=self._total_test_count),
print(msg, flush=True)
- def skip_all(self, reason) -> None:
+ def skip_all(self, reason):
+ # type: (str) -> None
if self._last_test_case_id != 0:
raise RuntimeError("Can't skip all tests after running test cases")
self._last_test_case_id = self._total_test_count
- def skip(self, reason, skip_count: int = 1) -> None:
+ def skip(self, reason, skip_count=1):
+ # type: (str, int) -> None
for i in range(skip_count):
self._last_test_case_id = self._last_test_case_id + 1
self._print(
)
)
- def bail_out(self, reason: str) -> None:
+ def bail_out(self, reason):
+ # type: (str) -> None
self._print("Bail out! {reason}".format(reason=reason))
self._last_test_case_id = self._total_test_count
raise BailOut(reason)
- def test(self, result: bool, description: str) -> None:
+ def test(self, result, description):
+ # type: (bool, str) -> None
if self._last_test_case_id == self._total_test_count:
raise InvalidTestPlan("Executing too many tests")
)
)
- def ok(self, description: str) -> None:
+ def ok(self, description):
+ # type: (str) -> None
self.test(True, description)
- def fail(self, description: str) -> None:
+ def fail(self, description):
+ # type: (str) -> None
self.test(False, description)
@property
- def is_successful(self) -> bool:
+ def is_successful(self):
+ # type: () -> bool
return (
self._last_test_case_id == self._total_test_count and not self._has_failure
)
@contextlib.contextmanager
- def case(self, description: str):
+ def case(self, description):
+ # type: (str) -> Iterator[TestCase]
test_case = TestCase(self, description)
try:
yield test_case
if test_case.result is None:
test_case.success()
- def diagnostic(self, msg) -> None:
+ def diagnostic(self, msg):
+ # type: (str) -> None
print("# {msg}".format(msg=msg), file=sys.stderr, flush=True)