File: test_pool.py

package info (click to toggle)
python-aio-pika 9.5.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 1,460 kB
  • sloc: python: 8,003; makefile: 37; xml: 1
file content (158 lines) | stat: -rw-r--r-- 4,050 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import asyncio
from collections import Counter

import pytest

from aio_pika.pool import Pool, PoolInstance


@pytest.mark.parametrize("max_size", [50, 10, 5, 1])
async def test_simple(max_size, event_loop):
    counter = 0

    async def create_instance():
        nonlocal counter
        await asyncio.sleep(0)
        counter += 1
        return counter

    pool: Pool = Pool(create_instance, max_size=max_size, loop=event_loop)

    async def getter():
        nonlocal counter, pool

        async with pool.acquire() as instance:
            assert instance > 0
            await asyncio.sleep(1 if counter < max_size else 0)
            return instance, counter

    results = await asyncio.gather(*[getter() for _ in range(200)])

    for instance, total in results:
        assert instance > -1
        assert total > -1

    assert counter == max_size


class TestInstanceBase:
    class Instance(PoolInstance):
        def __init__(self):
            self.closed = False

        async def close(self):
            if self.closed:
                raise RuntimeError

            self.closed = True

    @pytest.fixture
    def instances(self):
        return set()

    @pytest.fixture(params=[50, 40, 30, 20, 10])
    def max_size(self, request):
        return request.param

    @pytest.fixture
    def pool(self, max_size, instances, event_loop):
        async def create_instance():
            nonlocal instances

            obj = TestInstanceBase.Instance()
            instances.add(obj)
            return obj

        return Pool(create_instance, max_size=max_size, loop=event_loop)


class TestInstance(TestInstanceBase):
    async def test_close(self, pool, instances, event_loop, max_size):
        async def getter():
            async with pool.acquire():
                await asyncio.sleep(0.05)

        assert not pool.is_closed
        assert len(instances) == 0

        await asyncio.gather(*[getter() for _ in range(200)])

        assert len(instances) == max_size

        for instance in instances:
            assert not instance.closed

        await pool.close()

        for instance in instances:
            assert instance.closed

        assert pool.is_closed

    async def test_close_context_manager(self, pool, instances):
        async def getter():
            async with pool.acquire():
                await asyncio.sleep(0.05)

        async with pool:
            assert not pool.is_closed

            assert len(instances) == 0

            await asyncio.gather(*[getter() for _ in range(200)])

            assert len(instances) > 1

            for instance in instances:
                assert not instance.closed

            assert not pool.is_closed

        assert pool.is_closed

        for instance in instances:
            assert instance.closed


class TestCaseNoMaxSize(TestInstance):
    async def test_simple(self, pool, event_loop):
        call_count = 200
        counter = 0

        async def getter():
            nonlocal counter

            async with pool.acquire() as instance:
                await asyncio.sleep(1)
                assert isinstance(instance, TestInstanceBase.Instance)
                counter += 1
                return counter

        results = await asyncio.gather(*[getter() for _ in range(call_count)])

        for result in results:
            assert result > -1

        assert counter == call_count


class TestCaseItemReuse(TestInstanceBase):
    @pytest.fixture
    def call_count(self, max_size):
        return max_size * 5

    async def test_simple(self, pool, call_count, instances):
        counter: Counter = Counter()

        async def getter():
            nonlocal counter

            async with pool.acquire() as instance:
                await asyncio.sleep(0.05)
                counter[instance] += 1

        await asyncio.gather(*[getter() for _ in range(call_count)])

        assert sum(counter.values()) == call_count
        assert set(counter) == set(instances)
        assert len(set(counter.values())) == 1