File: throttler.py

package info (click to toggle)
python-afsapi 0.2.7-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 148 kB
  • sloc: python: 714; makefile: 4
file content (50 lines) | stat: -rw-r--r-- 1,825 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import asyncio
from contextlib import AbstractAsyncContextManager
import time
from typing import Optional, Type, Any
from types import TracebackType


class Throttler:
    """Ensures that a time between executions is taken into account for each wrapped code block,
    which can be configured for every entry."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._next_execution_not_before: Optional[float] = None

    class _ThrottleContextManager(AbstractAsyncContextManager[Any]):
        """Ensures that a time between executions is taken into account for each wrapped code block."""

        def __init__(
            self, throttler: "Throttler", time_after_execution_s: float
        ) -> None:
            self.throttler = throttler
            self.time_after_execution_s = time_after_execution_s

        async def __aenter__(self) -> None:
            await self.throttler._lock.acquire()
            if self.throttler._next_execution_not_before is not None:
                additional_wait = (
                    self.throttler._next_execution_not_before - time.monotonic()
                )

                if additional_wait > 0:
                    await asyncio.sleep(additional_wait)

            return None

        async def __aexit__(
            self,
            exc_type: Optional[Type[BaseException]],
            exc: Optional[BaseException],
            tb: Optional[TracebackType],
        ) -> None:
            self.throttler._next_execution_not_before = (
                time.monotonic() + self.time_after_execution_s
            )
            self.throttler._lock.release()
            return None

    def throttle(self, throttle_after_call_s: float) -> _ThrottleContextManager:
        return Throttler._ThrottleContextManager(self, throttle_after_call_s)