1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
|
# ruff: noqa: T201
"""
Demonstrates using a MultiprocessBucket with a ProcessPoolExecutor, running a simple task.
A MultiprocessBucket is useful when the rate is to be shared among a multiprocessing pool or ProcessPoolExecutor.
The mp_bucket stores its items in a multiprocessing ListProxy, and a multiprocessing lock is shared
across Limiter instances.
"""
import logging
import os
import time
from concurrent.futures import ProcessPoolExecutor, wait
from typing import Optional
from pyrate_limiter import Duration, Limiter, MultiprocessBucket, Rate
# Process-wide limiter; stays None until init_process() assigns it.
LIMITER: Optional[Limiter] = None
# Rate applied to the shared bucket, in requests per second.
REQUESTS_PER_SECOND = 100
# Total number of tasks submitted to the process pool.
NUM_REQUESTS = REQUESTS_PER_SECOND * 5 # Run for ~5 seconds
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def init_process(bucket: MultiprocessBucket):
    """Install a process-wide ``Limiter`` wrapping *bucket*.

    Called directly in the parent process and also passed as the
    ``initializer`` of the ``ProcessPoolExecutor``, so each worker process
    sets its own module-level ``LIMITER`` from the bucket it is handed.

    Args:
        bucket: The bucket the new ``Limiter`` is constructed around.
    """
    global LIMITER
    limiter = Limiter(bucket)
    LIMITER = limiter
def my_task():
    """Acquire one slot from the process-wide limiter and report when and where.

    Returns:
        A dict with ``"time"`` (``time.monotonic()`` sampled after the
        ``try_acquire`` call returns) and ``"pid"`` (the current process id).

    Raises:
        RuntimeError: If ``LIMITER`` has not been set in this process, i.e.
            ``init_process`` was never run here.
    """
    # Raise explicitly instead of using ``assert``: asserts are stripped under
    # ``python -O``, which would turn a missing limiter into an opaque
    # AttributeError on ``None``.
    if LIMITER is None:
        raise RuntimeError("LIMITER is not initialized; call init_process() first")
    LIMITER.try_acquire("my_task")
    return {"time": time.monotonic(), "pid": os.getpid()}
def test_in_memory_multiprocess():
    """Run ``NUM_REQUESTS`` rate-limited tasks on a process pool and time them.

    Builds a ``MultiprocessBucket`` limited to ``REQUESTS_PER_SECOND`` per
    second, primes it from the parent process, then submits ``my_task``
    ``NUM_REQUESTS`` times to a ``ProcessPoolExecutor`` whose workers are each
    initialized with ``init_process(bucket)``. Any exception raised by a task
    is printed rather than propagated, and the elapsed wall-clock time is
    printed at the end.
    """
    rate = Rate(REQUESTS_PER_SECOND, Duration.SECOND)
    bucket = MultiprocessBucket.init([rate])

    # Create a limiter in this process and feed it REQUESTS_PER_SECOND requests
    # to prime the bucket; otherwise the test appears to run too fast.
    init_process(bucket)
    assert LIMITER is not None
    # Plain loop: the calls are made for their side effect only, so there is
    # no reason to build a list of their return values.
    for _ in range(REQUESTS_PER_SECOND):
        LIMITER.try_acquire("test")

    start = time.monotonic()

    with ProcessPoolExecutor(initializer=init_process, initargs=(bucket,)) as executor:
        futures = [executor.submit(my_task) for _ in range(NUM_REQUESTS)]
        wait(futures)

    # Every future is finished here; result() is called only to surface any
    # exception a task raised. The returned dicts themselves are not used.
    for future in futures:
        try:
            future.result()
        except Exception as e:
            print(f"Task raised: {e}")

    end = time.monotonic()

    print(f"Completed {NUM_REQUESTS=} in {end - start} seconds, at a rate of {REQUESTS_PER_SECOND=}")
if __name__ == "__main__":
    # Script entry point: configure timestamped INFO-level logging, then run
    # the multiprocess rate-limiting demo.
    logging.basicConfig(
        level=logging.INFO,
        datefmt="%Y-%m-%d %H:%M:%S",
        format="%(asctime)s %(name)s %(levelname)-8s %(message)s",
    )
    test_in_memory_multiprocess()
|