File: integration_test.py

package info (click to toggle)
aioredis 1.3.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid
  • size: 1,104 kB
  • sloc: python: 11,112; makefile: 7
file content (99 lines) | stat: -rw-r--r-- 2,765 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import asyncio
import pytest

import aioredis


@pytest.fixture
def pool_or_redis(_closable, server):
    version = tuple(map(int, aioredis.__version__.split('.')[:2]))
    if version >= (1, 0):
        factory = aioredis.create_redis_pool
    else:
        factory = aioredis.create_pool

    async def redis_factory(maxsize):
        redis = await factory(server.tcp_address,
                              minsize=1, maxsize=maxsize)
        _closable(redis)
        return redis
    return redis_factory


async def simple_get_set(pool, idx):
    """A simple test to make sure Redis(pool) can be used as old Pool(Redis).
    """
    val = 'val:{}'.format(idx)
    with await pool as redis:
        assert await redis.set('key', val)
        await redis.get('key', encoding='utf-8')


async def pipeline(pool, val):
    val = 'val:{}'.format(val)
    with await pool as redis:
        f1 = redis.set('key', val)
        f2 = redis.get('key', encoding='utf-8')
        ok, res = await asyncio.gather(f1, f2)


async def transaction(pool, val):
    val = 'val:{}'.format(val)
    with await pool as redis:
        tr = redis.multi_exec()
        tr.set('key', val)
        tr.get('key', encoding='utf-8')
        ok, res = await tr.execute()
        assert ok, ok
        assert res == val


async def blocking_pop(pool, val):

    async def lpush():
        with await pool as redis:
            # here v0.3 has bound connection, v1.0 does not;
            await asyncio.sleep(.1)
            await redis.lpush('list-key', 'val')

    async def blpop():
        with await pool as redis:
            # here v0.3 has bound connection, v1.0 does not;
            res = await redis.blpop(
                'list-key', timeout=2, encoding='utf-8')
            assert res == ['list-key', 'val'], res
    await asyncio.gather(blpop(), lpush())


@pytest.mark.parametrize('test_case,pool_size', [
    (simple_get_set, 1),
    (pipeline, 1),
    (transaction, 1),
    pytest.param(
        blocking_pop, 1,
        marks=pytest.mark.xfail(
            reason="blpop gets connection first and blocks")
        ),
    (simple_get_set, 10),
    (pipeline, 10),
    (transaction, 10),
    (blocking_pop, 10),
], ids=lambda o: getattr(o, '__name__', repr(o)))
async def test_operations(pool_or_redis, test_case, pool_size):
    repeat = 100
    redis = await pool_or_redis(pool_size)
    done, pending = await asyncio.wait(
        [asyncio.ensure_future(test_case(redis, i))
         for i in range(repeat)])

    assert not pending
    success = 0
    failures = []
    for fut in done:
        exc = fut.exception()
        if exc is None:
            success += 1
        else:
            failures.append(exc)
    assert repeat == success, failures
    assert not failures