1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
import asyncio
import time
class ExecutionTimer:
"""
Context manager for time limiting of accessing to context block.
Simply sleep `period` secs before next accessing, not analog of Throttler.
Also it can align to start of minutes.
Example usage:
- https://github.com/uburuntu/throttler/blob/master/examples/example_execution_timer.py
"""
__slots__ = ('_period', '_align_sleep', '_start_time', '_next_time',)
def __init__(self, period: float = 60., align_sleep: bool = False):
self._period = period
self._align_sleep = align_sleep
self._start_time = 0.
self._next_time = 0.
def _start(self):
curr_time = time.time()
diff = self._next_time - curr_time
return diff
def _exit(self):
next_time = self._start_time + self._period
if self._align_sleep:
next_time -= self._start_time % self._period
self._next_time = next_time
def __enter__(self):
diff = self._start()
if diff > 0.:
time.sleep(diff)
self._start_time = time.time()
def __exit__(self, exc_type, exc_val, exc_tb):
self._exit()
async def __aenter__(self):
diff = self._start()
if diff > 0.:
await asyncio.sleep(diff)
self._start_time = time.time()
async def __aexit__(self, exc_type, exc_val, exc_tb):
self._exit()
|