3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 # SPDX-License-Identifier: GPL-2.0-only
12 from typing
import Iterator
, Optional
18 # time.monotonic is only available since Python 3.3. We don't support
19 # those older versions so we can simply assert here.
20 assert sys
.version_info
>= (3, 3, 0)
22 # time.monotonic_ns is only available for python >= 3.8,
23 # so the value is multiplied by 10^9 to maintain compatibility with
24 # older versions of the interpreter.
25 return int(time
.monotonic() * 1000000000)
28 class InvalidTestPlan(RuntimeError):
29 def __init__(self
, msg
):
34 class BailOut(RuntimeError):
35 def __init__(self
, msg
):
43 tap_generator
, # type: "TapGenerator"
44 description
, # type: str
46 self
._tap
_generator
= tap_generator
# type: "TapGenerator"
47 self
._result
= None # type: Optional[bool]
48 self
._description
= description
# type: str
52 # type: () -> Optional[bool]
56 def description(self
):
58 return self
._description
60 def _set_result(self
, result
):
61 # type: (bool) -> None
62 if self
._result
is not None:
63 raise RuntimeError("Can't set test case result twice")
66 self
._tap
_generator
.test(result
, self
._description
)
70 self
._set
_result
(True)
74 self
._set
_result
(False)
77 # Produces a test execution report in the TAP format.
79 def __init__(self
, total_test_count
):
81 if total_test_count
<= 0:
82 raise ValueError("Test count must be greater than zero")
84 self
._total
_test
_count
= total_test_count
# type: int
85 self
._last
_test
_case
_id
= 0 # type: int
86 self
._printed
_plan
= False # type: bool
87 self
._has
_failure
= False # type: bool
88 self
._time
_tests
= True # type: bool
89 if os
.getenv("LTTNG_TESTS_TAP_AUTOTIME", "1") == "0":
90 self
._time
_tests
= False
91 self
._last
_time
= _get_time_ns()
94 if self
.remaining_test_cases
> 0:
96 "Missing {remaining_test_cases} test cases".format(
97 remaining_test_cases
=self
.remaining_test_cases
102 def remaining_test_cases(self
):
104 return self
._total
_test
_count
- self
._last
_test
_case
_id
106 def _print(self
, msg
):
107 # type: (str) -> None
108 if not self
._printed
_plan
:
110 "1..{total_test_count}".format(total_test_count
=self
._total
_test
_count
),
113 self
._printed
_plan
= True
115 print(msg
, flush
=True)
117 def skip_all(self
, reason
):
118 # type: (str) -> None
119 if self
._last
_test
_case
_id
!= 0:
120 raise RuntimeError("Can't skip all tests after running test cases")
123 self
._print
("1..0 # Skip all: {reason}".format(reason
=reason
))
125 self
._last
_test
_case
_id
= self
._total
_test
_count
127 def skip(self
, reason
, skip_count
=1):
128 # type: (str, int) -> None
129 for i
in range(skip_count
):
130 self
._last
_test
_case
_id
= self
._last
_test
_case
_id
+ 1
132 "ok {test_number} # Skip: {reason}".format(
133 reason
=reason
, test_number
=(self
._last
_test
_case
_id
)
137 def bail_out(self
, reason
):
138 # type: (str) -> None
139 self
._print
("Bail out! {reason}".format(reason
=reason
))
140 self
._last
_test
_case
_id
= self
._total
_test
_count
141 raise BailOut(reason
)
143 def test(self
, result
, description
):
144 # type: (bool, str) -> None
145 duration
= (_get_time_ns() - self
._last
_time
) / 1000000
146 if self
._last
_test
_case
_id
== self
._total
_test
_count
:
147 raise InvalidTestPlan("Executing too many tests")
150 self
._has
_failure
= True
152 result_string
= "ok" if result
else "not ok"
153 self
._last
_test
_case
_id
= self
._last
_test
_case
_id
+ 1
155 "{result_string} {case_id} - {description}".format(
156 result_string
=result_string
,
157 case_id
=self
._last
_test
_case
_id
,
158 description
=description
,
162 self
._print
("---\n duration_ms: {}\n...\n".format(duration
))
163 self
._last
_time
= _get_time_ns()
165 def ok(self
, description
):
166 # type: (str) -> None
167 self
.test(True, description
)
169 def fail(self
, description
):
170 # type: (str) -> None
171 self
.test(False, description
)
174 def is_successful(self
):
177 self
._last
_test
_case
_id
== self
._total
_test
_count
and not self
._has
_failure
180 @contextlib.contextmanager
181 def case(self
, description
):
182 # type: (str) -> Iterator[TestCase]
183 test_case
= TestCase(self
, description
)
186 except Exception as e
:
188 "Exception `{exception_type}` thrown during test case `{description}`, marking as failure.".format(
189 description
=test_case
.description
, exception_type
=type(e
).__name
__
194 self
.diagnostic(str(e
))
198 if test_case
.result
is None:
201 def diagnostic(self
, msg
):
202 # type: (str) -> None
203 print("# {msg}".format(msg
=msg
), file=sys
.stderr
, flush
=True)
This page took 0.038662 seconds and 4 git commands to generate.