"""
Demonstrates using a MultiprocessBucket using a ProcessPoolExecutor, running a simple task.
A MultiprocessBucket is useful when the rate is to be shared among a multiprocessing pool or ProcessPoolExecutor.
The mp_bucket stores its items in a multiprocessing ListProxy, and a multiprocessing lock is shared
across Limiter instances.
"""
import logging
import os
import time
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from functools import partial

from pyrate_limiter import Duration
from pyrate_limiter import Limiter
from pyrate_limiter import MonotonicClock
from pyrate_limiter import MultiprocessBucket
from pyrate_limiter import Rate

LIMITER: Limiter | None = None
MAX_DELAY = Duration.DAY
REQUESTS_PER_SECOND = 100
NUM_REQUESTS = REQUESTS_PER_SECOND * 5  # Run for ~5 seconds

logger = logging.getLogger(__name__)


def init_process(bucket: MultiprocessBucket):
    # Executor/pool initializer: runs once in each worker process, giving
    # every worker its own Limiter wrapped around the one shared bucket.
    global LIMITER
    LIMITER = Limiter(bucket, raise_when_fail=False, clock=MonotonicClock(),
                      max_delay=MAX_DELAY, retry_until_max_delay=True)


def my_task():
    assert LIMITER is not None
    # Blocks (up to max_delay) until the shared rate limit grants a slot.
    LIMITER.try_acquire("my_task")
    return {"time": time.monotonic(), "pid": os.getpid()}
if __name__ == "__main__":
logging.basicConfig(
format="%(asctime)s %(name)s %(levelname)-8s %(message)s",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
)
    rate = Rate(REQUESTS_PER_SECOND, Duration.SECOND)
    bucket = MultiprocessBucket.init([rate])

    # Create a limiter in the parent process and feed it REQUESTS_PER_SECOND
    # requests to prime the bucket; otherwise the first batch runs before the
    # limit kicks in and the test appears to run too fast.
    init_process(bucket)
    assert LIMITER is not None
    for _ in range(REQUESTS_PER_SECOND):
        LIMITER.try_acquire("test")

    start = time.monotonic()
    # Each worker runs init_process(bucket) once, so all workers throttle
    # against the same shared bucket.
    with ProcessPoolExecutor(
        initializer=partial(init_process, bucket)
    ) as executor:
        futures = [executor.submit(my_task) for _ in range(NUM_REQUESTS)]
        wait(futures)

    results = []
    for f in futures:
        try:
            results.append(f.result())
        except Exception as e:
            logger.error("Task raised: %s", e)
    end = time.monotonic()

    elapsed = end - start
    logger.info(
        "Completed %d requests in %.2f seconds (%.1f requests/second; configured rate %d/s)",
        len(results), elapsed, len(results) / elapsed, REQUESTS_PER_SECOND,
    )