File: redis_example.py

package info (click to toggle)
pyrate-limiter 4.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,120 kB
  • sloc: python: 3,223; makefile: 21
file content (71 lines) | stat: -rw-r--r-- 1,890 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# ruff: noqa: T201
import asyncio
import time
from datetime import datetime
from typing import List

import pytest
import redis

from pyrate_limiter import Duration, Limiter, Rate, RedisBucket


async def ticker():
    """Print a timestamped ``[TICK]`` line eight times, pausing 0.5 s after each.

    Meant to be scheduled next to other coroutines (for example through
    ``asyncio.gather``) so the output shows time passing while they run.
    """
    ticks_left = 8
    while ticks_left > 0:
        print(f"[TICK] {datetime.now()}")
        # Yield to the event loop so sibling coroutines can make progress.
        await asyncio.sleep(0.5)
        ticks_left -= 1


def create_redis_bucket(rates: List[Rate]):
    """Create an empty synchronous ``RedisBucket`` on the Redis server at localhost.

    Args:
        rates: Rate limits handed to ``RedisBucket.init``.

    Returns:
        The bucket initialised with the name ``"test"`` (presumably the Redis
        key it lives under), after it has been flushed.

    Raises:
        AssertionError: If the bucket still reports a non-zero count after
            the flush.
    """
    connection = redis.Redis(host="localhost")
    fresh_bucket = RedisBucket.init(rates, connection, "test")

    # Drop whatever an earlier run may have left behind, then confirm it is gone.
    fresh_bucket.flush()
    leftover = fresh_bucket.count()
    assert leftover == 0
    return fresh_bucket


async def create_async_redis_bucket(rates: List[Rate]):
    """Create an empty ``RedisBucket`` backed by an asyncio Redis client on localhost.

    Args:
        rates: Rate limits handed to ``RedisBucket.init``.

    Returns:
        The bucket initialised with the name ``"test3"`` (presumably the Redis
        key it lives under), after it has been flushed.

    Raises:
        AssertionError: If the bucket still reports a non-zero count after
            the flush.
    """
    connection = redis.asyncio.Redis(host="localhost")
    # With the asyncio client, init/flush/count are all awaited here.
    fresh_bucket = await RedisBucket.init(rates, connection, "test3")

    # Drop whatever an earlier run may have left behind, then confirm it is gone.
    await fresh_bucket.flush()
    leftover = await fresh_bucket.count()
    assert leftover == 0
    return fresh_bucket


@pytest.mark.asyncio
async def test_redis_async():
    """Fire ten concurrent acquisitions at an async Redis-backed limiter.

    Builds a limiter with a rate of 3 per ``Duration.SECOND`` on a freshly
    flushed async bucket, then gathers ``ticker()`` together with ten
    ``try_acquire_async`` calls (weight 1 each, named "0".."9"). Each call
    prints its outcome, and the total elapsed time is printed at the end.

    Requires a Redis server reachable on localhost.
    """
    call_count = 10  # single source of truth for the loop and the summary line
    rates = [Rate(3, Duration.SECOND)]

    redis_bucket = await create_async_redis_bucket(rates)
    limiter = Limiter(redis_bucket)

    async def task(name, weight):
        # Report the outcome of one acquisition attempt with a timestamp.
        acquired = await limiter.try_acquire_async(name, weight)
        print(f"{datetime.now()} {name}: {weight}, {acquired=}")

    # perf_counter() is monotonic, so the elapsed figure cannot be distorted
    # by system clock adjustments the way a time.time() difference can.
    start = time.perf_counter()
    await asyncio.gather(ticker(), *[task(str(i), 1) for i in range(call_count)])
    print(f"Run {call_count} calls in {time.perf_counter() - start:,.2f} sec")


def test_redis_sync():
    """Make ten sequential acquisitions against a synchronous Redis-backed limiter.

    Builds a limiter with a rate of 3 per ``Duration.SECOND`` on a freshly
    flushed bucket, then calls ``try_acquire`` ten times (weight 1 each,
    named "0".."9"), printing a timestamped outcome for every call.

    Requires a Redis server reachable on localhost.
    """
    limiter = Limiter(create_redis_bucket([Rate(3, Duration.SECOND)]))

    for index in range(10):
        name, weight = str(index), 1
        acquired = limiter.try_acquire(name, weight)
        print(f"{datetime.now()} {name}: {weight}, {acquired=}")


if __name__ == "__main__":
    # Script entry point: print a hint for starting Redis, then run both demos.
    print("To start a redis container: \n# docker run -d --name redis-test -p 6379:6379 redis:7")

    # (banner, zero-argument runner) pairs, executed in order: sync, then async.
    demos = (
        ("Redis (non-Async) bucket", test_redis_sync),
        ("AsyncRedis bucket", lambda: asyncio.run(test_redis_async())),
    )
    for banner, run_demo in demos:
        print(banner)
        run_demo()