File: execution_timer.py

package info (click to toggle)
python-throttler 1.2.2-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 180 kB
  • sloc: python: 473; makefile: 4; sh: 2
file content (50 lines) | stat: -rw-r--r-- 1,425 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import asyncio
import time


class ExecutionTimer:
    """
    Context manager that spaces out successive entries into its block.

    Entering the context sleeps until at least `period` seconds have passed
    since the previous entry started (no sleep on the very first entry).
    It only delays the next entry; it is not an analog of Throttler.

    With `align_sleep=True` the next allowed entry time is rounded down to a
    multiple of `period` on the `time.time()` clock, e.g. `period=60` aligns
    entries to the start of minutes.

    Works both as a sync (`with`) and an async (`async with`) context manager;
    the sync variant blocks with `time.sleep`, the async one awaits
    `asyncio.sleep`.

    Example usage:
        - https://github.com/uburuntu/throttler/blob/master/examples/example_execution_timer.py

    :param period: minimal number of seconds between starts of two entries.
    :param align_sleep: align the next entry time to a multiple of `period`.
    """
    __slots__ = ('_period', '_align_sleep', '_start_time', '_next_time',)

    def __init__(self, period: float = 60., align_sleep: bool = False):
        self._period = period
        self._align_sleep = align_sleep

        # Timestamp (time.time()) at which the current/last block was entered.
        self._start_time = 0.
        # Earliest timestamp at which the next block may be entered;
        # 0 means "no restriction yet", so the first entry never sleeps.
        self._next_time = 0.

    def _start(self) -> float:
        """Return how many seconds are left until the next entry is allowed.

        The value is zero or negative when the entry may happen right away.
        """
        return self._next_time - time.time()

    def _exit(self) -> None:
        """Compute and store the earliest time of the next entry."""
        next_time = self._start_time + self._period
        # Alignment is meaningless for a zero period and `x % 0` would raise
        # ZeroDivisionError out of `__exit__`, hiding any error from the body.
        if self._align_sleep and self._period != 0:
            # Round down to the closest multiple of `period`.
            next_time -= self._start_time % self._period
        self._next_time = next_time

    def __enter__(self) -> 'ExecutionTimer':
        """Block until the next entry is allowed, then return the timer."""
        diff = self._start()
        if diff > 0.:
            time.sleep(diff)
        self._start_time = time.time()
        # Return the timer so that `with ExecutionTimer(...) as timer` works.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so exceptions raised in the block are propagated.
        self._exit()

    async def __aenter__(self) -> 'ExecutionTimer':
        """Await until the next entry is allowed, then return the timer."""
        diff = self._start()
        if diff > 0.:
            await asyncio.sleep(diff)
        self._start_time = time.time()
        # Return the timer so that `async with ... as timer` works.
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so exceptions raised in the block are propagated.
        self._exit()