1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
|
import sys
import time
import asyncio
from collections import deque
import pytest
from asyncio_throttle import Throttler
class TestThrottle:
    """Rate-limit tests for ``Throttler``.

    Many concurrent workers contend for one throttler while the test polls a
    sliding one-second window of their entry timestamps and asserts that the
    window never holds more than ``rate_limit`` entries.
    """

    # How long the test keeps observing the workers, in seconds.
    OBSERVATION_SECONDS = 5.0
    # Width of the sliding window the rate limit is checked against, in
    # seconds.  NOTE(review): presumably matches the throttler's default
    # period -- confirm against the asyncio_throttle documentation.
    WINDOW_SECONDS = 1.0
    # Pause between two consecutive checks of the window, in seconds.
    POLL_INTERVAL = 0.05
    # Pause a worker takes after each pass through the throttler, in seconds.
    WORKER_PAUSE = 0.05

    async def worker(self, throttler, logs):
        """Enter ``throttler`` in an endless loop, timestamping every entry.

        Args:
            throttler: Async context manager guarding each loop iteration.
            logs: Shared container (supports ``append``) that receives one
                ``time.monotonic()`` reading per successful entry.

        The loop only ends through task cancellation, which is swallowed so
        that a cancelled worker finishes quietly.
        """
        try:
            while True:
                async with throttler:
                    # Monotonic clock: only differences between readings are
                    # used, and wall-clock adjustments must not distort them.
                    logs.append(time.monotonic())
                await asyncio.sleep(self.WORKER_PAUSE)
        except asyncio.CancelledError:
            pass

    @pytest.mark.parametrize(
        "rate_limit,workers_to_spawn",
        [
            (5, 5),
            (20, 35),
            (50, 100),
        ],
    )
    @pytest.mark.asyncio
    async def test_rate_limiting(self, rate_limit, workers_to_spawn):
        """Assert that at most ``rate_limit`` entries fall in any polled window.

        Args:
            rate_limit: Value passed to ``Throttler`` and the upper bound
                asserted for the number of entries inside the window.
            workers_to_spawn: Number of concurrent worker tasks competing for
                the throttler.
        """
        throttler = Throttler(rate_limit)
        logs = deque()
        tasks = [
            asyncio.ensure_future(self.worker(throttler, logs))
            for _ in range(workers_to_spawn)
        ]

        try:
            started = time.monotonic()
            while True:
                now = time.monotonic()
                if now - started >= self.OBSERVATION_SECONDS:
                    break

                # Timestamps are appended in chronological order, so expired
                # entries are always at the left end of the deque.
                while logs and now - logs[0] > self.WINDOW_SECONDS:
                    logs.popleft()

                assert len(logs) <= rate_limit
                await asyncio.sleep(self.POLL_INTERVAL)
        finally:
            # Runs on assertion failure as well, so no worker outlives the
            # test.  Awaiting the tasks lets the cancellation actually
            # complete before the test returns.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
|