1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
from functools import wraps
from typing import Callable
from throttler import ExecutionTimer, Throttler, ThrottlerSimultaneous, Timer
def throttle(rate_limit: int, period: float = 1.0):
def decorator(func):
throttler = Throttler(rate_limit, period)
@wraps(func)
async def wrapper(*args, **kwargs):
async with throttler:
return await func(*args, **kwargs)
return wrapper
return decorator
def throttle_simultaneous(count: int):
def decorator(func):
throttler = ThrottlerSimultaneous(count)
@wraps(func)
async def wrapper(*args, **kwargs):
async with throttler:
return await func(*args, **kwargs)
return wrapper
return decorator
def execution_timer(period: float = 60., align_sleep: bool = False):
def decorator(func):
et = ExecutionTimer(period, align_sleep)
@wraps(func)
def wrapper(*args, **kwargs):
with et:
return func(*args, **kwargs)
return wrapper
return decorator
def execution_timer_async(period: float = 60., align_sleep: bool = False):
def decorator(func):
et = ExecutionTimer(period, align_sleep)
@wraps(func)
async def wrapper(*args, **kwargs):
async with et:
return await func(*args, **kwargs)
return wrapper
return decorator
def timer(name: str = None, verbose: bool = False, print_func: Callable = None):
def decorator(func):
t = Timer(name, verbose, print_func)
@wraps(func)
def wrapper(*args, **kwargs):
with t:
return func(*args, **kwargs)
return wrapper
return decorator
def timer_async(name: str = None, verbose: bool = False, print_func: Callable = None):
def decorator(func):
t = Timer(name, verbose, print_func)
@wraps(func)
async def wrapper(*args, **kwargs):
with t:
return await func(*args, **kwargs)
return wrapper
return decorator
|