1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
# ruff: noqa: T201
"""
Example of using pyrate_limiter with httpx.
"""
import logging
from pyrate_limiter import limiter_factory
from pyrate_limiter.extras.httpx_limiter import AsyncRateLimiterTransport, RateLimiterTransport
# Root logging: INFO level, timestamps with millisecond precision.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s.%(msecs)03d [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# This module's own logger is set to DEBUG.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Example below
def fetch(start_time: float):
    """Issue one GET request through the process-global rate limiter.

    ``multiprocess_example`` submits this function to a
    ``ProcessPoolExecutor`` whose initializer is
    ``limiter_factory.init_global_limiter``; that initializer is expected to
    have set ``limiter_factory.LIMITER`` before this runs.

    Args:
        start_time: Timestamp supplied by the caller. Currently unused; kept
            so existing callers keep working.

    Raises:
        RuntimeError: If ``limiter_factory.LIMITER`` is ``None``.
    """
    import httpx

    url = "https://httpbin.org/get"

    limiter = limiter_factory.LIMITER
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if limiter is None:
        raise RuntimeError("limiter_factory.LIMITER is not initialized")

    with httpx.Client(transport=RateLimiterTransport(limiter=limiter)) as client:
        client.get(url)
def singleprocess_example():
    """Send 10 sequential GET requests through one in-memory rate limiter.

    The limiter is created with ``rate_per_duration=1`` and
    ``duration=Duration.SECOND``. After each request, the elapsed time since
    the start, the current process id and the decoded JSON body are printed.
    """
    import os
    import time

    import httpx

    from pyrate_limiter import Duration, limiter_factory

    began = time.time()
    endpoint = "https://httpbin.org/get"

    throttled_transport = RateLimiterTransport(
        limiter=limiter_factory.create_inmemory_limiter(
            rate_per_duration=1,
            duration=Duration.SECOND,
        ),
    )

    with httpx.Client(transport=throttled_transport) as session:
        for _ in range(10):
            reply = session.get(endpoint)
            # Evaluate the pieces in the same order the output shows them.
            elapsed = round(time.time() - began, 2)
            pid = os.getpid()
            payload = reply.json()
            print(f"{elapsed}s-{pid}: {payload}")
def asyncio_example():
    """Send 10 rate-limited GET requests concurrently with ``httpx.AsyncClient``.

    A background ticker prints the current time once per second while the
    requests are in flight, to show that the event loop is not blocked.
    """
    import asyncio
    import time

    import httpx

    from pyrate_limiter import Duration, limiter_factory

    url = "https://httpbin.org/get"

    async def ticker():
        """Loop forever printing the time, showing the event loop isn't blocked."""
        while True:
            print(f"[TICK] {time.time()}")
            await asyncio.sleep(1)

    async def afetch(client: httpx.AsyncClient, start_time: float):
        """Perform one GET on ``url``; ``start_time`` is accepted but unused."""
        await client.get(url)

    async def example():
        """Run the 10 requests alongside the ticker and return the gathered results."""
        limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=1, duration=Duration.SECOND)
        transport = AsyncRateLimiterTransport(limiter=limiter)
        start_time = time.time()

        # Hold a reference to the task: the event loop keeps only weak
        # references, so an unreferenced task may be garbage collected.
        ticker_task = asyncio.create_task(ticker())
        try:
            # ``async with`` closes the client even if a request raises.
            async with httpx.AsyncClient(transport=transport) as client:
                # Pass the real start time (previously the URL was passed here).
                return await asyncio.gather(*(afetch(client, start_time) for _ in range(10)))
        finally:
            # Stop the otherwise endless ticker once the requests are done.
            ticker_task.cancel()

    asyncio.run(example())
def multiprocess_example():
    """Submit 10 ``fetch`` calls to a process pool that shares one bucket.

    A ``MultiprocessBucket`` built from a ``Rate(1, Duration.SECOND)`` is
    handed to every worker through the pool initializer
    ``limiter_factory.init_global_limiter``. Any exception raised by a task
    is logged rather than propagated.
    """
    import time
    from concurrent.futures import ProcessPoolExecutor, wait

    from pyrate_limiter import Duration, MultiprocessBucket, Rate

    shared_bucket = MultiprocessBucket.init([Rate(1, Duration.SECOND)])
    began = time.time()

    pool = ProcessPoolExecutor(
        initializer=limiter_factory.init_global_limiter,
        initargs=(shared_bucket,),
    )
    with pool:
        jobs = [pool.submit(fetch, began) for _ in range(10)]
        wait(jobs)

    # Every job has finished by now; surface failures without aborting the run.
    for job in jobs:
        try:
            job.result()
        except Exception:
            logger.exception("Task raised")
if __name__ == "__main__":
    # Run each demo in order, printing its banner first.
    demos = (
        ("Single Process example: 10 requests", singleprocess_example),
        ("Multiprocessing example: 10 requests", multiprocess_example),
        ("Asyncio example: 10 requests", asyncio_example),
    )
    for banner, run_demo in demos:
        print(banner)
        run_demo()
|