"""
Demonstrates using a SQLite Bucket across multiple processes, using a filelock to enforce synchronization.
This is useful in cases where multiple processes are created, possibly at different times or from different
applications.
The SQLite Bucket uses a .lock file to ensure that only one process is active at a time.
"""
import logging
import os
import time
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait

from pyrate_limiter import Duration
from pyrate_limiter import Limiter
from pyrate_limiter import limiter_factory
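
# Note: the `Limiter | None` annotation below needs Python 3.10+; on older
# interpreters, add `from __future__ import annotations` at the top of the file.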
LIMITER: Limiter | None = None

REQUESTS_PER_SECOND = 10
NUM_REQUESTS = REQUESTS_PER_SECOND * 5  # run for ~5 seconds

logger = logging.getLogger(__name__)
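
# The ProcessPoolExecutor below runs init_process once in each worker process,
# so every process builds its own Limiter backed by the same on-disk SQLite
# bucket and the same .lock file.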
def init_process():
    global LIMITER
    LIMITER = limiter_factory.create_sqlite_limiter(
        rate_per_duration=REQUESTS_PER_SECOND,
        duration=Duration.SECOND,
        db_path="pyrate_limiter.sqlite",
        use_file_lock=True,
    )
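
# Each task acquires a slot from the shared bucket before recording its result;
# the limiter paces tasks across all worker processes to the configured rate.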
def my_task():
    assert LIMITER is not None
    LIMITER.try_acquire("my_task")
    result = {"time": time.monotonic(), "pid": os.getpid()}
    return result
if __name__ == "__main__":
    logging.basicConfig(
        format="%(asctime)s %(name)s %(levelname)-8s %(message)s",
        level=logging.INFO,
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # Prime the bucket so the first REQUESTS_PER_SECOND requests below don't
    # pass as an instant burst, which would inflate the measured rate.
    init_process()
    assert LIMITER is not None
    for _ in range(REQUESTS_PER_SECOND):
        LIMITER.try_acquire("test")

    start = time.monotonic()
    with ProcessPoolExecutor(initializer=init_process) as executor:
        futures = [executor.submit(my_task) for _ in range(NUM_REQUESTS)]
        wait(futures)

    times = []
    for f in futures:
        try:
            times.append(f.result())
        except Exception as e:
            print(f"Task raised: {e}")

    end = time.monotonic()
    print(f"Completed {NUM_REQUESTS=} in {end - start:.2f} seconds, at a rate of {REQUESTS_PER_SECOND=}")