File: conftest.py

package info (click to toggle)
pyrate-limiter 4.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,120 kB
  • sloc: python: 3,223; makefile: 21
file content (109 lines) | stat: -rw-r--r-- 3,740 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
"""Pytest config & fixtures"""
from logging import basicConfig
from logging import getLogger
from os import getenv
from pathlib import Path
from tempfile import gettempdir
from typing import List
from typing import Union

import pytest

from pyrate_limiter import Duration
from pyrate_limiter import id_generator
from pyrate_limiter import InMemoryBucket
from pyrate_limiter import limiter_factory
from pyrate_limiter import MultiprocessBucket
from pyrate_limiter import PostgresBucket
from pyrate_limiter import Rate
from pyrate_limiter import RedisBucket


# Make log messages visible on test failure (or with pytest -s)
basicConfig(level="INFO")
# Uncomment for more verbose output:
logger = getLogger("pyrate_limiter")
logger.setLevel(getenv("LOG_LEVEL", "INFO"))

DEFAULT_RATES = [Rate(3, 1000), Rate(4, 1500)]

async def create_in_memory_bucket(rates: List[Rate]):
    return InMemoryBucket(rates)


async def create_redis_bucket(rates: List[Rate]):
    from redis import ConnectionPool
    from redis import Redis

    pool = ConnectionPool.from_url(getenv("REDIS", "redis://localhost:6379"))
    redis_db = Redis(connection_pool=pool)
    bucket_key = f"test-bucket/{id_generator()}"
    redis_db.delete(bucket_key)
    bucket = RedisBucket.init(rates, redis_db, bucket_key)
    assert bucket.count() == 0
    return bucket


async def create_async_redis_bucket(rates: List[Rate]):
    from redis.asyncio import ConnectionPool as AsyncConnectionPool
    from redis.asyncio import Redis as AsyncRedis

    pool: AsyncConnectionPool = AsyncConnectionPool.from_url(
        getenv("REDIS", "redis://localhost:6379")
    )
    redis_db: AsyncRedis = AsyncRedis(connection_pool=pool)
    bucket_key = f"test-bucket/{id_generator()}"
    await redis_db.delete(bucket_key)
    bucket = await RedisBucket.init(rates, redis_db, bucket_key)
    assert await bucket.count() == 0
    return bucket


async def create_mp_bucket(rates: List[Rate]):
    bucket = MultiprocessBucket.init(rates)

    return bucket


async def create_sqlite_bucket(rates: List[Rate], file_lock: bool = False):

    temp_dir = Path(gettempdir())
    default_db_path = temp_dir / f"pyrate_limiter_{id_generator(size=5)}.sqlite"
    table_name = f"pyrate-test-bucket-{id_generator(size=10)}"

    logger.info("SQLite db path: %s", default_db_path)

    return limiter_factory.create_sqlite_bucket(rates=rates,
                                                db_path=str(default_db_path),
                                                table_name=table_name,
                                                use_file_lock=file_lock)


async def create_filelocksqlite_bucket(rates: List[Rate]):
    return await create_sqlite_bucket(rates=rates, file_lock=True)


async def create_postgres_bucket(rates: List[Rate]):
    from psycopg_pool import ConnectionPool as PgConnectionPool

    pool = PgConnectionPool("postgresql://postgres:postgres@localhost:5432", open=True)
    table = f"test_bucket_{id_generator()}"
    bucket = PostgresBucket(pool, table, rates)
    assert bucket.count() == 0
    return bucket


@pytest.fixture(
    params=[
        pytest.param(create_in_memory_bucket, marks=pytest.mark.inmemory),
        pytest.param(create_redis_bucket, marks=pytest.mark.redis),
        pytest.param(create_async_redis_bucket, marks=pytest.mark.asyncredis),
        pytest.param(create_sqlite_bucket, marks=pytest.mark.sqlite),
        pytest.param(create_postgres_bucket, marks=pytest.mark.postgres),
        pytest.param(create_filelocksqlite_bucket, marks=pytest.mark.filelocksqlite),
        pytest.param(create_mp_bucket, marks=pytest.mark.mpbucket)
    ]
)
def create_bucket(request):
    """Parametrization for different bucket."""
    return request.param