1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
import pytest
import redis
import valkey
from socketio import async_redis_manager
from socketio.async_redis_manager import AsyncRedisManager
class TestAsyncRedisManager:
def test_redis_not_installed(self):
saved_redis = async_redis_manager.aioredis
async_redis_manager.aioredis = None
with pytest.raises(RuntimeError):
AsyncRedisManager('redis://')._redis_connect()
assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None
async_redis_manager.aioredis = saved_redis
def test_valkey_not_installed(self):
saved_valkey = async_redis_manager.aiovalkey
async_redis_manager.aiovalkey = None
with pytest.raises(RuntimeError):
AsyncRedisManager('valkey://')._redis_connect()
assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None
async_redis_manager.aiovalkey = saved_valkey
def test_redis_valkey_not_installed(self):
saved_redis = async_redis_manager.aioredis
async_redis_manager.aioredis = None
saved_valkey = async_redis_manager.aiovalkey
async_redis_manager.aiovalkey = None
with pytest.raises(RuntimeError):
AsyncRedisManager('redis://')._redis_connect()
with pytest.raises(RuntimeError):
AsyncRedisManager('valkey://')._redis_connect()
with pytest.raises(RuntimeError):
AsyncRedisManager('unix:///var/sock/redis.sock')._redis_connect()
async_redis_manager.aioredis = saved_redis
async_redis_manager.aiovalkey = saved_valkey
def test_bad_url(self):
with pytest.raises(ValueError):
AsyncRedisManager('http://localhost:6379')._redis_connect()
def test_redis_connect(self):
urls = [
'redis://localhost:6379',
'redis://localhost:6379/0',
'redis://:password@localhost:6379',
'redis://:password@localhost:6379/0',
'redis://user:password@localhost:6379',
'redis://user:password@localhost:6379/0',
'rediss://localhost:6379',
'rediss://localhost:6379/0',
'rediss://:password@localhost:6379',
'rediss://:password@localhost:6379/0',
'rediss://user:password@localhost:6379',
'rediss://user:password@localhost:6379/0',
'unix:///var/sock/redis.sock',
'unix:///var/sock/redis.sock?db=0',
'unix://user@/var/sock/redis.sock',
'unix://user@/var/sock/redis.sock?db=0',
'redis+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
]
for url in urls:
c = AsyncRedisManager(url)
assert c.redis is None
c._redis_connect()
assert isinstance(c.redis, redis.asyncio.Redis)
def test_valkey_connect(self):
saved_redis = async_redis_manager.aioredis
async_redis_manager.aioredis = None
urls = [
'valkey://localhost:6379',
'valkey://localhost:6379/0',
'valkey://:password@localhost:6379',
'valkey://:password@localhost:6379/0',
'valkey://user:password@localhost:6379',
'valkey://user:password@localhost:6379/0',
'valkeys://localhost:6379',
'valkeys://localhost:6379/0',
'valkeys://:password@localhost:6379',
'valkeys://:password@localhost:6379/0',
'valkeys://user:password@localhost:6379',
'valkeys://user:password@localhost:6379/0',
'unix:///var/sock/redis.sock',
'unix:///var/sock/redis.sock?db=0',
'unix://user@/var/sock/redis.sock',
'unix://user@/var/sock/redis.sock?db=0',
'valkey+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
]
for url in urls:
c = AsyncRedisManager(url)
assert c.redis is None
c._redis_connect()
assert isinstance(c.redis, valkey.asyncio.Valkey)
async_redis_manager.aioredis = saved_redis
|