from lttng import *
+
# This error will be raised is something goes wrong
class LTTngError(Exception):
- def __init__(self, value):
- self.value = value
- def __str__(self):
- return repr(self.value)
+ def __init__(self, value):
+ self.value = value
+
+ def __str__(self):
+ return repr(self.value)
+
-#Setting up the domain to use
+# Setting up the domain to use
dom = Domain()
dom.type = DOMAIN_KERNEL
-#Setting up a channel to use
+# Setting up a channel to use
channel = Channel()
channel.name = "mychan"
channel.attr.overwrite = 0
channel.attr.read_timer_interval = 200
channel.attr.output = EVENT_SPLICE
-#Setting up some events that will be used
+# Setting up some events that will be used
event = Event()
event.type = EVENT_TRACEPOINT
event.loglevel_type = EVENT_LOGLEVEL_ALL
sched_process_free.loglevel_type = EVENT_LOGLEVEL_ALL
-#Creating a new session
-res = create("test","/lttng-traces/test")
-if res<0:
- raise LTTngError(strerror(res))
+# Creating a new session
+res = create("test", "/lttng-traces/test")
+if res < 0:
+ raise LTTngError(strerror(res))
-#Creating handle
+# Creating handle
han = None
han = Handle("test", dom)
if han is None:
- raise LTTngError("Handle not created")
+ raise LTTngError("Handle not created")
-#Enabling the kernel channel
+# Enabling the kernel channel
res = enable_channel(han, channel)
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
-#Enabling some events in given channel
-#To enable all events in default channel, use
-#enable_event(han, event, None)
+# Enabling some events in given channel
+# To enable all events in default channel, use
+# enable_event(han, event, None)
res = enable_event(han, sched_switch, channel.name)
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
res = enable_event(han, sched_process_exit, channel.name)
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
res = enable_event(han, sched_process_free, channel.name)
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
-#Disabling an event
+# Disabling an event
res = disable_event(han, sched_switch.name, channel.name)
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
-#Getting a list of the channels
+# Getting a list of the channels
l = list_channels(han)
if type(l) is int:
- raise LTTngError(strerror(l))
+ raise LTTngError(strerror(l))
-#Starting the trace
+# Starting the trace
res = start("test")
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
-#Stopping the trace
+# Stopping the trace
res = stop("test")
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
-#Disabling a channel
+# Disabling a channel
res = disable_channel(han, channel.name)
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
-#Destroying the handle
+# Destroying the handle
del han
-#Destroying the session
+# Destroying the session
res = destroy("test")
-if res<0:
- raise LTTngError(strerror(res))
+if res < 0:
+ raise LTTngError(strerror(res))
import tempfile
from lttng import *
-class TestLttngPythonModule (unittest.TestCase):
+
+class TestLttngPythonModule(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.TemporaryDirectory()
r = destroy("test_kernel_all_ev")
self.assertGreaterEqual(r, 0, strerror(r))
-
def test_kernel_event(self):
dom = Domain()
dom.buf_type = BUFFER_GLOBAL
channel = Channel()
- channel.name="mychan"
+ channel.name = "mychan"
channel.attr.overwrite = 0
channel.attr.subbuf_size = 4096
channel.attr.num_subbuf = 8
han = Handle("test_kernel_event", dom)
- #Create session test
+ # Create session test
r = create("test_kernel_event", self.tmpdir.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Enabling channel tests
+ # Enabling channel tests
r = enable_channel(han, channel)
self.assertGreaterEqual(r, 0, strerror(r))
- #Enabling events tests
+ # Enabling events tests
r = enable_event(han, sched_switch, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, sched_process_free, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Disabling events tests
+ # Disabling events tests
r = disable_event(han, sched_switch.name, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = disable_event(han, sched_process_free.name, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Renabling events tests
+ # Renabling events tests
r = enable_event(han, sched_switch, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, sched_process_free, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Start, stop, destroy
+ # Start, stop, destroy
r = start("test_kernel_event")
self.assertGreaterEqual(r, 0, strerror(r))
time.sleep(2)
r = stop("test_kernel_event")
self.assertGreaterEqual(r, 0, strerror(r))
- r=disable_channel(han, channel.name)
+ r = disable_channel(han, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
- r=destroy("test_kernel_event")
+ r = destroy("test_kernel_event")
self.assertGreaterEqual(r, 0, strerror(r))
-
-
def test_ust_all_events(self):
dom = Domain()
dom.type = DOMAIN_UST
r = destroy("test_ust_all_ev")
self.assertGreaterEqual(r, 0, strerror(r))
-
def test_ust_event(self):
dom = Domain()
dom.buf_type = BUFFER_PER_UID
channel = Channel()
- channel.name="mychan"
+ channel.name = "mychan"
channel.attr.overwrite = 0
channel.attr.subbuf_size = 4096
channel.attr.num_subbuf = 8
han = Handle("test_ust_event", dom)
- #Create session test
+ # Create session test
r = create("test_ust_event", self.tmpdir.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Enabling channel tests
+ # Enabling channel tests
r = enable_channel(han, channel)
self.assertGreaterEqual(r, 0, strerror(r))
- #Enabling events tests
+ # Enabling events tests
r = enable_event(han, ev1, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, ev3, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Disabling events tests
+ # Disabling events tests
r = disable_event(han, ev1.name, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = disable_event(han, ev3.name, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Renabling events tests
+ # Renabling events tests
r = enable_event(han, ev1, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, ev3, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
- #Start, stop
+ # Start, stop
r = start("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
time.sleep(2)
r = stop("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
- #Restart/restop
+ # Restart/restop
r = start("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
time.sleep(2)
r = stop("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
- #Disabling channel and destroy
- r=disable_channel(han, channel.name)
+ # Disabling channel and destroy
+ r = disable_channel(han, channel.name)
self.assertGreaterEqual(r, 0, strerror(r))
- r=destroy("test_ust_event")
+ r = destroy("test_ust_event")
self.assertGreaterEqual(r, 0, strerror(r))
-
def test_other_functions(self):
dom = Domain()
- dom.type=DOMAIN_KERNEL
+ dom.type = DOMAIN_KERNEL
dom.buf_type = BUFFER_GLOBAL
- event=Event()
- event.type=EVENT_TRACEPOINT
- event.loglevel_type=EVENT_LOGLEVEL_ALL
+ event = Event()
+ event.type = EVENT_TRACEPOINT
+ event.loglevel_type = EVENT_LOGLEVEL_ALL
ctx = EventContext()
- ctx.type=EVENT_CONTEXT_PID
+ ctx.type = EVENT_CONTEXT_PID
chattr = ChannelAttr()
chattr.overwrite = 0
chattr.read_timer_interval = 200
chattr.output = EVENT_SPLICE
- han = Handle("test_otherf" , dom)
+ han = Handle("test_otherf", dom)
r = create("test_otherf", self.tmpdir.name)
self.assertGreaterEqual(r, 0, strerror(r))
r = enable_event(han, event, None)
self.assertGreaterEqual(r, 0, strerror(r))
- #Context test
+ # Context test
r = add_context(han, ctx, "sched_switch", "channel0")
self.assertGreaterEqual(r, 0, strerror(r))
- #Any channel
+ # Any channel
r = add_context(han, ctx, "sched_wakeup", None)
self.assertGreaterEqual(r, 0, strerror(r))
- #All events
+ # All events
r = add_context(han, ctx, None, None)
self.assertGreaterEqual(r, 0, strerror(r))
- #Def. channel attr
+ # Def. channel attr
channel_set_default_attr(dom, chattr)
channel_set_default_attr(None, None)
- #Ses Daemon alive
+ # Ses Daemon alive
r = session_daemon_alive()
self.assertTrue(r == 1 or r == 0, strerror(r))
- #Setting trace group
+ # Setting trace group
r = set_tracing_group("testing")
self.assertGreaterEqual(r, 0, strerror(r))
-
r = start("test_otherf")
self.assertGreaterEqual(r, 0, strerror(r))
time.sleep(2)
suite.addTest(TestLttngPythonModule("test_ust_all_events"))
return suite
+
def kernel_suite():
suite = unittest.TestSuite()
suite.addTest(TestLttngPythonModule("test_kernel_event"))
suite.addTest(TestLttngPythonModule("test_other_functions"))
return suite
-if __name__ == '__main__':
+
+if __name__ == "__main__":
destroy("test_kernel_event")
destroy("test_kernel_all_events")
destroy("test_ust_all_events")
import bt2
except ImportError:
# quick fix for debian-based distros
- sys.path.append("/usr/local/lib/python%d.%d/site-packages" %
- (sys.version_info.major, sys.version_info.minor))
+ sys.path.append(
+ "/usr/local/lib/python%d.%d/site-packages"
+ % (sys.version_info.major, sys.version_info.minor)
+ )
import bt2
NSEC_PER_SEC = 1000000000
+
class TraceParser:
def __init__(self, trace_msg_iter, pid):
self.trace = trace_msg_iter
# Each test classes checks the payload of different events. Each of
# those checks are stored in a event_name specific dictionnary in this
# data structure.
- self.expect = defaultdict(lambda : defaultdict(int))
+ self.expect = defaultdict(lambda: defaultdict(int))
# This dictionnary holds the value recorded in the trace that are
# tested. Its content is use to print the values that caused a test to
self.recorded_values = {}
def ns_to_hour_nsec(self, ns):
- d = time.localtime(ns/NSEC_PER_SEC)
- return "%02d:%02d:%02d.%09d" % (d.tm_hour, d.tm_min, d.tm_sec,
- ns % NSEC_PER_SEC)
+ d = time.localtime(ns / NSEC_PER_SEC)
+ return "%02d:%02d:%02d.%09d" % (
+ d.tm_hour,
+ d.tm_min,
+ d.tm_sec,
+ ns % NSEC_PER_SEC,
+ )
def parse(self):
# iterate over all the events
continue
method_name = "handle_%s" % msg.event.name.replace(":", "_").replace(
- "+", "_")
+ "+", "_"
+ )
# call the function to handle each event individually
if hasattr(TraceParser, method_name):
func = getattr(TraceParser, method_name)
class WorkingCases(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
# Values expected in the trace
- self.epoll_wait_fd = validation_args['epoll_wait_fd']
- self.epoll_pwait_fd = validation_args['epoll_pwait_fd']
+ self.epoll_wait_fd = validation_args["epoll_wait_fd"]
+ self.epoll_pwait_fd = validation_args["epoll_pwait_fd"]
self.expect["select_entry"]["select_in_fd0"] = 0
self.expect["select_entry"]["select_in_fd1023"] = 0
exceptfd_127 = event["exceptfds"][127]
# check that the FD 1023 is actually set in the readfds
- if readfd_127 == 0x40 and writefd_127 == 0 and \
- exceptfd_127 == 0 and overflow == 0:
+ if (
+ readfd_127 == 0x40
+ and writefd_127 == 0
+ and exceptfd_127 == 0
+ and overflow == 0
+ ):
self.expect["select_entry"]["select_in_fd1023"] = 1
# Save values of local variables to print in case of test failure
readfd_127 = event["readfds"][127]
writefd_127 = event["writefds"][127]
exceptfd_127 = event["exceptfds"][127]
- if readfd_127 == 0x40 and writefd_127 == 0 and \
- exceptfd_127 == 0 and tvp == 0:
+ if (
+ readfd_127 == 0x40
+ and writefd_127 == 0
+ and exceptfd_127 == 0
+ and tvp == 0
+ ):
self.expect["select_exit"]["select_out_fd1023"] = 1
# Save values of local variables to print in case of test failure
# the raw value matches the events bit field.
if nfds == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if fd_0["raw_events"] == 0x3 and fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0:
+ if (
+ fd_0["raw_events"] == 0x3
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ ):
self.expect["poll_entry"]["poll_in_nfds1"] = 1
# Save values of local variables to print in case of test failure
# the raw value matches the events bit field.
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if fd_0["raw_events"] == 0x1 and fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0:
+ if (
+ fd_0["raw_events"] == 0x1
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ ):
self.expect["poll_exit"]["poll_out_nfds1"] = 1
# Save values of local variables to print in case of test failure
# check that we have FD 0 waiting for EPOLLIN|EPOLLPRI and that
# data.fd = 0
- if (epfd == self.epoll_wait_fd or epfd == self.epoll_pwait_fd) and 'EPOLL_CTL_ADD' in op_enum.labels and fd == 0 and \
- _event["data_union"]["fd"] == 0 and \
- _event["events"]["EPOLLIN"] == 1 and \
- _event["events"]["EPOLLPRI"] == 1:
+ if (
+ (epfd == self.epoll_wait_fd or epfd == self.epoll_pwait_fd)
+ and "EPOLL_CTL_ADD" in op_enum.labels
+ and fd == 0
+ and _event["data_union"]["fd"] == 0
+ and _event["events"]["EPOLLIN"] == 1
+ and _event["events"]["EPOLLPRI"] == 1
+ ):
self.expect["epoll_ctl_entry"]["epoll_ctl_in_add"] = 1
# Save values of local variables to print in case of test failure
# check that FD 0 returned with EPOLLIN and the right data.fd
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if overflow == 0 and fd_0["data_union"]["fd"] == 0 and \
- fd_0["events"]["EPOLLIN"] == 1:
+ if (
+ overflow == 0
+ and fd_0["data_union"]["fd"] == 0
+ and fd_0["events"]["EPOLLIN"] == 1
+ ):
self.expect["epoll_wait_exit"]["epoll_wait_out_fd0"] = 1
# Save values of local variables to print in case of test failure
# check that FD 0 returned with EPOLLIN and the right data.fd
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if overflow == 0 and fd_0["data_union"]["fd"] == 0 and \
- fd_0["events"]["EPOLLIN"] == 1:
+ if (
+ overflow == 0
+ and fd_0["data_union"]["fd"] == 0
+ and fd_0["events"]["EPOLLIN"] == 1
+ ):
self.expect["epoll_pwait_exit"]["epoll_pwait_out_fd0"] = 1
# Save values of local variables to print in case of test failure
self.recorded_values["epoll_pwait_exit"] = locals()
+
class WorkingCasesTimeout(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["select_entry"]["select_timeout_in_fd0"] = 0
self.expect["select_entry"]["select_timeout_in_fd1023"] = 0
self.expect["select_exit"]["select_timeout_out"] = 0
writefd_127 = event["writefds"][127]
exceptfd_127 = event["exceptfds"][127]
- if readfd_127 == 0x40 and writefd_127 == 0 and \
- exceptfd_127 == 0 and tvp != 0:
+ if (
+ readfd_127 == 0x40
+ and writefd_127 == 0
+ and exceptfd_127 == 0
+ and tvp != 0
+ ):
self.expect["select_entry"]["select_timeout_in_fd1023"] = 1
# Save values of local variables to print in case of test failure
# field matches the value of POLLIN
if nfds == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if fd_0["raw_events"] == 0x3 and \
- fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0:
+ if (
+ fd_0["raw_events"] == 0x3
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ ):
self.expect["poll_entry"]["poll_timeout_in"] = 1
# Save values of local variables to print in case of test failure
_event = event["event"]
# make sure we see a EPOLLIN|EPOLLPRI
- if 'EPOLL_CTL_ADD' in op_enum.labels and \
- _event["events"]["EPOLLIN"] == 1 and \
- _event["events"]["EPOLLPRI"] == 1:
+ if (
+ "EPOLL_CTL_ADD" in op_enum.labels
+ and _event["events"]["EPOLLIN"] == 1
+ and _event["events"]["EPOLLPRI"] == 1
+ ):
self.expect["epoll_ctl_entry"]["epoll_ctl_timeout_in_add"] = 1
# Save values of local variables to print in case of test failure
class PselectInvalidFd(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["select_entry"]["select_invalid_fd_in"] = 0
self.expect["select_exit"]["select_invalid_fd_out"] = 0
class PpollBig(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["poll_entry"]["big_poll_in"] = 0
self.expect["poll_exit"]["big_poll_out"] = 0
if nfds == 2047 and fds_length == 512 and overflow == 1:
fd_0 = event["fds"][0]
fd_511 = event["fds"][511]
- if fd_0["raw_events"] == 0x3 and fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0 and \
- fd_511["events"]["POLLIN"] == 1 and \
- fd_511["events"]["POLLPRI"] == 1:
+ if (
+ fd_0["raw_events"] == 0x3
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ and fd_511["events"]["POLLIN"] == 1
+ and fd_511["events"]["POLLPRI"] == 1
+ ):
self.expect["poll_entry"]["big_poll_in"] = 1
# Save values of local variables to print in case of test failure
# test of big list of FDs and the behaviour of the overflow
if ret == 2047 and nfds == 2047 and fds_length == 512 and overflow == 1:
- fd_0 = event["fds"][0]
- fd_511 = event["fds"][511]
- if fd_0["events"]["POLLIN"] == 1 and fd_511["events"]["POLLIN"] == 1:
- self.expect["poll_exit"]["big_poll_out"] = 1
+ fd_0 = event["fds"][0]
+ fd_511 = event["fds"][511]
+ if fd_0["events"]["POLLIN"] == 1 and fd_511["events"]["POLLIN"] == 1:
+ self.expect["poll_exit"]["big_poll_out"] = 1
# Save values of local variables to print in case of test failure
self.recorded_values["poll_exit"] = locals()
+
class PpollFdsBufferOverflow(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["poll_entry"]["poll_overflow_in"] = 0
self.expect["poll_exit"]["poll_overflow_out"] = 0
class PselectInvalidPointer(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["select_entry"]["pselect_invalid_in"] = 0
self.expect["select_exit"]["pselect_invalid_out"] = 0
class PpollFdsULongMax(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["poll_entry"]["poll_max_in"] = 0
self.expect["poll_exit"]["poll_max_out"] = 0
# Save values of local variables to print in case of test failure
self.recorded_values["poll_entry"] = locals()
-
def poll_exit(self, event):
ret = event["ret"]
nfds = event["nfds"]
class EpollPwaitInvalidPointer(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
# Values expected in the trace
- self.epoll_fd = validation_args['epollfd']
+ self.epoll_fd = validation_args["epollfd"]
self.expect["epoll_wait_entry"]["epoll_wait_invalid_in"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_invalid_out"] = 0
class EpollPwaitIntMax(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
# Values expected in the trace
- self.epoll_fd = validation_args['epollfd']
+ self.epoll_fd = validation_args["epollfd"]
self.expect["epoll_wait_entry"]["epoll_wait_max_in"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_max_out"] = 0
if __name__ == "__main__":
- parser = argparse.ArgumentParser(description='Trace parser')
- parser.add_argument('path', metavar="<path/to/trace>", help='Trace path')
- parser.add_argument('-t', '--test', type=str, help='Test to validate')
- parser.add_argument('-o', '--validation-file', type=str, help='Validation file path')
+ parser = argparse.ArgumentParser(description="Trace parser")
+ parser.add_argument("path", metavar="<path/to/trace>", help="Trace path")
+ parser.add_argument("-t", "--test", type=str, help="Test to validate")
+ parser.add_argument(
+ "-o", "--validation-file", type=str, help="Validation file path"
+ )
args = parser.parse_args()
if not args.test:
try:
test_validation_args = json.load(f)
except Exception as e:
- print('Failed to parse validation file: ' + str(e))
+ print("Failed to parse validation file: " + str(e))
sys.exit(1)
t = None
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("""No sessiond running. Please make sure you are running this test
+ bail(
+ """No sessiond running. Please make sure you are running this test
with the "run" shell script and verify that the lttng tools are
- properly installed.""")
+ properly installed."""
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "*")
start_session(session_info)
-test_process = subprocess.Popen(test_path + "prog.strip", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+test_process = subprocess.Popen(
+ test_path + "prog.strip", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
+)
test_process.wait()
-print_test_result(test_process.returncode == 0, current_test, "Test application exited normally")
+print_test_result(
+ test_process.returncode == 0, current_test, "Test application exited normally"
+)
current_test += 1
stop_session(session_info)
# Check for statedump events in the resulting trace
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
- bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN), session_info)
+ bail(
+ "Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN),
+ session_info,
+ )
start_event_found = False
bin_info_event_found = False
for event_line in babeltrace_process.stdout:
# Let babeltrace finish to get the return code
- if start_event_found and bin_info_event_found and build_id_event_found and \
- debug_link_event_found and end_event_found:
+ if (
+ start_event_found
+ and bin_info_event_found
+ and build_id_event_found
+ and debug_link_event_found
+ and end_event_found
+ ):
continue
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
if re.search(r".*lttng_ust_statedump:start.*", event_line) is not None:
start_event_found = True
elif re.search(r".*lttng_ust_statedump:bin_info.*", event_line) is not None:
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
current_test += 1
-print_test_result(start_event_found, current_test, "lttng_ust_statedump:start event found in resulting trace")
+print_test_result(
+ start_event_found,
+ current_test,
+ "lttng_ust_statedump:start event found in resulting trace",
+)
current_test += 1
-print_test_result(bin_info_event_found, current_test, "lttng_ust_statedump:bin_info event found in resulting trace")
+print_test_result(
+ bin_info_event_found,
+ current_test,
+ "lttng_ust_statedump:bin_info event found in resulting trace",
+)
current_test += 1
-print_test_result(build_id_event_found, current_test, "lttng_ust_statedump:build_id event found in resulting trace")
+print_test_result(
+ build_id_event_found,
+ current_test,
+ "lttng_ust_statedump:build_id event found in resulting trace",
+)
current_test += 1
-print_test_result(debug_link_event_found, current_test, "lttng_ust_statedump:debug_link event found in resulting trace")
+print_test_result(
+ debug_link_event_found,
+ current_test,
+ "lttng_ust_statedump:debug_link event found in resulting trace",
+)
current_test += 1
-print_test_result(end_event_found, current_test, "lttng_ust_statedump:end event found in resulting trace")
+print_test_result(
+ end_event_found,
+ current_test,
+ "lttng_ust_statedump:end event found in resulting trace",
+)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("No sessiond running. Please make sure you are running this test with the \"run\" shell script and verify that the lttng tools are properly installed.")
+ bail(
+ 'No sessiond running. Please make sure you are running this test with the "run" shell script and verify that the lttng tools are properly installed.'
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "*")
daemon_pid = None
daemon_process = subprocess.Popen(test_path + "daemon", stdout=subprocess.PIPE)
for line in daemon_process.stdout:
- name, pid = line.decode('utf-8').split()
+ name, pid = line.decode("utf-8").split()
if name == "child_pid":
daemon_pid = int(pid)
if name == "parent_pid":
daemon_process_return_code = daemon_process.wait()
if parent_pid is None or daemon_pid is None:
- bail("Unexpected output received from daemon test executable." + str(daemon_process_output))
-
-print_test_result(daemon_process_return_code == 0, current_test, "Successful call to daemon() and normal exit")
+ bail(
+ "Unexpected output received from daemon test executable."
+ + str(daemon_process_output)
+ )
+
+print_test_result(
+ daemon_process_return_code == 0,
+ current_test,
+ "Successful call to daemon() and normal exit",
+)
current_test += 1
if daemon_process_return_code != 0:
stop_session(session_info)
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE
+ )
except FileNotFoundError:
bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
after_daemon_event_pid = -1
for event_line in babeltrace_process.stdout:
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
if re.search(r"before_daemon", event_line) is not None:
if before_daemon_event_found:
- bail("Multiple instances of the before_daemon event found. Please make sure only one instance of this test is runnning.")
+ bail(
+ "Multiple instances of the before_daemon event found. Please make sure only one instance of this test is runnning."
+ )
before_daemon_event_found = True
match = re.search(r"(?<=pid = )\d+", event_line)
if re.search(r"after_daemon", event_line) is not None:
if after_daemon_event_found:
- bail("Multiple instances of the after_daemon event found. Please make sure only one instance of this test is runnning.")
+ bail(
+ "Multiple instances of the after_daemon event found. Please make sure only one instance of this test is runnning."
+ )
after_daemon_event_found = True
match = re.search(r"(?<=pid = )\d+", event_line)
after_daemon_event_pid = int(match.group(0))
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
current_test += 1
if babeltrace_process.returncode != 0:
bail("Unreadable trace; can't proceed with analysis.")
-print_test_result(before_daemon_event_found, current_test, "before_daemon event found in resulting trace")
+print_test_result(
+ before_daemon_event_found,
+ current_test,
+ "before_daemon event found in resulting trace",
+)
current_test += 1
-print_test_result(before_daemon_event_pid == parent_pid, current_test, "Parent pid reported in trace is correct")
+print_test_result(
+ before_daemon_event_pid == parent_pid,
+ current_test,
+ "Parent pid reported in trace is correct",
+)
current_test += 1
-print_test_result(before_daemon_event_found, current_test, "after_daemon event found in resulting trace")
+print_test_result(
+ before_daemon_event_found,
+ current_test,
+ "after_daemon event found in resulting trace",
+)
current_test += 1
-print_test_result(after_daemon_event_pid == daemon_pid, current_test, "Daemon pid reported in trace is correct")
+print_test_result(
+ after_daemon_event_pid == daemon_pid,
+ current_test,
+ "Daemon pid reported in trace is correct",
+)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("No sessiond running. Please make sure you are running this test with the \"run\" shell script and verify that the lttng tools are properly installed.")
+ bail(
+ 'No sessiond running. Please make sure you are running this test with the "run" shell script and verify that the lttng tools are properly installed.'
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "ust_tests_exitfast*")
test_env = os.environ.copy()
test_env["LTTNG_UST_REGISTER_TIMEOUT"] = "-1"
-exit_fast_process = subprocess.Popen(test_path + "exit-fast", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=test_env)
+exit_fast_process = subprocess.Popen(
+ test_path + "exit-fast",
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ env=test_env,
+)
exit_fast_process.wait()
-print_test_result(exit_fast_process.returncode == 0, current_test, "Test application exited normally")
+print_test_result(
+ exit_fast_process.returncode == 0, current_test, "Test application exited normally"
+)
current_test += 1
-exit_fast_process = subprocess.Popen([test_path + "exit-fast", "suicide"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=test_env)
+exit_fast_process = subprocess.Popen(
+ [test_path + "exit-fast", "suicide"],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ env=test_env,
+)
exit_fast_process.wait()
stop_session(session_info)
# Check both events (normal exit and suicide messages) are present in the resulting trace
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
event_lines = []
for event_line in babeltrace_process.stdout:
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
event_lines.append(event_line)
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
current_test += 1
if babeltrace_process.returncode != 0:
bail("Unreadable trace; can't proceed with analysis.")
-print_test_result(len(event_lines) == 2, current_test, "Correct number of events found in resulting trace")
+print_test_result(
+ len(event_lines) == 2,
+ current_test,
+ "Correct number of events found in resulting trace",
+)
current_test += 1
if len(event_lines) != 2:
- bail("Unexpected number of events found in resulting trace (" + session_info.trace_path + ")." )
+ bail(
+ "Unexpected number of events found in resulting trace ("
+ + session_info.trace_path
+ + ")."
+ )
match = re.search(r".*message = \"(.*)\"", event_lines[0])
-print_test_result(match is not None and match.group(1) == normal_exit_message, current_test,\
- "Tracepoint message generated during normal exit run is present in trace and has the expected value")
+print_test_result(
+ match is not None and match.group(1) == normal_exit_message,
+ current_test,
+ "Tracepoint message generated during normal exit run is present in trace and has the expected value",
+)
current_test += 1
match = re.search(r".*message = \"(.*)\"", event_lines[1])
-print_test_result(match is not None and match.group(1) == suicide_exit_message, current_test,\
- "Tracepoint message generated during suicide run is present in trace and has the expected value")
+print_test_result(
+ match is not None and match.group(1) == suicide_exit_message,
+ current_test,
+ "Tracepoint message generated during suicide run is present in trace and has the expected value",
+)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("No sessiond running. Please make sure you are running this test with the \"run\" shell script and verify that the lttng tools are properly installed.")
+ bail(
+ 'No sessiond running. Please make sure you are running this test with the "run" shell script and verify that the lttng tools are properly installed.'
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "ust_tests_fork*")
start_session(session_info)
-fork_process = subprocess.Popen([test_path + "fork", test_path + "fork2"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+fork_process = subprocess.Popen(
+ [test_path + "fork", test_path + "fork2"],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+)
parent_pid = -1
child_pid = -1
for line in fork_process.stdout:
- line = line.decode('utf-8').replace("\n", "")
+ line = line.decode("utf-8").replace("\n", "")
match = re.search(r"child_pid (\d+)", line)
if match:
child_pid = match.group(1)
fork_process.wait()
-print_test_result(fork_process.returncode == 0, current_test, "Fork test application exited normally")
+print_test_result(
+ fork_process.returncode == 0, current_test, "Fork test application exited normally"
+)
current_test += 1
stop_session(session_info)
# Check both events (normal exit and suicide messages) are present in the resulting trace
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
event_lines = []
for event_line in babeltrace_process.stdout:
- event_line = event_line.decode('utf-8').replace("\n", "")
- if re.search(r"warning", event_line) is not None or re.search(r"error", event_line) is not None:
- print( "# " + event_line )
+ event_line = event_line.decode("utf-8").replace("\n", "")
+ if (
+ re.search(r"warning", event_line) is not None
+ or re.search(r"error", event_line) is not None
+ ):
+ print("# " + event_line)
else:
event_lines.append(event_line)
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
current_test += 1
if babeltrace_process.returncode != 0:
continue
if re.search(r"before_fork", event_line):
- event_before_fork = (event_pid == parent_pid)
+ event_before_fork = event_pid == parent_pid
if re.search(r"after_fork_parent", event_line):
- event_after_fork_parent = (event_pid == parent_pid)
+ event_after_fork_parent = event_pid == parent_pid
if re.search(r"after_fork_child", event_line):
- event_after_fork_child = (event_pid == child_pid)
+ event_after_fork_child = event_pid == child_pid
if re.search(r"after_exec", event_line):
- event_after_exec = (event_pid == child_pid)
+ event_after_exec = event_pid == child_pid
-print_test_result(event_before_fork, current_test, "before_fork event logged by parent process found in trace")
+print_test_result(
+ event_before_fork,
+ current_test,
+ "before_fork event logged by parent process found in trace",
+)
current_test += 1
-print_test_result(event_after_fork_parent, current_test, "after_fork_parent event logged by parent process found in trace")
+print_test_result(
+ event_after_fork_parent,
+ current_test,
+ "after_fork_parent event logged by parent process found in trace",
+)
current_test += 1
-print_test_result(event_after_fork_child, current_test, "after_fork_child event logged by child process found in trace")
+print_test_result(
+ event_after_fork_child,
+ current_test,
+ "after_fork_child event logged by child process found in trace",
+)
current_test += 1
-print_test_result(event_after_exec, current_test, "after_exec event logged by child process found in trace")
+print_test_result(
+ event_after_exec,
+ current_test,
+ "after_exec event logged by child process found in trace",
+)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("No sessiond running. Please make sure you are running this test with the \"run\" shell script and verify that the lttng tools are properly installed.")
+ bail(
+ 'No sessiond running. Please make sure you are running this test with the "run" shell script and verify that the lttng tools are properly installed.'
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "lttng_ust_libc*")
start_session(session_info)
-malloc_process = subprocess.Popen(test_path + "prog", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+malloc_process = subprocess.Popen(
+ test_path + "prog", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
+)
malloc_process.wait()
-print_test_result(malloc_process.returncode == 0, current_test, "Test application exited normally")
+print_test_result(
+ malloc_process.returncode == 0, current_test, "Test application exited normally"
+)
current_test += 1
stop_session(session_info)
# Check for malloc events in the resulting trace
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
- bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN), session_info)
+ bail(
+ "Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN),
+ session_info,
+ )
malloc_event_found = False
free_event_found = False
if malloc_event_found and free_event_found:
continue
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
if re.search(r".*lttng_ust_libc:malloc.*", event_line) is not None:
malloc_event_found = True
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
current_test += 1
-print_test_result(malloc_event_found, current_test, "lttng_ust_libc:malloc event found in resulting trace")
+print_test_result(
+ malloc_event_found,
+ current_test,
+ "lttng_ust_libc:malloc event found in resulting trace",
+)
current_test += 1
-print_test_result(free_event_found, current_test, "lttng_ust_libc:free event found in resulting trace")
+print_test_result(
+ free_event_found, current_test, "lttng_ust_libc:free event found in resulting trace"
+)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
match = re.search(r".*_seqfield1_length = (\d+)", event_line)
if match is None or int(match.group(1)) != 4:
return False
- match = re.search(r".*seqfield1 = \[ \[0\] = (\d+), \[1\] = (\d+), \[2\] = (\d+), \[3\] = (\d+) \]", event_line)
- if match is None or int(match.group(1)) != 116 or int(match.group(2)) != 101 or int(match.group(3)) != 115 or int(match.group(4)) != 116:
+ match = re.search(
+ r".*seqfield1 = \[ \[0\] = (\d+), \[1\] = (\d+), \[2\] = (\d+), \[3\] = (\d+) \]",
+ event_line,
+ )
+ if (
+ match is None
+ or int(match.group(1)) != 116
+ or int(match.group(2)) != 101
+ or int(match.group(3)) != 115
+ or int(match.group(4)) != 116
+ ):
return False
- match = re.search(r".*arrfield1 = \[ \[0\] = (\d), \[1\] = (\d), \[2\] = (\d) \]", event_line)
- if match is None or int(match.group(1)) != 1 or int(match.group(2)) != 2 or int(match.group(3)) != 3:
+ match = re.search(
+ r".*arrfield1 = \[ \[0\] = (\d), \[1\] = (\d), \[2\] = (\d) \]", event_line
+ )
+ if (
+ match is None
+ or int(match.group(1)) != 1
+ or int(match.group(2)) != 2
+ or int(match.group(3)) != 3
+ ):
return False
match = re.search(r".*arrfield2 = \"([a-z]*)\"", event_line)
if match is None or match.group(1) != "test":
return True
+
NR_TESTS = 0
DYNAMIC_TEST_ENABLED = False
DYNAMIC_TEST_ENABLED = True
# Only enable tests that were compiled successfully
-test_executables = [executable for executable in test_executables if os.path.exists(executable)]
+test_executables = [
+ executable for executable in test_executables if os.path.exists(executable)
+]
NR_TESTS += len(test_executables) * 10
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("No sessiond running. Please make sure you are running this test with the \"run\" shell script and verify that the lttng tools are properly installed.")
+ bail(
+ 'No sessiond running. Please make sure you are running this test with the "run" shell script and verify that the lttng tools are properly installed.'
+ )
if DYNAMIC_TEST_ENABLED:
session_info = create_session()
start_session(session_info)
# Dry run, no events should be logged
- demo_process = subprocess.Popen(test_path + "demo", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+ demo_process = subprocess.Popen(
+ test_path + "demo", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
+ )
demo_process.wait()
stop_session(session_info)
- print_test_result(demo_process.returncode == 0, current_test,\
- "Running application dynamically linked to providers, no preload")
+ print_test_result(
+ demo_process.returncode == 0,
+ current_test,
+ "Running application dynamically linked to providers, no preload",
+ )
current_test += 1
trace_path = os.path.join(session_info.trace_path, "ust", "uid")
- print_test_result(not os.path.exists(trace_path), current_test,\
- "No events logged when running demo application without preloading providers")
+ print_test_result(
+ not os.path.exists(trace_path),
+ current_test,
+ "No events logged when running demo application without preloading providers",
+ )
current_test += 1
shutil.rmtree(session_info.tmp_directory)
enable_ust_tracepoint_event(session_info, "ust_tests_demo*")
start_session(session_info)
- demo_process = subprocess.Popen(executable, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+ demo_process = subprocess.Popen(
+ executable, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
+ )
demo_process.wait()
stop_session(session_info)
trace_found = os.path.exists(session_info.trace_path)
- print_test_result(trace_found, current_test,\
- "{0}, resulting trace found".format(executable_name))
+ print_test_result(
+ trace_found, current_test, "{0}, resulting trace found".format(executable_name)
+ )
current_test += 1
if not trace_found:
continue
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
- bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
+ bail(
+ "Could not open {}. Please make sure it is installed.".format(
+ BABELTRACE_BIN
+ )
+ )
# We should find 8 events in the resulting trace
event_entries = []
for event_line in babeltrace_process.stdout:
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
event_entries.append(event_line)
if len(event_entries) != 8:
- bail("{0}, wrong number of events found in resulting trace.".format(executable_name))
+ bail(
+ "{0}, wrong number of events found in resulting trace.".format(
+ executable_name
+ )
+ )
shutil.rmtree(session_info.tmp_directory)
- print_test_result(len(event_entries) == 8, current_test,\
- "{0}, total number of events logged is correct".format(executable_name))
+ print_test_result(
+ len(event_entries) == 8,
+ current_test,
+ "{0}, total number of events logged is correct".format(executable_name),
+ )
current_test += 1
# Check each loop event
match = re.search(r".*ust_tests_demo:starting.*value = (\d+) ", event_entries[0])
- print_test_result(match is not None and (int(match.group(1)) == 123), current_test,\
- "{0}, ust_tests_demo:starting event found in trace with a correct integer argument".format(executable_name))
+ print_test_result(
+ match is not None and (int(match.group(1)) == 123),
+ current_test,
+ "{0}, ust_tests_demo:starting event found in trace with a correct integer argument".format(
+ executable_name
+ ),
+ )
current_test += 1
for i in range(5):
- print_test_result(check_ust_test_demo2_event(event_entries[i+1], i), current_test,\
- "{0}, ust_tests_demo2:loop event found in trace and arguments are correct, iteration ".format(executable_name)\
- + str(i + 1))
+ print_test_result(
+ check_ust_test_demo2_event(event_entries[i + 1], i),
+ current_test,
+ "{0}, ust_tests_demo2:loop event found in trace and arguments are correct, iteration ".format(
+ executable_name
+ )
+ + str(i + 1),
+ )
current_test += 1
match = re.search(r".*ust_tests_demo:done.*value = (\d+)", event_entries[6])
- print_test_result(match is not None and (int(match.group(1)) == 456), current_test,\
- "{0}, ust_tests_demo:done event found in resulting trace with a correct integer argument".format(executable_name))
+ print_test_result(
+ match is not None and (int(match.group(1)) == 456),
+ current_test,
+ "{0}, ust_tests_demo:done event found in resulting trace with a correct integer argument".format(
+ executable_name
+ ),
+ )
current_test += 1
match = re.search(r".*ust_tests_demo3:done.*value = (\d+)", event_entries[7])
- print_test_result(match is not None and (int(match.group(1)) == 42), current_test,\
- "{0}, ust_tests_demo3:done event found in resulting trace with a correct integer argument".format(executable_name))
+ print_test_result(
+ match is not None and (int(match.group(1)) == 42),
+ current_test,
+ "{0}, ust_tests_demo3:done event found in resulting trace with a correct integer argument".format(
+ executable_name
+ ),
+ )
current_test += 1
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("No sessiond running. Please make sure you are running this test with the \"run\" shell script and verify that the lttng tools are properly installed.")
+ bail(
+ 'No sessiond running. Please make sure you are running this test with the "run" shell script and verify that the lttng tools are properly installed.'
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "ust_tests_td*")
test_env = os.environ.copy()
test_env["LTTNG_UST_REGISTER_TIMEOUT"] = "-1"
-td_process = subprocess.Popen(test_path + "type-declarations", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=test_env)
+td_process = subprocess.Popen(
+ test_path + "type-declarations",
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ env=test_env,
+)
td_process.wait()
-print_test_result(td_process.returncode == 0, current_test, "Test application exited normally")
+print_test_result(
+ td_process.returncode == 0, current_test, "Test application exited normally"
+)
current_test += 1
stop_session(session_info)
# Check event fields using type declarations are present
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))
event_lines = []
for event_line in babeltrace_process.stdout:
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
event_lines.append(event_line)
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
current_test += 1
if babeltrace_process.returncode != 0:
bail("Unreadable trace; can't proceed with analysis.")
-print_test_result(len(event_lines) == 5, current_test, "Correct number of events found in resulting trace")
+print_test_result(
+ len(event_lines) == 5,
+ current_test,
+ "Correct number of events found in resulting trace",
+)
current_test += 1
if len(event_lines) != 5:
- bail("Unexpected number of events found in resulting trace (" + session_info.trace_path + ")." )
-
-match = re.search(r".*ust_tests_td:(.*):.*enumfield = \( \"(.*)\" :.*enumfield_bis = \( \"(.*)\" :.*enumfield_third = .*:.*", event_lines[0])
-print_test_result(match is not None and match.group(1) == "tptest", current_test,\
- "First tracepoint is present")
+ bail(
+ "Unexpected number of events found in resulting trace ("
+ + session_info.trace_path
+ + ")."
+ )
+
+match = re.search(
+ r".*ust_tests_td:(.*):.*enumfield = \( \"(.*)\" :.*enumfield_bis = \( \"(.*)\" :.*enumfield_third = .*:.*",
+ event_lines[0],
+)
+print_test_result(
+ match is not None and match.group(1) == "tptest",
+ current_test,
+ "First tracepoint is present",
+)
current_test += 1
-print_test_result(match is not None and match.group(2) == "zero", current_test,\
- "First tracepoint's enum value maps to zero")
+print_test_result(
+ match is not None and match.group(2) == "zero",
+ current_test,
+ "First tracepoint's enum value maps to zero",
+)
current_test += 1
-print_test_result(match is not None and match.group(3) == "one", current_test,\
- "First tracepoint's second enum value maps to one")
+print_test_result(
+ match is not None and match.group(3) == "one",
+ current_test,
+ "First tracepoint's second enum value maps to one",
+)
current_test += 1
match = re.search(r".*ust_tests_td:(.*):.*enumfield = \( \"(.*)\" :.*", event_lines[1])
-print_test_result(match is not None and match.group(1) == "tptest_bis", current_test,\
- "Second tracepoint is present")
+print_test_result(
+ match is not None and match.group(1) == "tptest_bis",
+ current_test,
+ "Second tracepoint is present",
+)
current_test += 1
-print_test_result(match is not None and match.group(2) == "zero", current_test,\
- "Second tracepoint's enum value maps to zero")
+print_test_result(
+ match is not None and match.group(2) == "zero",
+ current_test,
+ "Second tracepoint's enum value maps to zero",
+)
current_test += 1
-match = re.search(r".*ust_tests_td:(.*):.*enumfield = \( \"(.*)\" :.*enumfield_bis = \( \"(.*)\" .*", event_lines[2])
+match = re.search(
+ r".*ust_tests_td:(.*):.*enumfield = \( \"(.*)\" :.*enumfield_bis = \( \"(.*)\" .*",
+ event_lines[2],
+)
-print_test_result(match is not None and match.group(2) == "one", current_test,\
- "Third tracepoint's enum value maps to one")
+print_test_result(
+ match is not None and match.group(2) == "one",
+ current_test,
+ "Third tracepoint's enum value maps to one",
+)
current_test += 1
-print_test_result('{ zero = ( "zero" : container = 0 ), two = ( "two" : container = 2 ), three = ( "three" : container = 3 ), fifteen = ( "ten_to_twenty" : container = 15 ), twenty_one = ( "twenty_one" : container = 21 ) }' in event_lines[4],
- current_test, 'Auto-incrementing enum values are correct')
+print_test_result(
+ '{ zero = ( "zero" : container = 0 ), two = ( "two" : container = 2 ), three = ( "three" : container = 3 ), fifteen = ( "ten_to_twenty" : container = 15 ), twenty_one = ( "twenty_one" : container = 21 ) }'
+ in event_lines[4],
+ current_test,
+ "Auto-incrementing enum values are correct",
+)
shutil.rmtree(session_info.tmp_directory)
from test_utils import *
-have_dlmopen = (os.environ.get('LTTNG_TOOLS_HAVE_DLMOPEN') == '1')
+have_dlmopen = os.environ.get("LTTNG_TOOLS_HAVE_DLMOPEN") == "1"
NR_TESTS = 14
# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
- bail("""No sessiond running. Please make sure you are running this test
+ bail(
+ """No sessiond running. Please make sure you are running this test
with the "run" shell script and verify that the lttng tools are
- properly installed.""")
+ properly installed."""
+ )
session_info = create_session()
enable_ust_tracepoint_event(session_info, "*")
test_env = os.environ.copy()
test_env["LD_PRELOAD"] = test_env.get("LD_PRELOAD", "") + ":liblttng-ust-dl.so"
test_env["LD_LIBRARY_PATH"] = test_env.get("LD_LIBRARY_PATH", "") + ":" + test_path
-test_process = subprocess.Popen(test_path + "prog",
- stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
- env=test_env)
+test_process = subprocess.Popen(
+ test_path + "prog",
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ env=test_env,
+)
test_process.wait()
-print_test_result(test_process.returncode == 0, current_test, "Test application exited normally")
+print_test_result(
+ test_process.returncode == 0, current_test, "Test application exited normally"
+)
current_test += 1
stop_session(session_info)
# Check for dl events in the resulting trace
try:
- babeltrace_process = subprocess.Popen([BABELTRACE_BIN, session_info.trace_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ babeltrace_process = subprocess.Popen(
+ [BABELTRACE_BIN, session_info.trace_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
except FileNotFoundError:
- bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN), session_info)
+ bail(
+ "Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN),
+ session_info,
+ )
dlopen_event_found = 0
dlmopen_event_found = 0
for event_line in babeltrace_process.stdout:
- event_line = event_line.decode('utf-8').replace("\n", "")
+ event_line = event_line.decode("utf-8").replace("\n", "")
if re.search(r".*lttng_ust_dl:dlopen.*", event_line) is not None:
dlopen_event_found += 1
elif re.search(r".*lttng_ust_dl:dlmopen.*", event_line) is not None:
babeltrace_process.wait()
-print_test_result(babeltrace_process.returncode == 0, current_test, "Resulting trace is readable")
+print_test_result(
+ babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
+)
current_test += 1
-print_test_result(dlopen_event_found > 0, current_test, "lttng_ust_dl:dlopen event found in resulting trace")
+print_test_result(
+ dlopen_event_found > 0,
+ current_test,
+ "lttng_ust_dl:dlopen event found in resulting trace",
+)
current_test += 1
if have_dlmopen:
- print_test_result(dlmopen_event_found > 0, current_test, "lttng_ust_dl:dlmopen event found in resulting trace")
+ print_test_result(
+ dlmopen_event_found > 0,
+ current_test,
+ "lttng_ust_dl:dlmopen event found in resulting trace",
+ )
else:
- skip_test(current_test, 'dlmopen() is not available')
+ skip_test(current_test, "dlmopen() is not available")
current_test += 1
-print_test_result(build_id_event_found > 0, current_test, "lttng_ust_dl:build_id event found in resulting trace")
+print_test_result(
+ build_id_event_found > 0,
+ current_test,
+ "lttng_ust_dl:build_id event found in resulting trace",
+)
current_test += 1
-print_test_result(debug_link_event_found > 0, current_test, "lttng_ust_dl:debug_link event found in resulting trace")
+print_test_result(
+ debug_link_event_found > 0,
+ current_test,
+ "lttng_ust_dl:debug_link event found in resulting trace",
+)
current_test += 1
-print_test_result(dlclose_event_found > 0, current_test, "lttng_ust_dl:dlclose event found in resulting trace")
+print_test_result(
+ dlclose_event_found > 0,
+ current_test,
+ "lttng_ust_dl:dlclose event found in resulting trace",
+)
current_test += 1
-print_test_result(load_event_found > 0, current_test, "lttng_ust_lib:load event found in resulting trace")
+print_test_result(
+ load_event_found > 0,
+ current_test,
+ "lttng_ust_lib:load event found in resulting trace",
+)
current_test += 1
-print_test_result(load_build_id_event_found > 0, current_test, "lttng_ust_lib:build_id event found in resulting trace")
+print_test_result(
+ load_build_id_event_found > 0,
+ current_test,
+ "lttng_ust_lib:build_id event found in resulting trace",
+)
current_test += 1
-print_test_result(load_debug_link_event_found > 0, current_test, "lttng_ust_lib:debug_link event found in resulting trace")
+print_test_result(
+ load_debug_link_event_found > 0,
+ current_test,
+ "lttng_ust_lib:debug_link event found in resulting trace",
+)
current_test += 1
-print_test_result(unload_event_found == 3, current_test, "lttng_ust_lib:unload event found 3 times in resulting trace")
+print_test_result(
+ unload_event_found == 3,
+ current_test,
+ "lttng_ust_lib:unload event found 3 times in resulting trace",
+)
current_test += 1
-print_test_result(load_libfoo_found == 1, current_test, "lttng_ust_lib:load libfoo.so event found once in resulting trace")
+print_test_result(
+ load_libfoo_found == 1,
+ current_test,
+ "lttng_ust_lib:load libfoo.so event found once in resulting trace",
+)
current_test += 1
-print_test_result(load_libbar_found == 1, current_test, "lttng_ust_lib:load libbar.so event found once in resulting trace")
+print_test_result(
+ load_libbar_found == 1,
+ current_test,
+ "lttng_ust_lib:load libbar.so event found once in resulting trace",
+)
current_test += 1
-print_test_result(load_libzzz_found == 1, current_test, "lttng_ust_lib:load libzzz.so event found once in resulting trace")
+print_test_result(
+ load_libzzz_found == 1,
+ current_test,
+ "lttng_ust_lib:load libzzz.so event found once in resulting trace",
+)
current_test += 1
shutil.rmtree(session_info.tmp_directory)
session_name=self.name,
domain_name=domain_option_name,
channel_name=channel_name,
- buffer_sharing_policy="--buffers-uid"
- if buffer_sharing_policy == lttngctl.BufferSharingPolicy.PerUID
- else "--buffers-pid",
+ buffer_sharing_policy=(
+ "--buffers-uid"
+ if buffer_sharing_policy == lttngctl.BufferSharingPolicy.PerUID
+ else "--buffers-pid"
+ ),
)
)
return _Channel(self._client, channel_name, domain, self)
import subprocess
import re
+
def addr2line(executable, addr):
"""
- Uses binutils' addr2line to get function containing a given address
+ Uses binutils' addr2line to get function containing a given address
"""
- cmd =['addr2line']
+ cmd = ["addr2line"]
- cmd += ['-e', executable]
+ cmd += ["-e", executable]
# Print function names
- cmd += ['--functions']
+ cmd += ["--functions"]
# Expand inlined functions
- cmd += ['--addresses', addr]
+ cmd += ["--addresses", addr]
status = subprocess.run(cmd, stdout=subprocess.PIPE, check=True)
# - function name
# - source location
if len(addr2line_output) % 3 != 0:
- raise Exception('Unexpected addr2line output:\n\t{}'.format('\n\t'.join(addr2line_output)))
+ raise Exception(
+ "Unexpected addr2line output:\n\t{}".format("\n\t".join(addr2line_output))
+ )
function_names = []
for address_line_number in range(0, len(addr2line_output), 3):
return function_names
+
def extract_user_func_names(executable, raw_callstack):
"""
- Given a callstack from the Babeltrace CLI output, returns a set
- containing the name of the functions. This assumes that the binary have
- not changed since the execution.
+ Given a callstack from the Babeltrace CLI output, returns a set
+ containing the name of the functions. This assumes that the binary have
+ not changed since the execution.
"""
recorded_callstack = set()
# Remove commas and split on spaces
- for index, addr in enumerate(raw_callstack.replace(',', '').split(' ')):
+ for index, addr in enumerate(raw_callstack.replace(",", "").split(" ")):
# Consider only the elements starting with '0x' which are the
# addresses recorded in the callstack
- if '0x' in addr[:2]:
+ if "0x" in addr[:2]:
funcs = addr2line(executable, addr)
recorded_callstack.update(funcs)
return recorded_callstack
+
def extract_kernel_func_names(raw_callstack):
"""
- Given a callstack from the Babeltrace CLI output, returns a set
- containing the name of the functions.
- Uses the /proc/kallsyms procfile to find the symbol associated with an
- address. This function should only be used if the user is root or has
- access to /proc/kallsyms.
+ Given a callstack from the Babeltrace CLI output, returns a set
+ containing the name of the functions.
+ Uses the /proc/kallsyms procfile to find the symbol associated with an
+ address. This function should only be used if the user is root or has
+ access to /proc/kallsyms.
"""
recorded_callstack = set()
- syms=[]
- addresses=[]
+ syms = []
+ addresses = []
# We read kallsyms file and save the output
- with open('/proc/kallsyms') as kallsyms_f:
+ with open("/proc/kallsyms") as kallsyms_f:
for line in kallsyms_f:
line_tokens = line.split()
addr = line_tokens[0]
symbol = line_tokens[2]
addresses.append(int(addr, 16))
- syms.append({'addr':int(addr, 16), 'symbol':symbol})
+ syms.append({"addr": int(addr, 16), "symbol": symbol})
# Save the address and symbol in a sorted list of tupple
- syms = sorted(syms, key=lambda k:k['addr'])
+ syms = sorted(syms, key=lambda k: k["addr"])
# We save the list of addresses in a seperate sorted list to easily bisect
# the closer address of a symbol.
addresses = sorted(addresses)
# Remove commas and split on spaces
- for addr in raw_callstack.replace(',', '').split(' '):
- if '0x' in addr[:2]:
+ for addr in raw_callstack.replace(",", "").split(" "):
+ if "0x" in addr[:2]:
# Search the location of the address in the addresses list and
# deference this location in the syms list to add the associated
# symbol.
loc = bisect.bisect(addresses, int(addr, 16))
- recorded_callstack.add(syms[loc-1]['symbol'])
+ recorded_callstack.add(syms[loc - 1]["symbol"])
return recorded_callstack
+
# Regex capturing the callstack_user and callstack_kernel context
-user_cs_rexp='.*callstack_user\ \=\ \[(.*)\]\ .*\}, \{.*\}'
-kernel_cs_rexp='.*callstack_kernel\ \=\ \[(.*)\]\ .*\}, \{.*\}'
+user_cs_rexp = ".*callstack_user\ \=\ \[(.*)\]\ .*\}, \{.*\}"
+kernel_cs_rexp = ".*callstack_kernel\ \=\ \[(.*)\]\ .*\}, \{.*\}"
+
def main():
"""
- Reads a line from stdin and expect it to be a wellformed Babeltrace CLI
- output containing containing a callstack context of the domain passed
- as argument.
+ Reads a line from stdin and expect it to be a wellformed Babeltrace CLI
+ output containing containing a callstack context of the domain passed
+ as argument.
"""
expected_callstack = set()
recorded_callstack = set()
- cs_type=None
+ cs_type = None
if len(sys.argv) <= 2:
print(sys.argv)
- raise ValueError('USAGE: ./{} (--kernel|--user EXE) FUNC-NAMES'.format(sys.argv[0]))
+ raise ValueError(
+ "USAGE: ./{} (--kernel|--user EXE) FUNC-NAMES".format(sys.argv[0])
+ )
# If the `--user` option is passed, save the next argument as the path
# to the executable
- argc=1
- executable=None
- if sys.argv[argc] in '--kernel':
+ argc = 1
+ executable = None
+ if sys.argv[argc] in "--kernel":
rexp = kernel_cs_rexp
- cs_type='kernel'
- elif sys.argv[argc] in '--user':
+ cs_type = "kernel"
+ elif sys.argv[argc] in "--user":
rexp = user_cs_rexp
- cs_type='user'
- argc+=1
+ cs_type = "user"
+ argc += 1
executable = sys.argv[argc]
else:
- raise Exception('Unknown domain')
+ raise Exception("Unknown domain")
- argc+=1
+ argc += 1
# Extract the function names that are expected to be found call stack of
# the current events
# If there is no match, exit with error
if m is None:
- raise re.error('Callstack not found in event line')
+ raise re.error("Callstack not found in event line")
else:
raw_callstack = str(m.group(1))
- if cs_type in 'user':
- recorded_callstack=extract_user_func_names(executable, raw_callstack)
- elif cs_type in 'kernel':
- recorded_callstack=extract_kernel_func_names(raw_callstack)
+ if cs_type in "user":
+ recorded_callstack = extract_user_func_names(executable, raw_callstack)
+ elif cs_type in "kernel":
+ recorded_callstack = extract_kernel_func_names(raw_callstack)
else:
- raise Exception('Unknown domain')
+ raise Exception("Unknown domain")
# Verify that all expected function are present in the callstack
for e in expected_callstack:
if e not in recorded_callstack:
- raise Exception('Expected function name not found in recorded callstack')
+ raise Exception("Expected function name not found in recorded callstack")
sys.exit(0)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
main()
_last_time = _get_time_ns()
-BABELTRACE_BIN="babeltrace2"
+BABELTRACE_BIN = "babeltrace2"
+
class SessionInfo:
def __init__(self, handle, session_name, tmp_directory, channel_name):
self.trace_path = tmp_directory + "/" + session_name
self.channel_name = channel_name
-def bail(diag, session_info = None):
+
+def bail(diag, session_info=None):
print("Bail out!")
print("#", diag)
shutil.rmtree(session_info.tmp_directory)
exit(-1)
+
def print_automatic_test_timing():
global _time_tests
global _last_time
print(" ---\n duration_ms: {:02f}\n ...".format(duration_ns / 1000000))
_last_time = _get_time_ns()
+
def print_test_result(result, number, description):
result_string = None
if result is True:
print(result_string)
print_automatic_test_timing()
+
def skip_test(number, description):
- print('ok {} # skip {}'.format(number, description))
+ print("ok {} # skip {}".format(number, description))
print_automatic_test_timing()
+
def enable_ust_tracepoint_event(session_info, event_name):
event = Event()
event.name = event_name
if res < 0:
bail("Failed to enable userspace event " + event_name, session_info)
+
def create_session():
dom = Domain()
dom.type = DOMAIN_UST
bail("Failed to enable channel " + channel.name, session_info)
return session_info
+
def start_session(session_info):
start(session_info.name)
-def stop_session(session_info, bailing = False):
+
+def stop_session(session_info, bailing=False):
# Workaround lttng-ctl outputing directly to stdout by spawning a subprocess.
lttng_binary_path = os.path.dirname(os.path.abspath(__file__)) + "/"
for i in range(3):
lttng_binary_path = os.path.dirname(lttng_binary_path)
lttng_binary_path = lttng_binary_path + "/src/bin/lttng/lttng"
- retcode = subprocess.call([lttng_binary_path, "stop", session_info.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ retcode = subprocess.call(
+ [lttng_binary_path, "stop", session_info.name],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
if retcode != 0 and not bailing:
bail("Unable to stop session " + session_info.name, session_info)
destroy(session_info.name)
try:
import lttngust
-except (ImportError) as e:
- _perror('lttngust package not found: {}'.format(e))
+except ImportError as e:
+ _perror("lttngust package not found: {}".format(e))
def _main():
- ev1 = logging.getLogger('python-ev-test1');
- ev2 = logging.getLogger('python-ev-test2');
+ ev1 = logging.getLogger("python-ev-test1")
+ ev2 = logging.getLogger("python-ev-test2")
logging.basicConfig()
parser = argparse.ArgumentParser()
- parser.add_argument('-n', '--nr-iter', required=True)
- parser.add_argument('-s', '--wait', required=True)
- parser.add_argument('-d', '--fire-debug-event', action="store_true")
- parser.add_argument('-e', '--fire-second-event', action="store_true")
- parser.add_argument('-r', '--ready-file')
- parser.add_argument('-g', '--go-file')
+ parser.add_argument("-n", "--nr-iter", required=True)
+ parser.add_argument("-s", "--wait", required=True)
+ parser.add_argument("-d", "--fire-debug-event", action="store_true")
+ parser.add_argument("-e", "--fire-second-event", action="store_true")
+ parser.add_argument("-r", "--ready-file")
+ parser.add_argument("-g", "--go-file")
args = parser.parse_args()
nr_iter = int(args.nr_iter)
go_file = args.go_file
if ready_file is not None and os.path.exists(ready_file):
- raise ValueError('Ready file already exist')
+ raise ValueError("Ready file already exist")
if go_file is not None and os.path.exists(go_file):
- raise ValueError('Go file already exist. Review synchronization')
+ raise ValueError("Go file already exist. Review synchronization")
if (ready_file is None) != (go_file is None):
- raise ValueError('--go-file and --ready-file need each others, review'
- 'synchronization')
-
+ raise ValueError(
+ "--go-file and --ready-file need each others, review" "synchronization"
+ )
# Inform that we are ready, if necessary
if ready_file is not None:
- open(ready_file, 'a').close()
+ open(ready_file, "a").close()
# Wait for go, if necessary
while go_file is not None and not os.path.exists(go_file):
time.sleep(0.5)
for i in range(nr_iter):
- ev1.info('{} fired [INFO]'.format(ev1.name))
+ ev1.info("{} fired [INFO]".format(ev1.name))
if fire_debug_ev:
- ev1.debug('{} fired [DEBUG]'.format(ev1.name))
+ ev1.debug("{} fired [DEBUG]".format(ev1.name))
time.sleep(wait_time)
if fire_second_ev:
- ev2.info('{} fired [INFO]'.format(ev2.name))
+ ev2.info("{} fired [INFO]".format(ev2.name))
if ready_file is not None:
try:
raise
-if __name__ == '__main__':
+if __name__ == "__main__":
_main()