# ruff: noqa: T201
"""
Demonstrates using a SQLite Bucket across multiple processes, using a filelock to enforce synchronization.
This is useful in cases where multiple processes are created, possibly at different times or from different
applications.
The SQLite Bucket uses a .lock file to ensure that only one process accesses the bucket at a time.
"""
import logging
import os
import time
from concurrent.futures import ProcessPoolExecutor, wait
from typing import Optional

from pyrate_limiter import Duration, Limiter, limiter_factory

LIMITER: Optional[Limiter] = None
REQUESTS_PER_SECOND = 10
NUM_REQUESTS = REQUESTS_PER_SECOND * 5  # Run for ~5 seconds

logger = logging.getLogger(__name__)


def init_process():
    global LIMITER
    LIMITER = limiter_factory.create_sqlite_limiter(
        rate_per_duration=REQUESTS_PER_SECOND,
        duration=Duration.SECOND,
        db_path="pyrate_limiter.sqlite",
        use_file_lock=True,
    )


def my_task():
    assert LIMITER is not None
    LIMITER.try_acquire("my_task")
    result = {"time": time.monotonic(), "pid": os.getpid()}
    return result


def test_sqlite_filelock_multiprocess():
    # prime the rates, to show realistic rates
    init_process()
    assert LIMITER is not None
    for _ in range(REQUESTS_PER_SECOND):
        LIMITER.try_acquire("test")

    start = time.monotonic()

    with ProcessPoolExecutor(initializer=init_process) as executor:
        futures = [executor.submit(my_task) for _ in range(NUM_REQUESTS)]
        wait(futures)

    times = []
    for f in futures:
        try:
            t = f.result()
            times.append(t)
        except Exception as e:
            print(f"Task raised: {e}")

    end = time.monotonic()

    print(f"Completed {NUM_REQUESTS=} in {end - start} seconds, at a rate of {REQUESTS_PER_SECOND=}")


if __name__ == "__main__":
    logging.basicConfig(
        format="%(asctime)s %(name)s %(levelname)-8s %(message)s",
        level=logging.INFO,
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    test_sqlite_filelock_multiprocess()