File: throttler.py

package info (click to toggle)
python-asyncio-throttle 1.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 80 kB
  • sloc: python: 103; makefile: 4
file content (36 lines) | stat: -rw-r--r-- 937 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import time
import asyncio
from collections import deque
from typing import Deque


class Throttler:
    def __init__(self, rate_limit: int, period=1.0, retry_interval=0.01):
        self.rate_limit = rate_limit
        self.period = period
        self.retry_interval = retry_interval

        self._task_logs: Deque[float] = deque()

    def flush(self):
        now = time.monotonic()
        while self._task_logs:
            if now - self._task_logs[0] > self.period:
                self._task_logs.popleft()
            else:
                break

    async def acquire(self):
        while True:
            self.flush()
            if len(self._task_logs) < self.rate_limit:
                break
            await asyncio.sleep(self.retry_interval)

        self._task_logs.append(time.monotonic())

    async def __aenter__(self):
        await self.acquire()

    async def __aexit__(self, exc_type, exc, tb):
        pass