import pprint
import sys
import time
+import json
from collections import defaultdict
-NSEC_PER_SEC = 1000000000
-
try:
- from babeltrace import TraceCollection
+ import bt2
except ImportError:
# quick fix for debian-based distros
sys.path.append("/usr/local/lib/python%d.%d/site-packages" %
(sys.version_info.major, sys.version_info.minor))
- from babeltrace import TraceCollection
+ import bt2
+NSEC_PER_SEC = 1000000000
class TraceParser:
- def __init__(self, trace, pid):
- self.trace = trace
+ def __init__(self, trace_msg_iter, pid):
+ self.trace = trace_msg_iter
self.pid = pid
# This dictionnary holds the results of each testcases of a test.
def parse(self):
# iterate over all the events
- for event in self.trace.events:
- if self.pid is not None and event["pid"] != self.pid:
+ for msg in self.trace:
+ if type(msg) is not bt2._EventMessageConst:
+ continue
+
+ if self.pid is not None and msg.event["pid"] != self.pid:
continue
- method_name = "handle_%s" % event.name.replace(":", "_").replace(
+ method_name = "handle_%s" % msg.event.name.replace(":", "_").replace(
"+", "_")
# call the function to handle each event individually
if hasattr(TraceParser, method_name):
func = getattr(TraceParser, method_name)
- func(self, event)
+ func(self, msg.event)
ret = 0
# For each event of the test case, check all entries for failed
self.epoll_wait_exit(event)
def handle_compat_syscall_entry_epoll_pwait(self, event):
- self.epoll_wait_entry(event)
+ self.epoll_pwait_entry(event)
def handle_compat_syscall_exit_epoll_pwait(self, event):
- self.epoll_wait_exit(event)
+ self.epoll_pwait_exit(event)
def handle_syscall_entry_epoll_pwait(self, event):
- self.epoll_wait_entry(event)
+ self.epoll_pwait_entry(event)
def handle_syscall_exit_epoll_pwait(self, event):
- self.epoll_wait_exit(event)
+ self.epoll_pwait_exit(event)
def epoll_wait_entry(self, event):
pass
def epoll_wait_exit(self, event):
pass
+ def epoll_pwait_entry(self, event):
+ self.epoll_wait_entry(event)
+
+ def epoll_pwait_exit(self, event):
+ self.epoll_wait_exit(event)
+
## poll + ppoll
def handle_compat_syscall_entry_poll(self, event):
self.poll_entry(event)
pass
-class Test1(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class WorkingCases(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args['pid'])
+
+ # Values expected in the trace
+ self.epoll_wait_fd = validation_args['epoll_wait_fd']
+ self.epoll_pwait_fd = validation_args['epoll_pwait_fd']
+
self.expect["select_entry"]["select_in_fd0"] = 0
self.expect["select_entry"]["select_in_fd1023"] = 0
self.expect["select_exit"]["select_out_fd0"] = 0
self.expect["epoll_ctl_exit"]["epoll_ctl_out_ok"] = 0
self.expect["epoll_wait_entry"]["epoll_wait_in_ok"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_out_fd0"] = 0
+ self.expect["epoll_pwait_entry"]["epoll_pwait_in_ok"] = 0
+ self.expect["epoll_pwait_exit"]["epoll_pwait_out_fd0"] = 0
def select_entry(self, event):
n = event["n"]
# check that we have FD 0 waiting for EPOLLIN|EPOLLPRI and that
# data.fd = 0
- if epfd == 3 and op_enum == "EPOLL_CTL_ADD" and fd == 0 and \
+ if (epfd == self.epoll_wait_fd or epfd == self.epoll_pwait_fd) and 'EPOLL_CTL_ADD' in op_enum.labels and fd == 0 and \
_event["data_union"]["fd"] == 0 and \
_event["events"]["EPOLLIN"] == 1 and \
_event["events"]["EPOLLPRI"] == 1:
maxevents = event["maxevents"]
timeout = event["timeout"]
- if epfd == 3 and maxevents == 1 and timeout == -1:
+ if epfd == self.epoll_wait_fd and maxevents == 1 and timeout == -1:
self.expect["epoll_wait_entry"]["epoll_wait_in_ok"] = 1
# Save values of local variables to print in case of test failure
# Save values of local variables to print in case of test failure
self.recorded_values["epoll_wait_exit"] = locals()
+ def epoll_pwait_entry(self, event):
+ epfd = event["epfd"]
+ maxevents = event["maxevents"]
+ timeout = event["timeout"]
+
+ if epfd == self.epoll_pwait_fd and maxevents == 1 and timeout == -1:
+ self.expect["epoll_pwait_entry"]["epoll_pwait_in_ok"] = 1
+
+ # Save values of local variables to print in case of test failure
+ self.recorded_values["epoll_pwait_entry"] = locals()
+
+ def epoll_pwait_exit(self, event):
+ ret = event["ret"]
+ fds_length = event["fds_length"]
+ overflow = event["overflow"]
+
+ # check that FD 0 returned with EPOLLIN and the right data.fd
+ if ret == 1 and fds_length == 1:
+ fd_0 = event["fds"][0]
+ if overflow == 0 and fd_0["data_union"]["fd"] == 0 and \
+ fd_0["events"]["EPOLLIN"] == 1:
+ self.expect["epoll_pwait_exit"]["epoll_pwait_out_fd0"] = 1
+
+ # Save values of local variables to print in case of test failure
+ self.recorded_values["epoll_pwait_exit"] = locals()
-class Test2(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class WorkingCasesTimeout(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args['pid'])
self.expect["select_entry"]["select_timeout_in_fd0"] = 0
self.expect["select_entry"]["select_timeout_in_fd1023"] = 0
self.expect["select_exit"]["select_timeout_out"] = 0
_event = event["event"]
# make sure we see a EPOLLIN|EPOLLPRI
- if op_enum == "EPOLL_CTL_ADD" and \
+ if 'EPOLL_CTL_ADD' in op_enum.labels and \
_event["events"]["EPOLLIN"] == 1 and \
_event["events"]["EPOLLPRI"] == 1:
self.expect["epoll_ctl_entry"]["epoll_ctl_timeout_in_add"] = 1
self.recorded_values["epoll_wait_exit"] = locals()
-class Test3(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class PselectInvalidFd(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args['pid'])
self.expect["select_entry"]["select_invalid_fd_in"] = 0
self.expect["select_exit"]["select_invalid_fd_out"] = 0
self.recorded_values["select_exit"] = locals()
-class Test4(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class PpollBig(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args['pid'])
self.expect["poll_entry"]["big_poll_in"] = 0
self.expect["poll_exit"]["big_poll_out"] = 0
# Save values of local variables to print in case of test failure
self.recorded_values["poll_exit"] = locals()
-class Test5(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class PpollFdsBufferOverflow(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args['pid'])
self.expect["poll_entry"]["poll_overflow_in"] = 0
self.expect["poll_exit"]["poll_overflow_out"] = 0
self.recorded_values["poll_exit"] = locals()
-class Test6(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class PselectInvalidPointer(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args['pid'])
self.expect["select_entry"]["pselect_invalid_in"] = 0
self.expect["select_exit"]["pselect_invalid_out"] = 0
self.recorded_values["select_exit"] = locals()
-class Test7(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class PpollFdsULongMax(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args['pid'])
self.expect["poll_entry"]["poll_max_in"] = 0
self.expect["poll_exit"]["poll_max_out"] = 0
self.recorded_values["poll_exit"] = locals()
-class Test8(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class EpollPwaitInvalidPointer(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args['pid'])
+
+ # Values expected in the trace
+ self.epoll_fd = validation_args['epollfd']
+
self.expect["epoll_wait_entry"]["epoll_wait_invalid_in"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_invalid_out"] = 0
# test that event in valid even though the target buffer pointer is
# invalid and the program segfaults
- if epfd == 3 and maxevents == 1 and timeout == -1:
+ if epfd == self.epoll_fd and maxevents == 1 and timeout == -1:
self.expect["epoll_wait_entry"]["epoll_wait_invalid_in"] = 1
# Save values of local variables to print in case of test failure
self.recorded_values["epoll_wait_exit"] = locals()
-class Test9(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class EpollPwaitIntMax(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args['pid'])
+
+ # Values expected in the trace
+ self.epoll_fd = validation_args['epollfd']
+
self.expect["epoll_wait_entry"]["epoll_wait_max_in"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_max_out"] = 0
timeout = event["timeout"]
# check the proper working of INT_MAX maxevent value
- if epfd == 3 and maxevents == 2147483647 and timeout == -1:
+ if epfd == self.epoll_fd and maxevents == 2147483647 and timeout == -1:
self.expect["epoll_wait_entry"]["epoll_wait_max_in"] = 1
# Save values of local variables to print in case of test failure
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Trace parser')
parser.add_argument('path', metavar="<path/to/trace>", help='Trace path')
- parser.add_argument('-t', '--test', type=int, help='Test to validate')
- parser.add_argument('-p', '--pid', type=int, help='PID of the app')
+ parser.add_argument('-t', '--test', type=str, help='Test to validate')
+ parser.add_argument('-o', '--validation-file', type=str, help='Validation file path')
args = parser.parse_args()
if not args.test:
- print("Need to pass a test to validate (-t)")
+ print("Need to pass a test to validate (--test/-t)")
sys.exit(1)
- if not args.pid:
- print("Need to pass the PID to check (-p)")
+ if not args.validation_file:
+ print("Need to pass the test validation file (--validation-file/-o)")
sys.exit(1)
- traces = TraceCollection()
- handle = traces.add_traces_recursive(args.path, "ctf")
- if handle is None:
- sys.exit(1)
+ traces = bt2.TraceCollectionMessageIterator(args.path)
+
+ with open(args.validation_file) as f:
+ try:
+ test_validation_args = json.load(f)
+ except Exception as e:
+ print('Failed to parse validation file: ' + str(e))
+ sys.exit(1)
t = None
- if args.test == 1:
- t = Test1(traces, args.pid)
- elif args.test == 2:
- t = Test2(traces, args.pid)
- elif args.test == 3:
- t = Test3(traces, args.pid)
- elif args.test == 4:
- t = Test4(traces, args.pid)
- elif args.test == 5:
- t = Test5(traces, args.pid)
- elif args.test == 6:
- t = Test6(traces, args.pid)
- elif args.test == 7:
- t = Test7(traces, args.pid)
- elif args.test == 8:
- t = Test8(traces, args.pid)
- elif args.test == 9:
- t = Test9(traces, args.pid)
- elif args.test == 10:
+ if args.test == "working_cases":
+ t = WorkingCases(traces, test_validation_args)
+ elif args.test == "working_cases_timeout":
+ t = WorkingCasesTimeout(traces, test_validation_args)
+ elif args.test == "pselect_invalid_fd":
+ t = PselectInvalidFd(traces, test_validation_args)
+ elif args.test == "ppoll_big":
+ t = PpollBig(traces, test_validation_args)
+ elif args.test == "ppoll_fds_buffer_overflow":
+ t = PpollFdsBufferOverflow(traces, test_validation_args)
+ elif args.test == "pselect_invalid_pointer":
+ t = PselectInvalidPointer(traces, test_validation_args)
+ elif args.test == "ppoll_fds_ulong_max":
+ t = PpollFdsULongMax(traces, test_validation_args)
+ elif args.test == "epoll_pwait_invalid_pointer":
+ t = EpollPwaitInvalidPointer(traces, test_validation_args)
+ elif args.test == "epoll_pwait_int_max":
+ t = EpollPwaitIntMax(traces, test_validation_args)
+ elif args.test == "ppoll_concurrent_write":
# stress test, nothing reliable to check
ret = 0
- elif args.test == 11:
+ elif args.test == "epoll_pwait_concurrent_munmap":
# stress test, nothing reliable to check
ret = 0
else:
if t is not None:
ret = t.parse()
- for h in handle.values():
- traces.remove_trace(h)
-
sys.exit(ret)