File: test_redis_manager.py

package info (click to toggle)
python-socketio 5.16.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,208 kB
  • sloc: python: 12,734; makefile: 15; sh: 7
file content (111 lines) | stat: -rw-r--r-- 4,052 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import pytest
import redis
import valkey

from socketio import async_redis_manager
from socketio.async_redis_manager import AsyncRedisManager


class TestAsyncRedisManager:
    """Tests for client selection and creation in ``AsyncRedisManager``.

    Several tests simulate a missing client package by setting the
    ``aioredis`` and/or ``aiovalkey`` attributes of the
    ``async_redis_manager`` module to ``None``. Those attributes are
    module-level state shared by every test in the process, so each test
    restores the saved value in a ``finally`` clause. Without it, a failing
    assertion would leave the module patched for the tests that run next.
    """

    def test_redis_not_installed(self):
        """``redis://`` raises ``RuntimeError`` when ``aioredis`` is None."""
        saved_redis = async_redis_manager.aioredis
        async_redis_manager.aioredis = None
        try:
            with pytest.raises(RuntimeError):
                AsyncRedisManager('redis://')._redis_connect()
            # Building a manager for a unix socket URL must not raise even
            # though the redis package is reported as unavailable.
            manager = AsyncRedisManager('unix:///var/sock/redis.sock')
            assert manager is not None
        finally:
            # Always undo the patch, including when an assertion fails.
            async_redis_manager.aioredis = saved_redis

    def test_valkey_not_installed(self):
        """``valkey://`` raises ``RuntimeError`` when ``aiovalkey`` is None."""
        saved_valkey = async_redis_manager.aiovalkey
        async_redis_manager.aiovalkey = None
        try:
            with pytest.raises(RuntimeError):
                AsyncRedisManager('valkey://')._redis_connect()
            # Building a manager for a unix socket URL must not raise even
            # though the valkey package is reported as unavailable.
            manager = AsyncRedisManager('unix:///var/sock/redis.sock')
            assert manager is not None
        finally:
            # Always undo the patch, including when an assertion fails.
            async_redis_manager.aiovalkey = saved_valkey

    def test_redis_valkey_not_installed(self):
        """Every URL scheme raises ``RuntimeError`` with both packages gone."""
        saved_redis = async_redis_manager.aioredis
        saved_valkey = async_redis_manager.aiovalkey
        async_redis_manager.aioredis = None
        async_redis_manager.aiovalkey = None
        try:
            with pytest.raises(RuntimeError):
                AsyncRedisManager('redis://')._redis_connect()
            with pytest.raises(RuntimeError):
                AsyncRedisManager('valkey://')._redis_connect()
            # The unix scheme names neither package; with both missing it
            # is expected to fail as well.
            with pytest.raises(RuntimeError):
                AsyncRedisManager(
                    'unix:///var/sock/redis.sock')._redis_connect()
        finally:
            # Always undo both patches, including when an assertion fails.
            async_redis_manager.aioredis = saved_redis
            async_redis_manager.aiovalkey = saved_valkey

    def test_bad_url(self):
        """An ``http://`` URL is rejected with ``ValueError``."""
        with pytest.raises(ValueError):
            AsyncRedisManager('http://localhost:6379')._redis_connect()

    def test_redis_connect(self):
        """``_redis_connect()`` sets ``redis`` to a ``redis.asyncio.Redis``.

        Covers plain, TLS (``rediss``), unix socket and sentinel URLs, with
        and without credentials and database selectors.
        """
        urls = [
            'redis://localhost:6379',
            'redis://localhost:6379/0',
            'redis://:password@localhost:6379',
            'redis://:password@localhost:6379/0',
            'redis://user:password@localhost:6379',
            'redis://user:password@localhost:6379/0',

            'rediss://localhost:6379',
            'rediss://localhost:6379/0',
            'rediss://:password@localhost:6379',
            'rediss://:password@localhost:6379/0',
            'rediss://user:password@localhost:6379',
            'rediss://user:password@localhost:6379/0',

            'unix:///var/sock/redis.sock',
            'unix:///var/sock/redis.sock?db=0',
            'unix://user@/var/sock/redis.sock',
            'unix://user@/var/sock/redis.sock?db=0',

            'redis+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
        ]
        for url in urls:
            c = AsyncRedisManager(url)
            # The URL is the assertion message so a failure names the
            # offending entry instead of just the loop line.
            assert c.redis is None, url
            c._redis_connect()
            assert isinstance(c.redis, redis.asyncio.Redis), url

    def test_valkey_connect(self):
        """``_redis_connect()`` sets ``redis`` to a ``valkey.asyncio.Valkey``.

        ``aioredis`` is set to ``None`` for the duration of the test.
        NOTE(review): presumably this makes the scheme-neutral ``unix://``
        URLs resolve to the valkey client -- confirm against
        ``_redis_connect``.
        """
        saved_redis = async_redis_manager.aioredis
        async_redis_manager.aioredis = None
        try:
            urls = [
                'valkey://localhost:6379',
                'valkey://localhost:6379/0',
                'valkey://:password@localhost:6379',
                'valkey://:password@localhost:6379/0',
                'valkey://user:password@localhost:6379',
                'valkey://user:password@localhost:6379/0',

                'valkeys://localhost:6379',
                'valkeys://localhost:6379/0',
                'valkeys://:password@localhost:6379',
                'valkeys://:password@localhost:6379/0',
                'valkeys://user:password@localhost:6379',
                'valkeys://user:password@localhost:6379/0',

                'unix:///var/sock/redis.sock',
                'unix:///var/sock/redis.sock?db=0',
                'unix://user@/var/sock/redis.sock',
                'unix://user@/var/sock/redis.sock?db=0',

                'valkey+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
            ]
            for url in urls:
                c = AsyncRedisManager(url)
                # The URL is the assertion message so a failure names the
                # offending entry instead of just the loop line.
                assert c.redis is None, url
                c._redis_connect()
                assert isinstance(c.redis, valkey.asyncio.Valkey), url
        finally:
            # Always undo the patch, including when an assertion fails.
            async_redis_manager.aioredis = saved_redis