3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 # SPDX-License-Identifier: GPL-2.0-only
12 from typing
import Iterator
, Optional
15 class InvalidTestPlan(RuntimeError):
16 def __init__(self
, msg
):
21 class BailOut(RuntimeError):
22 def __init__(self
, msg
):
30 tap_generator
, # type: "TapGenerator"
31 description
, # type: str
33 self
._tap
_generator
= tap_generator
# type: "TapGenerator"
34 self
._result
= None # type: Optional[bool]
35 self
._description
= description
# type: str
39 # type: () -> Optional[bool]
43 def description(self
):
45 return self
._description
47 def _set_result(self
, result
):
48 # type: (bool) -> None
49 if self
._result
is not None:
50 raise RuntimeError("Can't set test case result twice")
53 self
._tap
_generator
.test(result
, self
._description
)
57 self
._set
_result
(True)
61 self
._set
_result
(False)
64 # Produces a test execution report in the TAP format.
66 def __init__(self
, total_test_count
):
68 if total_test_count
<= 0:
69 raise ValueError("Test count must be greater than zero")
71 self
._total
_test
_count
= total_test_count
# type: int
72 self
._last
_test
_case
_id
= 0 # type: int
73 self
._printed
_plan
= False # type: bool
74 self
._has
_failure
= False # type: bool
75 self
._time
_tests
= True # type: bool
76 if os
.getenv("TAP_AUTOTIME", "1") == "" or os
.getenv("TAP_AUTOTIME", "1") == "0":
77 self
._time
_tests
= False
78 self
._last
_time
= time
.monotonic_ns()
81 if self
.remaining_test_cases
> 0:
83 "Missing {remaining_test_cases} test cases".format(
84 remaining_test_cases
=self
.remaining_test_cases
89 def remaining_test_cases(self
):
91 return self
._total
_test
_count
- self
._last
_test
_case
_id
93 def _print(self
, msg
):
95 if not self
._printed
_plan
:
97 "1..{total_test_count}".format(total_test_count
=self
._total
_test
_count
),
100 self
._printed
_plan
= True
102 print(msg
, flush
=True)
104 def skip_all(self
, reason
):
105 # type: (str) -> None
106 if self
._last
_test
_case
_id
!= 0:
107 raise RuntimeError("Can't skip all tests after running test cases")
110 self
._print
("1..0 # Skip all: {reason}".format(reason
=reason
))
112 self
._last
_test
_case
_id
= self
._total
_test
_count
114 def skip(self
, reason
, skip_count
=1):
115 # type: (str, int) -> None
116 for i
in range(skip_count
):
117 self
._last
_test
_case
_id
= self
._last
_test
_case
_id
+ 1
119 "ok {test_number} # Skip: {reason}".format(
120 reason
=reason
, test_number
=(i
+ self
._last
_test
_case
_id
)
124 def bail_out(self
, reason
):
125 # type: (str) -> None
126 self
._print
("Bail out! {reason}".format(reason
=reason
))
127 self
._last
_test
_case
_id
= self
._total
_test
_count
128 raise BailOut(reason
)
130 def test(self
, result
, description
):
131 # type: (bool, str) -> None
132 duration
= (time
.monotonic_ns() - self
._last
_time
) / 1_000_000
133 if self
._last
_test
_case
_id
== self
._total
_test
_count
:
134 raise InvalidTestPlan("Executing too many tests")
137 self
._has
_failure
= True
139 result_string
= "ok" if result
else "not ok"
140 self
._last
_test
_case
_id
= self
._last
_test
_case
_id
+ 1
142 "{result_string} {case_id} - {description}".format(
143 result_string
=result_string
,
144 case_id
=self
._last
_test
_case
_id
,
145 description
=description
,
149 self
._print
("---\n duration_ms: {}\n...\n".format(duration
))
150 self
._last
_time
= time
.monotonic_ns()
152 def ok(self
, description
):
153 # type: (str) -> None
154 self
.test(True, description
)
156 def fail(self
, description
):
157 # type: (str) -> None
158 self
.test(False, description
)
161 def is_successful(self
):
164 self
._last
_test
_case
_id
== self
._total
_test
_count
and not self
._has
_failure
167 @contextlib.contextmanager
168 def case(self
, description
):
169 # type: (str) -> Iterator[TestCase]
170 test_case
= TestCase(self
, description
)
173 except Exception as e
:
175 "Exception `{exception_type}` thrown during test case `{description}`, marking as failure.".format(
176 description
=test_case
.description
, exception_type
=type(e
).__name
__
181 self
.diagnostic(str(e
))
185 if test_case
.result
is None:
188 def diagnostic(self
, msg
):
189 # type: (str) -> None
190 print("# {msg}".format(msg
=msg
), file=sys
.stderr
, flush
=True)
This page took 0.033583 seconds and 4 git commands to generate.