Commit | Line | Data |
---|---|---|
ef945e4d JG |
1 | #!/usr/bin/env python3 |
2 | # | |
3 | # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
4 | # | |
5 | # SPDX-License-Identifier: GPL-2.0-only | |
6 | # | |
7 | ||
8 | import contextlib | |
2a69bf14 | 9 | import os |
ef945e4d | 10 | import sys |
2a69bf14 | 11 | import time |
ce8470c9 | 12 | from typing import Iterator, Optional |
ef945e4d JG |
13 | |
14 | ||
class InvalidTestPlan(RuntimeError):
    """Error signalling that the executed tests do not match the test plan."""

    def __init__(self, msg):
        # type: (str) -> None
        # Explicit two-argument form; resolves to RuntimeError.__init__.
        super(InvalidTestPlan, self).__init__(msg)
19 | ||
20 | ||
class BailOut(RuntimeError):
    """Error raised once a test run has been aborted with a bail-out."""

    def __init__(self, msg):
        # type: (str) -> None
        # Explicit two-argument form; resolves to RuntimeError.__init__.
        super(BailOut, self).__init__(msg)
25 | ||
26 | ||
class TestCase:
    """A single test case whose outcome is reported through a TAP generator.

    The outcome may be recorded exactly once, through success() or fail();
    recording it forwards the result and the description to the generator's
    test() method.
    """

    def __init__(
        self,
        tap_generator,  # type: "TapGenerator"
        description,  # type: str
    ):
        # The result stays None until success() or fail() is called.
        self._result = None  # type: Optional[bool]
        self._description = description  # type: str
        self._tap_generator = tap_generator  # type: "TapGenerator"

    @property
    def description(self):
        # type: () -> str
        """Description given at construction time."""
        return self._description

    @property
    def result(self):
        # type: () -> Optional[bool]
        """True or False once recorded, None while still undecided."""
        return self._result

    def _set_result(self, result):
        # type: (bool) -> None
        """Record the outcome and report it; raise RuntimeError if already set."""
        if self._result is None:
            self._result = result
            self._tap_generator.test(result, self._description)
            return

        raise RuntimeError("Can't set test case result twice")

    def fail(self):
        # type: () -> None
        """Record the test case as failed."""
        self._set_result(False)

    def success(self):
        # type: () -> None
        """Record the test case as successful."""
        self._set_result(True)
62 | ||
63 | ||
# Produces a test execution report in the TAP format.
class TapGenerator:
    """Write a TAP (Test Anything Protocol) report to standard output.

    The plan line (``1..N``) is emitted lazily, right before the first line
    printed through the generator. Test points are numbered sequentially
    from 1. Unless the ``TAP_AUTOTIME`` environment variable is set to ``0``
    or to an empty string, every test point reported through test() is
    followed by a block giving the time, in milliseconds, elapsed since the
    previous test point (or since the generator was created).
    """

    def __init__(self, total_test_count):
        # type: (int) -> None
        """Create a generator for a plan of `total_test_count` test points.

        Raises ValueError if `total_test_count` is not strictly positive.
        """
        if total_test_count <= 0:
            raise ValueError("Test count must be greater than zero")

        self._total_test_count = total_test_count  # type: int
        self._last_test_case_id = 0  # type: int
        self._printed_plan = False  # type: bool
        self._has_failure = False  # type: bool
        # Timing is enabled by default; TAP_AUTOTIME="" or "0" disables it.
        self._time_tests = os.getenv("TAP_AUTOTIME", "1") not in (
            "",
            "0",
        )  # type: bool
        self._last_time = time.monotonic_ns()  # type: int

    def __del__(self):
        # __init__ may have raised before the counters were assigned, in
        # which case there is nothing to report.
        try:
            remaining = self.remaining_test_cases
        except AttributeError:
            return

        if remaining > 0:
            # Report without raising: an exception can't propagate out of a
            # finalizer and would only produce an "Exception ignored" notice.
            self._report_bail_out(
                "Missing {remaining_test_cases} test cases".format(
                    remaining_test_cases=remaining
                )
            )

    @property
    def remaining_test_cases(self):
        # type: () -> int
        """Number of planned test points that have not been reported yet."""
        return self._total_test_count - self._last_test_case_id

    def _print(self, msg):
        # type: (str) -> None
        """Print `msg` on stdout, preceded by the plan line if not yet printed."""
        if not self._printed_plan:
            print(
                "1..{total_test_count}".format(total_test_count=self._total_test_count),
                flush=True,
            )
            self._printed_plan = True

        print(msg, flush=True)

    def _report_bail_out(self, reason):
        # type: (str) -> None
        """Print the bail-out line and mark every planned test as consumed."""
        self._print("Bail out! {reason}".format(reason=reason))
        self._last_test_case_id = self._total_test_count

    def skip_all(self, reason):
        # type: (str) -> None
        """Skip the whole plan.

        When `reason` is non-empty, a ``1..0 # Skip all: <reason>`` plan is
        printed; with an empty reason nothing is printed. In both cases all
        planned test points are considered consumed.

        Raises RuntimeError if a test point was already reported.
        """
        if self._last_test_case_id != 0:
            raise RuntimeError("Can't skip all tests after running test cases")

        if reason:
            # The skip directive *is* the plan: flag it as printed so that
            # _print() doesn't emit a conflicting "1..N" plan before it.
            self._printed_plan = True
            self._print("1..0 # Skip all: {reason}".format(reason=reason))

        self._last_test_case_id = self._total_test_count

    def skip(self, reason, skip_count=1):
        # type: (str, int) -> None
        """Report the next `skip_count` test points as skipped for `reason`."""
        for _ in range(skip_count):
            self._last_test_case_id = self._last_test_case_id + 1
            # The counter was just advanced, so it already is the number of
            # the test point being skipped.
            self._print(
                "ok {test_number} # Skip: {reason}".format(
                    reason=reason, test_number=self._last_test_case_id
                )
            )

    def bail_out(self, reason):
        # type: (str) -> None
        """Print a bail-out line for `reason`, then raise BailOut(reason)."""
        self._report_bail_out(reason)
        raise BailOut(reason)

    def test(self, result, description):
        # type: (bool, str) -> None
        """Report one test point as passed (`result` true) or failed.

        Raises InvalidTestPlan if every planned test point was already
        reported.
        """
        # Measured first so that the reporting below isn't part of the
        # duration attributed to this test point.
        duration = (time.monotonic_ns() - self._last_time) / 1_000_000
        if self._last_test_case_id == self._total_test_count:
            raise InvalidTestPlan("Executing too many tests")

        # Any result printed as "not ok" must also count as a failure.
        if not result:
            self._has_failure = True

        result_string = "ok" if result else "not ok"
        self._last_test_case_id = self._last_test_case_id + 1
        self._print(
            "{result_string} {case_id} - {description}".format(
                result_string=result_string,
                case_id=self._last_test_case_id,
                description=description,
            )
        )
        if self._time_tests:
            # NOTE(review): TAP YAML blocks are normally indented relative to
            # their test point; confirm the exact whitespace of this literal.
            self._print("---\n duration_ms: {}\n...\n".format(duration))
        self._last_time = time.monotonic_ns()

    def ok(self, description):
        # type: (str) -> None
        """Report a passing test point."""
        self.test(True, description)

    def fail(self, description):
        # type: (str) -> None
        """Report a failing test point."""
        self.test(False, description)

    @property
    def is_successful(self):
        # type: () -> bool
        """True when the whole plan was consumed and no test point failed."""
        return (
            self._last_test_case_id == self._total_test_count and not self._has_failure
        )

    @contextlib.contextmanager
    def case(self, description):
        # type: (str) -> Iterator[TestCase]
        """Context manager yielding a TestCase for `description`.

        An Exception escaping the managed body is described on stderr and
        the test case is marked as failed. If the body finishes without
        recording a result, the test case is marked as successful.
        """
        test_case = TestCase(self, description)
        try:
            yield test_case
        except Exception as e:
            self.diagnostic(
                "Exception `{exception_type}` thrown during test case `{description}`, marking as failure.".format(
                    description=test_case.description, exception_type=type(e).__name__
                )
            )

            if str(e) != "":
                self.diagnostic(str(e))

            test_case.fail()
        finally:
            if test_case.result is None:
                test_case.success()

    def diagnostic(self, msg):
        # type: (str) -> None
        """Print `msg` as a ``#``-prefixed diagnostic line on stderr."""
        print("# {msg}".format(msg=msg), file=sys.stderr, flush=True)