1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
|
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.
import sys
import time
from threading import Barrier, Thread
import pytest
from hypothesis import given, settings, strategies as st
from hypothesis.errors import DeadlineExceeded, InvalidArgument
from hypothesis.internal.conjecture.junkdrawer import ensure_free_stackframes
from hypothesis.stateful import RuleBasedStateMachine, invariant, rule
from hypothesis.strategies import SearchStrategy
from tests.common.debug import check_can_generate_examples
from tests.common.utils import run_concurrently
pytestmark = pytest.mark.skipif(
settings._current_profile == "crosshair", reason="crosshair is not thread safe"
)
def test_can_run_given_in_thread():
has_run_successfully = False
@given(st.integers())
def test(n):
nonlocal has_run_successfully
has_run_successfully = True
t = Thread(target=test)
t.start()
t.join()
assert has_run_successfully
def test_run_stateful_test_concurrently():
class MyStateMachine(RuleBasedStateMachine):
def __init__(self):
super().__init__()
@rule(n=st.integers())
def my_rule(self, n):
pass
@invariant()
def my_invariant(self):
pass
TestMyStateful = MyStateMachine.TestCase().runTest
run_concurrently(TestMyStateful, n=2)
def do_work(*, multiplier=1):
# arbitrary moderately-expensive work
for x in range(500 * multiplier):
_y = x**x
def test_run_different_tests_in_threads():
@given(st.integers())
def test1(n):
do_work()
@given(st.integers())
def test2(n):
do_work()
thread1 = Thread(target=test1)
thread2 = Thread(target=test2)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
def test_run_given_concurrently():
@given(st.data(), st.integers(-5, 5).map(lambda x: 10**x))
def test(data, magnitude):
assert magnitude != 0
data.draw(st.complex_numbers(max_magnitude=magnitude))
run_concurrently(test, n=2)
def test_stackframes_restores_original_recursion_limit():
original_recursionlimit = sys.getrecursionlimit()
def test():
with ensure_free_stackframes():
do_work()
# also mix in a hypothesis test; why not.
@given(st.integers())
@settings(max_examples=10)
def test(n):
pass
test()
threads = []
for _ in range(4):
threads.append(Thread(target=test))
for thread in threads:
thread.start()
for thread in threads:
thread.join(timeout=10)
assert sys.getrecursionlimit() == original_recursionlimit
@pytest.mark.parametrize(
"strategy",
[
st.recursive(st.none(), st.lists, max_leaves=-1),
st.recursive(st.none(), st.lists, max_leaves=0),
st.recursive(st.none(), st.lists, max_leaves=1.0),
],
)
def test_handles_invalid_args_cleanly(strategy):
# we previously had a race in SearchStrategy.validate, where one thread would
# set `validate_called = True` (which it has to do first for recursive
# strategies), then another thread would try to generate before the validation
# finished and errored, and would get into weird technically-valid states
# like interpreting 1.0 as 1. I saw FlakyStrategyDefinition here because the
# validating + errored thread drew zero choices, but the other thread drew
# 1 choice, for the same shared strategy.
def check():
with pytest.raises(InvalidArgument):
check_can_generate_examples(strategy)
run_concurrently(check, n=4)
def test_single_thread_can_raise_deadline_exceeded():
# a slow test running inside a thread, but not concurrently, should still
# be able to raise DeadlineExceeded.
@given(st.integers())
@settings(max_examples=5)
def slow_test(n):
do_work()
time.sleep(0.4)
def target():
with pytest.raises(DeadlineExceeded):
slow_test()
thread = Thread(target=target)
thread.start()
thread.join(timeout=10)
def test_deadline_exceeded_not_raised_under_concurrent_threads():
# it's still possible for multithreaded calls to a slow function to raise
# DeadlineExceeded, if the first thread completes its entire test before
# any other thread starts. For this test, prevent this scenario with a barrier,
# forcing the threads to run in parallel.
n_threads = 8
barrier = Barrier(n_threads)
@given(st.integers())
@settings(max_examples=5)
def slow_test(n):
do_work()
time.sleep(0.4)
barrier.wait()
run_concurrently(slow_test, n=n_threads)
def test_deadline_exceeded_can_be_raised_after_threads():
# if we had concurrent threads before, but they've finished now, we should
# still be able to raise DeadlineExceeded normally. Importantly, we test this
# for the same test as was running before, since concurrent thread use is
# tracked per-@given.
@given(st.integers())
@settings(max_examples=5)
def slow_test(n):
do_work()
if should_sleep:
time.sleep(0.4)
should_sleep = False
run_concurrently(slow_test, n=8)
should_sleep = True
with pytest.raises(DeadlineExceeded):
slow_test()
def test_one_of_branches_lock():
class SlowBranchesStrategy(SearchStrategy):
@property
def branches(self):
# multiplier=2 reproduces more consistently than multiplier=1 for me
do_work(multiplier=2)
return [st.integers(), st.text()]
branch_counts = set()
s = st.one_of(SlowBranchesStrategy(), SlowBranchesStrategy())
def test():
branches = len(s.branches)
branch_counts.add(branches)
run_concurrently(test, n=10)
assert len(branch_counts) == 1
# there are 4 independent strategies, but only 2 distinct ones -
# st.integers(), and st.text().
assert branch_counts == {2}
|