return "<%s.%s>" % (self.__class__.__name__, self.name)
+@enum.unique
+class BufferSharingPolicy(enum.Enum):
+ """Buffer sharing policy."""
+
+ PerUID = "Per-UID buffering"
+ PerPID = "Per-PID buffering"
+
+ def __repr__(self):
+ return "<%s.%s>" % (self.__class__.__name__, self.name)
+
+
class EventRule(abc.ABC):
"""Event rule base class, see LTTNG-EVENT-RULE(7)."""
class LogLevelRule:
+ def __eq__(self, other):
+ # type (LogLevelRule) -> bool
+ if type(self) != type(other):
+ return False
+
+ return self.level == other.level
+
+
+@enum.unique
+class LogLevel(enum.Enum):
pass
+@enum.unique
+class UserLogLevel(LogLevel):
+ EMERGENCY = 0
+ ALERT = 1
+ CRITICAL = 2
+ ERROR = 3
+ WARNING = 4
+ NOTICE = 5
+ INFO = 6
+ DEBUG_SYSTEM = 7
+ DEBUG_PROGRAM = 8
+ DEBUG_PROCESS = 9
+ DEBUG_MODULE = 10
+ DEBUG_UNIT = 11
+ DEBUG_FUNCTION = 12
+ DEBUG_LINE = 13
+ DEBUG = 14
+
+
+@enum.unique
+class JULLogLevel(LogLevel):
+ OFF = 2147483647
+ SEVERE = 1000
+ WARNING = 900
+ INFO = 800
+ CONFIG = 700
+ FINE = 500
+ FINER = 400
+ FINEST = 300
+ ALL = -2147483648
+
+
+@enum.unique
+class Log4jLogLevel(LogLevel):
+ OFF = 2147483647
+ FATAL = 50000
+ ERROR = 40000
+ WARN = 30000
+ INFO = 20000
+ DEBUG = 10000
+ TRACE = 5000
+ ALL = -2147483648
+
+
+@enum.unique
+class PythonLogLevel(LogLevel):
+ CRITICAL = 50
+ ERROR = 40
+ WARNING = 30
+ INFO = 20
+ DEBUG = 10
+ NOTSET = 0
+
+
class LogLevelRuleAsSevereAs(LogLevelRule):
def __init__(self, level):
- # type: (int)
+ # type: (LogLevel)
self._level = level
@property
def level(self):
- # type: () -> int
+ # type: () -> LogLevel
return self._level
class LogLevelRuleExactly(LogLevelRule):
def __init__(self, level):
- # type: (int)
+ # type: (LogLevel)
self._level = level
@property
def level(self):
- # type: () -> int
+ # type: () -> LogLevel
return self._level
self,
name_pattern=None, # type: Optional[str]
filter_expression=None, # type: Optional[str]
- log_level_rule=None, # type: Optional[LogLevelRule]
- name_pattern_exclusions=None, # type: Optional[List[str]]
):
self._name_pattern = name_pattern # type: Optional[str]
self._filter_expression = filter_expression # type: Optional[str]
- self._log_level_rule = log_level_rule # type: Optional[LogLevelRule]
- self._name_pattern_exclusions = (
- name_pattern_exclusions
- ) # type: Optional[List[str]]
+
+ def _equals(self, other):
+ # type (TracepointEventRule) -> bool
+ # Overridden by derived classes that have supplementary attributes.
+ return True
+
+ def __eq__(self, other):
+ # type (TracepointEventRule) -> bool
+ if type(self) != type(other):
+ return False
+
+ if self.name_pattern != other.name_pattern:
+ return False
+
+ if self.filter_expression != other.filter_expression:
+ return False
+
+ return self._equals(other)
@property
def name_pattern(self):
# type: () -> Optional[str]
return self._filter_expression
+
+class UserTracepointEventRule(TracepointEventRule):
+ def __init__(
+ self,
+ name_pattern=None, # type: Optional[str]
+ filter_expression=None, # type: Optional[str]
+ log_level_rule=None, # type: Optional[LogLevelRule]
+ name_pattern_exclusions=None, # type: Optional[List[str]]
+ ):
+ TracepointEventRule.__init__(self, name_pattern, filter_expression)
+ self._log_level_rule = log_level_rule # type: Optional[LogLevelRule]
+ self._name_pattern_exclusions = (
+ name_pattern_exclusions
+ ) # type: Optional[List[str]]
+
+ if log_level_rule and not isinstance(log_level_rule.level, UserLogLevel):
+ raise ValueError("Log level rule must use a UserLogLevel as its value")
+
+ def _equals(self, other):
+ # type (UserTracepointEventRule) -> bool
+ return (
+ self.log_level_rule == other.log_level_rule
+ and self.name_pattern_exclusions == other.name_pattern_exclusions
+ )
+
@property
def log_level_rule(self):
# type: () -> Optional[LogLevelRule]
return self._name_pattern_exclusions
-class UserTracepointEventRule(TracepointEventRule):
+class Log4jTracepointEventRule(TracepointEventRule):
def __init__(
self,
name_pattern=None, # type: Optional[str]
log_level_rule=None, # type: Optional[LogLevelRule]
name_pattern_exclusions=None, # type: Optional[List[str]]
):
- TracepointEventRule.__init__(**locals())
+ TracepointEventRule.__init__(self, name_pattern, filter_expression)
+ self._log_level_rule = log_level_rule # type: Optional[LogLevelRule]
+ self._name_pattern_exclusions = (
+ name_pattern_exclusions
+ ) # type: Optional[List[str]]
+ if log_level_rule and not isinstance(log_level_rule.level, Log4jLogLevel):
+ raise ValueError("Log level rule must use a Log4jLogLevel as its value")
-class KernelTracepointEventRule(TracepointEventRule):
+ def _equals(self, other):
+ # type (Log4jTracepointEventRule) -> bool
+ return (
+ self.log_level_rule == other.log_level_rule
+ and self.name_pattern_exclusions == other.name_pattern_exclusions
+ )
+
+ @property
+ def log_level_rule(self):
+ # type: () -> Optional[LogLevelRule]
+ return self._log_level_rule
+
+ @property
+ def name_pattern_exclusions(self):
+ # type: () -> Optional[List[str]]
+ return self._name_pattern_exclusions
+
+
+class JULTracepointEventRule(TracepointEventRule):
+ def __init__(
+ self,
+ name_pattern=None, # type: Optional[str]
+ filter_expression=None, # type: Optional[str]
+ log_level_rule=None, # type: Optional[LogLevelRule]
+ name_pattern_exclusions=None, # type: Optional[List[str]]
+ ):
+ TracepointEventRule.__init__(self, name_pattern, filter_expression)
+ self._log_level_rule = log_level_rule # type: Optional[LogLevelRule]
+ self._name_pattern_exclusions = (
+ name_pattern_exclusions
+ ) # type: Optional[List[str]]
+
+ if log_level_rule and not isinstance(log_level_rule.level, JULLogLevel):
+ raise ValueError("Log level rule must use a JULLogLevel as its value")
+
+ def _equals(self, other):
+ # type (JULTracepointEventRule) -> bool
+ return (
+ self.log_level_rule == other.log_level_rule
+ and self.name_pattern_exclusions == other.name_pattern_exclusions
+ )
+
+ @property
+ def log_level_rule(self):
+ # type: () -> Optional[LogLevelRule]
+ return self._log_level_rule
+
+ @property
+ def name_pattern_exclusions(self):
+ # type: () -> Optional[List[str]]
+ return self._name_pattern_exclusions
+
+
+class PythonTracepointEventRule(TracepointEventRule):
def __init__(
self,
name_pattern=None, # type: Optional[str]
filter_expression=None, # type: Optional[str]
log_level_rule=None, # type: Optional[LogLevelRule]
name_pattern_exclusions=None, # type: Optional[List[str]]
+ ):
+ TracepointEventRule.__init__(self, name_pattern, filter_expression)
+ self._log_level_rule = log_level_rule # type: Optional[LogLevelRule]
+ self._name_pattern_exclusions = (
+ name_pattern_exclusions
+ ) # type: Optional[List[str]]
+
+ if log_level_rule and not isinstance(log_level_rule.level, PythonLogLevel):
+ raise ValueError("Log level rule must use a PythonLogLevel as its value")
+
+ def _equals(self, other):
+ # type (PythonTracepointEventRule) -> bool
+ return (
+ self.log_level_rule == other.log_level_rule
+ and self.name_pattern_exclusions == other.name_pattern_exclusions
+ )
+
+ @property
+ def log_level_rule(self):
+ # type: () -> Optional[LogLevelRule]
+ return self._log_level_rule
+
+ @property
+ def name_pattern_exclusions(self):
+ # type: () -> Optional[List[str]]
+ return self._name_pattern_exclusions
+
+
+class KernelTracepointEventRule(TracepointEventRule):
+ def __init__(
+ self,
+ name_pattern=None, # type: Optional[str]
+ filter_expression=None, # type: Optional[str]
):
TracepointEventRule.__init__(**locals())
@abc.abstractmethod
def add_context(self, context_type):
# type: (ContextType) -> None
- pass
+ raise NotImplementedError
@property
@abc.abstractmethod
def domain(self):
# type: () -> TracingDomain
- pass
+ raise NotImplementedError
@property
@abc.abstractmethod
def name(self):
# type: () -> str
- pass
+ raise NotImplementedError
@abc.abstractmethod
def add_recording_rule(self, rule) -> None:
# type: (Type[EventRule]) -> None
- pass
+ raise NotImplementedError
class SessionOutputLocation(abc.ABC):
@abc.abstractmethod
def track(self, pid):
# type: (int) -> None
- pass
+ raise NotImplementedError
@abc.abstractmethod
def untrack(self, pid):
# type: (int) -> None
- pass
+ raise NotImplementedError
class VirtualProcessIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
def track(self, vpid):
# type: (int) -> None
- pass
+ raise NotImplementedError
@abc.abstractmethod
def untrack(self, vpid):
# type: (int) -> None
- pass
+ raise NotImplementedError
class UserIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
def track(self, uid):
# type: (Union[int, str]) -> None
- pass
+ raise NotImplementedError
@abc.abstractmethod
def untrack(self, uid):
# type: (Union[int, str]) -> None
- pass
+ raise NotImplementedError
class VirtualUserIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
def track(self, vuid):
# type: (Union[int, str]) -> None
- pass
+ raise NotImplementedError
@abc.abstractmethod
def untrack(self, vuid):
# type: (Union[int, str]) -> None
- pass
+ raise NotImplementedError
class GroupIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
def track(self, gid):
# type: (Union[int, str]) -> None
- pass
+ raise NotImplementedError
@abc.abstractmethod
def untrack(self, gid):
# type: (Union[int, str]) -> None
- pass
+ raise NotImplementedError
class VirtualGroupIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
def track(self, vgid):
# type: (Union[int, str]) -> None
- pass
+ raise NotImplementedError
@abc.abstractmethod
def untrack(self, vgid):
# type: (Union[int, str]) -> None
- pass
+ raise NotImplementedError
class Session(abc.ABC):
@abc.abstractmethod
def name(self):
# type: () -> str
- pass
+ raise NotImplementedError
@property
@abc.abstractmethod
def output(self):
# type: () -> Optional[Type[SessionOutputLocation]]
- pass
+ raise NotImplementedError
@abc.abstractmethod
- def add_channel(self, domain, channel_name=None):
- # type: (TracingDomain, Optional[str]) -> Channel
+ def add_channel(
+ self,
+ domain,
+ channel_name=None,
+ buffer_sharing_policy=BufferSharingPolicy.PerUID,
+ ):
+ # type: (TracingDomain, Optional[str], BufferSharingPolicy) -> Channel
"""Add a channel with default attributes to the session."""
- pass
+ raise NotImplementedError
@abc.abstractmethod
def start(self):
# type: () -> None
- pass
+ raise NotImplementedError
@abc.abstractmethod
def stop(self):
# type: () -> None
- pass
+ raise NotImplementedError
@abc.abstractmethod
def destroy(self):
# type: () -> None
- pass
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def is_active(self):
+ # type: () -> bool
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def rotate(self):
+ # type: () -> None
+ raise NotImplementedError
@abc.abstractproperty
def kernel_pid_process_attribute_tracker(self):
Create a session with an output. Don't specify an output
to create a session without an output.
"""
- pass
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def start_session_by_name(self, name):
+ # type: (str) -> None
+ """
+ Start a session by name.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def start_session_by_glob_pattern(self, pattern):
+ # type: (str) -> None
+ """
+ Start sessions whose name matches `pattern`, see GLOB(7).
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def start_sessions_all(self):
+ """
+ Start all sessions visible to the current user.
+ """
+ # type: () -> None
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def stop_session_by_name(self, name):
+ # type: (str) -> None
+ """
+ Stop a session by name.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def stop_session_by_glob_pattern(self, pattern):
+ # type: (str) -> None
+ """
+ Stop sessions whose name matches `pattern`, see GLOB(7).
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def stop_sessions_all(self):
+ """
+ Stop all sessions visible to the current user.
+ """
+ # type: () -> None
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def destroy_session_by_name(self, name):
+ # type: (str) -> None
+ """
+ Destroy a session by name.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def destroy_session_by_glob_pattern(self, pattern):
+ # type: (str) -> None
+ """
+ Destroy sessions whose name matches `pattern`, see GLOB(7).
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def destroy_sessions_all(self):
+ # type: () -> None
+ """
+ Destroy all sessions visible to the current user.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def list_sessions(self):
+ # type: () -> List[Session]
+ """
+ List all sessions visible to the current user.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def rotate_session_by_name(self, name, wait=True):
+ # type: (str, bool) -> None
+ """
+ Rotate a session
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def schedule_size_based_rotation(self, name, size_bytes):
+ # type: (str, int) -> None
+ """
+ Schedule automatic size-based rotations.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def schedule_time_based_rotation(self, name, period_seconds):
+ # type: (str, int) -> None
+ """
+ Schedule automatic time-based rotations.
+ """
+ raise NotImplementedError