1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
import time
import asyncio
from collections import deque
from typing import Deque
class Throttler:
    """Sliding-window rate limiter for asyncio code.

    At most ``rate_limit`` acquisitions are admitted within any window of
    ``period`` seconds, measured with ``time.monotonic()``. Callers that
    arrive while the window is full poll every ``retry_interval`` seconds
    until a slot frees up.

    Use either ``await throttler.acquire()`` or ``async with throttler:``.
    """

    def __init__(
        self,
        rate_limit: int,
        period: float = 1.0,
        retry_interval: float = 0.01,
    ) -> None:
        """Configure the limiter.

        Args:
            rate_limit: Maximum number of acquisitions per window; must be
                greater than zero.
            period: Window length in seconds.
            retry_interval: Seconds to sleep between checks while the
                window is full.

        Raises:
            ValueError: If ``rate_limit`` is not positive. Without this
                check ``acquire()`` would poll forever, because the
                window could never hold fewer entries than the limit.
        """
        if rate_limit <= 0:
            raise ValueError(
                f"rate_limit must be greater than 0, got {rate_limit!r}"
            )
        self.rate_limit = rate_limit
        self.period = period
        self.retry_interval = retry_interval
        # Monotonic timestamps of admitted acquisitions, oldest first.
        self._task_logs: Deque[float] = deque()

    def flush(self) -> None:
        """Drop recorded timestamps that are older than ``period`` seconds."""
        now = time.monotonic()
        # Entries are appended in time order, so expired ones are always at
        # the left end; stop at the first entry that is still in the window.
        while self._task_logs and now - self._task_logs[0] > self.period:
            self._task_logs.popleft()

    async def acquire(self) -> None:
        """Wait until the window has a free slot, then record this call."""
        while True:
            self.flush()
            if len(self._task_logs) < self.rate_limit:
                break
            # Window is full: yield to the event loop and check again later.
            await asyncio.sleep(self.retry_interval)
        self._task_logs.append(time.monotonic())

    async def __aenter__(self) -> None:
        """Acquire a slot on entry to ``async with``; yields no value."""
        await self.acquire()

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Do nothing on exit; exceptions from the body propagate."""
        pass
|