1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
|
from __future__ import annotations
import copy
import functools
import itertools
import os
import platform
import random
import time
from typing import Literal, TypeVar
from unittest import mock
import pytest
from jaraco.classes import properties
from jaraco.functools import Throttler, method_cache, retry, retry_call
_T = TypeVar("_T")
class TestThrottler:
    """Check ``Throttler`` as a rate limiter, a re-wrapper and a method decorator."""

    @pytest.mark.xfail(
        'GITHUB_ACTIONS' in os.environ and platform.system() in ('Darwin', 'Windows'),
        reason="Performance is heavily throttled on Github Actions Mac/Windows runs",
    )
    def test_function_throttled(self) -> None:
        """
        Hammering a throttled callable for one second should let roughly
        ``max_rate`` calls through, both at first and after an idle gap.
        """
        # `next`, limited to 30 invocations per second
        throttled_next = Throttler(next, 30)

        def hammer_for_one_second() -> int:
            # a fresh counter records how many calls actually got through
            ticks = itertools.count()
            stop_at = time.time() + 1
            # call as fast as possible until the deadline passes
            while time.time() < stop_at:
                throttled_next(ticks)
            # the counter's next value equals the number of prior advances
            return next(ticks)

        # the counter should have advanced about 30 times
        assert 28 <= hammer_for_one_second() <= 32

        # a second burst following an idle period must be throttled as well
        time.sleep(1)
        assert 28 <= hammer_for_one_second() <= 32

    def test_reconstruct_unwraps(self) -> None:
        """
        Throttling a callable that is already throttled should wrap the
        original function again instead of stacking throttlers.
        """
        inner = Throttler(next, 30)
        outer = Throttler(inner, 60)
        assert outer.func is next
        assert outer.max_rate == 60

    def test_throttled_method(self) -> None:
        """
        A method decorated with a bare ``Throttler`` should still be
        callable on an instance and return its result.
        """

        class Echoer:
            @Throttler
            def echo(self, arg: _T) -> _T:
                return arg

        echoer = Echoer()
        assert echoer.echo('foo') == 'foo'
class TestMethodCache:
    """Check ``method_cache`` with copying, special methods and properties."""

    # condition string handed to ``pytest.mark.skipif`` below
    bad_vers = '(3, 5, 0) <= sys.version_info < (3, 5, 2)'

    @pytest.mark.skipif(bad_vers, reason="https://bugs.python.org/issue25447")
    def test_deepcopy(self) -> None:
        """
        Deep-copying an instance should work both before and after its
        cached method has been invoked.
        """

        class Cached:
            calls = 0

            @method_cache
            def method(self, value: _T) -> _T:
                self.calls += 1
                return value

        subject = Cached()
        # copy once before the cached method has been called...
        copy.deepcopy(subject)
        subject.method(1)
        # ...and once more after it has been called
        copy.deepcopy(subject)

    def test_special_methods(self) -> None:
        """
        ``method_cache`` applied to ``__getitem__`` and ``__getattr__``
        should run the underlying method only once per distinct argument.
        """

        class Cached:
            getitem_calls = 0
            getattr_calls = 0

            @method_cache
            def __getitem__(self, item: _T) -> _T:
                self.getitem_calls += 1
                return item

            @method_cache
            def __getattr__(self, name: _T) -> _T:
                self.getattr_calls += 1
                return name

        subject = Cached()

        # subscript twice with the same key; expect a single real call
        subject[1] + subject[1]
        assert subject.getitem_calls == 1

        # look up the same missing attribute twice; expect a single real call
        subject.one + subject.one  # type: ignore[operator] # Using ParamSpec on methods is still limited
        assert subject.getattr_calls == 1

    @pytest.mark.xfail(reason="can't replace property with cache; #6")
    def test_property(self) -> None:
        """
        Stack ``property`` on top of ``method_cache`` and expect repeated
        reads to agree. Marked as an expected failure.
        """

        class Cached:
            @property
            @method_cache
            def mything(self) -> float:  # pragma: nocover
                return random.random()

        subject = Cached()
        first, second = subject.mything, subject.mything
        assert first == second

    @pytest.mark.xfail(reason="can't replace property with cache; #6")
    def test_non_data_property(self) -> None:
        """
        Same check with a ``NonDataProperty``; also an expected failure
        because the property gets replaced with a method.
        """

        class Cached:
            @properties.NonDataProperty
            @method_cache
            def mything(self) -> float:
                return random.random()

        subject = Cached()
        first, second = subject.mything, subject.mything
        assert first == second
class TestRetry:
    """
    Check ``retry_call`` and the ``retry`` decorator against a callable
    that fails a configurable number of times before succeeding.
    """

    def attempt(self, arg: mock.Mock | None = None) -> Literal['Success']:
        """
        Fail while failures remain, then succeed.

        Consumes one value from ``self.fails_left`` per call and raises
        ``ValueError("Failed!")`` while that value is positive. Otherwise
        calls ``arg.touch()`` when a truthy ``arg`` was given and returns
        ``"Success"``. ``set_to_fail`` must have been called first.
        """
        # Compare against zero instead of relying on truthiness: the
        # countdown continues into negative numbers, which are truthy and
        # would make every call after the first success fail again.
        if next(self.fails_left) > 0:
            raise ValueError("Failed!")
        if arg:
            arg.touch()
        return "Success"

    def set_to_fail(self, times: int) -> None:
        """Arm ``attempt`` to raise on its next ``times`` calls."""
        # counts times, times - 1, ..., 1, 0, -1, ...
        self.fails_left = itertools.count(times, -1)

    def test_set_to_fail(self) -> None:
        """
        Test this test's internal failure mechanism.
        """
        self.set_to_fail(times=2)
        with pytest.raises(ValueError):
            self.attempt()
        with pytest.raises(ValueError):
            self.attempt()
        assert self.attempt() == 'Success'
        # once the failures are used up, success must persist
        assert self.attempt() == 'Success'

    def test_retry_call_succeeds(self) -> None:
        """Two failures followed by success fit within two retries."""
        self.set_to_fail(times=2)
        res = retry_call(self.attempt, retries=2, trap=ValueError)
        assert res == "Success"

    def test_retry_call_fails(self) -> None:
        """
        Failing more than the number of retries should
        raise the underlying error.
        """
        self.set_to_fail(times=3)
        with pytest.raises(ValueError) as res:
            retry_call(self.attempt, retries=2, trap=ValueError)
        assert str(res.value) == 'Failed!'

    def test_retry_multiple_exceptions(self) -> None:
        """``trap`` may be a tuple of exception types."""
        self.set_to_fail(times=2)
        errors = ValueError, NameError
        res = retry_call(self.attempt, retries=2, trap=errors)
        assert res == "Success"

    def test_retry_exception_superclass(self) -> None:
        """Trapping a superclass also traps the raised subclass."""
        self.set_to_fail(times=2)
        res = retry_call(self.attempt, retries=2, trap=Exception)
        assert res == "Success"

    def test_default_traps_nothing(self) -> None:
        """Without ``trap``, the error propagates even with retries allowed."""
        self.set_to_fail(times=1)
        with pytest.raises(ValueError):
            retry_call(self.attempt, retries=1)

    def test_default_does_not_retry(self) -> None:
        """Without ``retries``, a trapped error still propagates."""
        self.set_to_fail(times=1)
        with pytest.raises(ValueError):
            retry_call(self.attempt, trap=Exception)

    def test_cleanup_called_on_exception(self) -> None:
        """``cleanup`` should run once per failed attempt, without arguments."""
        # a random failure count, matched exactly by the retry budget
        calls = random.randint(1, 10)
        cleanup = mock.Mock()
        self.set_to_fail(times=calls)
        retry_call(self.attempt, retries=calls, cleanup=cleanup, trap=Exception)
        assert cleanup.call_count == calls
        cleanup.assert_called_with()

    def test_infinite_retries(self) -> None:
        """``retries=float('inf')`` should keep retrying until success."""
        self.set_to_fail(times=999)
        cleanup = mock.Mock()
        retry_call(self.attempt, retries=float('inf'), cleanup=cleanup, trap=Exception)
        assert cleanup.call_count == 999

    def test_with_arg(self) -> None:
        """Arguments bound with ``functools.partial`` reach the callable."""
        self.set_to_fail(times=0)
        arg = mock.Mock()
        bound = functools.partial(self.attempt, arg)
        res = retry_call(bound)
        assert res == 'Success'
        assert arg.touch.called

    def test_decorator(self) -> None:
        """``retry`` used as a decorator factory retries the wrapped call."""
        self.set_to_fail(times=1)
        attempt = retry(retries=1, trap=Exception)(self.attempt)
        res = attempt()
        assert res == "Success"

    def test_decorator_with_arg(self) -> None:
        """The decorated callable passes positional arguments through."""
        self.set_to_fail(times=0)
        attempt = retry()(self.attempt)
        arg = mock.Mock()
        res = attempt(arg)
        assert res == 'Success'
        assert arg.touch.called
|