1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
|
import importlib.util
import time
from datetime import datetime
import pytest
import redis
from packaging.version import Version
# Version of the imported ``redis`` (redis-py) package, parsed into a
# ``packaging`` Version so it can be compared with the usual operators.
REDIS_VERSION = Version(redis.__version__)
def key_val_dict(size=100):
    """Build a dict of ``size`` byte-string pairs for use as test data.

    Entry ``i`` maps ``b"key:<i>"`` to ``b"val:<i>"`` for ``i`` in
    ``range(size)``; a non-positive ``size`` yields an empty dict.
    """
    pairs = {}
    for index in range(size):
        pairs[f"key:{index}".encode()] = f"val:{index}".encode()
    return pairs
def raw_command(r: redis.Redis, *args):
    """Run a command via ``execute_command`` with response callbacks disabled.

    The client's ``response_callbacks`` mapping is swapped for an empty dict
    for the duration of the call, so the reply is returned without any
    command-specific parsing. The original mapping is always put back,
    including when the command raises.
    """
    saved_callbacks = r.response_callbacks
    try:
        r.response_callbacks = {}
        reply = r.execute_command(*args)
    finally:
        # Restore the very same mapping object that was installed before.
        r.response_callbacks = saved_callbacks
    return reply
# Comparison names accepted by run_test_if_redispy_ver; it raises ValueError
# for any condition that is not in this set.
ALLOWED_CONDITIONS = {"eq", "gte", "lte", "lt", "gt", "ne"}
def run_test_if_redispy_ver(condition: str, ver: str):
    """Return a ``pytest.mark.skipif`` mark gated on the redis-py version.

    The decorated test runs only when ``REDIS_VERSION <condition> ver`` holds
    and is skipped otherwise.

    :param condition: one of the names in ``ALLOWED_CONDITIONS``
        ("eq", "gte", "lte", "lt", "gt", "ne").
    :param ver: version string that ``REDIS_VERSION`` is compared against.
    :raises ValueError: if ``condition`` is not in ``ALLOWED_CONDITIONS``.
    """
    if condition not in ALLOWED_CONDITIONS:
        raise ValueError(f"condition {condition} is not in allowed conditions ({ALLOWED_CONDITIONS})")

    comparisons = {
        "eq": lambda wanted: REDIS_VERSION == wanted,
        "gte": lambda wanted: REDIS_VERSION >= wanted,
        "lte": lambda wanted: REDIS_VERSION <= wanted,
        "lt": lambda wanted: REDIS_VERSION < wanted,
        "gt": lambda wanted: REDIS_VERSION > wanted,
        "ne": lambda wanted: REDIS_VERSION != wanted,
    }
    compare = comparisons.get(condition)
    # ``ver`` is parsed only when a comparison is actually selected.
    applicable = compare(Version(ver)) if compare is not None else False

    return pytest.mark.skipif(
        not applicable,
        reason=f"Test is not applicable to redis-py {REDIS_VERSION} ({condition}, {ver})",
    )
# Module spec for the optional ``lupa`` package; ``find_spec`` returns None
# when the package cannot be found.
_lua_module = importlib.util.find_spec("lupa")
# Mark that skips a test when ``lupa`` is not installed.
run_test_if_lupa = pytest.mark.skipif(_lua_module is None, reason="Test is only applicable if lupa is installed")
# Mark that parametrizes ``create_connection`` (indirectly) with the single
# value "FakeStrictRedis", which itself carries the ``fake`` marker.
# NOTE(review): presumably ``create_connection`` is a fixture defined
# elsewhere (e.g. in conftest) - confirm.
fake_only = pytest.mark.parametrize(
"create_connection", [pytest.param("FakeStrictRedis", marks=pytest.mark.fake)], indirect=True
)
def redis_server_time(r: redis.Redis) -> datetime:
    """Return the time reported by the server as a naive local ``datetime``.

    ``r.time()`` yields a ``(seconds, microseconds)`` pair. The microsecond
    part is applied as an exact integer field rather than being glued onto
    the seconds as a decimal string: without zero-padding, a reply such as
    ``(1700000000, 5000)`` would be read as ``1700000000.5`` (half a second)
    instead of 5 ms past the second.

    :param r: client whose ``time()`` method is queried.
    :return: local-time ``datetime`` built from the server's timestamp.
    """
    seconds, microseconds = r.time()
    whole_second = datetime.fromtimestamp(int(seconds))
    # Setting the field directly also avoids float rounding of the timestamp.
    return whole_second.replace(microsecond=int(microseconds))
def current_time() -> int:
    """Return the current wall-clock time as whole milliseconds since the epoch.

    The value is ``time.time()`` scaled to milliseconds and truncated to an
    ``int``.
    """
    now_seconds = time.time()
    return int(now_seconds * 1000)
|