# tests/test_limiter_blocking_behavior.py
import asyncio
import time
import pytest
from pyrate_limiter import Rate
from pyrate_limiter.buckets import InMemoryBucket
from pyrate_limiter.limiter import Limiter
# Rate handed to the in-memory bucket built by make_limiter():
# 1 token per 200ms.
RATE = Rate(1, 200)
def make_limiter():
    """Return a new Limiter over a single InMemoryBucket configured with RATE.

    The limiter is created with ``buffer_ms=0``; each call builds a new
    bucket and a new limiter.
    """
    bucket = InMemoryBucket([RATE])
    return Limiter(bucket, buffer_ms=0)
# --- sync decorator blocks ---
def test_sync_decorator_blocks():
    """A decorated sync function is expected to wait when the only slot is used.

    The wrapped function returns the time at which it actually ran, so the
    gap between the start of the test and the second call's timestamp shows
    how long the second call was held back.
    """
    limiter = make_limiter()

    def stamp():
        return time.perf_counter()

    # Same effect as decorating ``stamp`` with ``@limiter.as_decorator()``.
    limited_stamp = limiter.as_decorator()(stamp)

    started = time.perf_counter()
    limited_stamp()  # first call takes the only slot
    second_ran_at = limited_stamp()  # expected to block ~200ms before running

    # Slightly below 200ms to leave room for timer jitter.
    assert second_ran_at - started >= 0.18
# --- async decorator blocks ---
@pytest.mark.asyncio
async def test_async_decorator_blocks():
    """A decorated coroutine function is expected to wait when the slot is used.

    The wrapped coroutine returns the time at which it actually ran, so the
    gap between the start of the test and the second call's timestamp shows
    how long the second call was held back.
    """
    limiter = make_limiter()

    async def stamp():
        return time.perf_counter()

    # Same effect as decorating ``stamp`` with ``@limiter.as_decorator()``.
    limited_stamp = limiter.as_decorator()(stamp)

    started = time.perf_counter()
    await limited_stamp()  # first call takes the only slot
    second_ran_at = await limited_stamp()  # expected to block ~200ms

    # Slightly below 200ms to leave room for timer jitter.
    assert second_ran_at - started >= 0.18
# --- try_acquire: non-blocking fails on contention ---
def test_try_acquire_nonblocking_false():
    """Non-blocking ``try_acquire`` succeeds once, then is refused for the same key."""
    limiter = make_limiter()

    first_attempt = limiter.try_acquire("k", blocking=False)
    assert first_attempt is True

    # The slot is taken now; a non-blocking call should refuse instead of waiting.
    second_attempt = limiter.try_acquire("k", blocking=False)
    assert second_attempt is False
# --- try_acquire_async: non-blocking fails on contention ---
@pytest.mark.asyncio
async def test_try_acquire_async_nonblocking_false():
    """Non-blocking ``try_acquire_async`` succeeds once, then is refused."""
    limiter = make_limiter()

    # Start from a clean slate: flush every bucket, awaiting the result when
    # flush() hands back a coroutine.
    for bucket in limiter.buckets():
        flushed = bucket.flush()
        if asyncio.iscoroutine(flushed):
            await flushed

    first_attempt = await limiter.try_acquire_async("k_async_nb", blocking=False)
    assert first_attempt is True

    second_attempt = await limiter.try_acquire_async("k_async_nb", blocking=False)
    assert second_attempt is False
# --- try_acquire_async with timeout enforces max wait ---
@pytest.mark.asyncio
async def test_try_acquire_async_timeout():
    """Blocking ``try_acquire_async`` with a timeout gives up after about that long."""
    limiter = make_limiter()

    # Take the only slot so the next acquire has to wait.
    took_slot = await limiter.try_acquire_async("k", blocking=True)
    assert took_slot is True

    started = time.perf_counter()
    acquired = await limiter.try_acquire_async("k", blocking=True, timeout=0.1)
    waited = time.perf_counter() - started

    assert acquired is False  # timed out
    # The wait should be roughly the 0.1s timeout, with slack for scheduling.
    assert 0.09 <= waited <= 0.25
# --- sync timeout is not implemented ---
def test_try_acquire_sync_timeout_not_implemented():
    """Sync ``try_acquire`` with a timeout is expected to raise NotImplementedError."""
    limiter = make_limiter()

    with pytest.raises(NotImplementedError):
        limiter.try_acquire("k", blocking=True, timeout=0.1)
|