1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
import asyncio
from collections import Counter
import pytest
from aio_pika.pool import Pool, PoolInstance
@pytest.mark.parametrize("max_size", [50, 10, 5, 1])
async def test_simple(max_size, event_loop):
counter = 0
async def create_instance():
nonlocal counter
await asyncio.sleep(0)
counter += 1
return counter
pool: Pool = Pool(create_instance, max_size=max_size, loop=event_loop)
async def getter():
nonlocal counter, pool
async with pool.acquire() as instance:
assert instance > 0
await asyncio.sleep(1 if counter < max_size else 0)
return instance, counter
results = await asyncio.gather(*[getter() for _ in range(200)])
for instance, total in results:
assert instance > -1
assert total > -1
assert counter == max_size
class TestInstanceBase:
class Instance(PoolInstance):
def __init__(self):
self.closed = False
async def close(self):
if self.closed:
raise RuntimeError
self.closed = True
@pytest.fixture
def instances(self):
return set()
@pytest.fixture(params=[50, 40, 30, 20, 10])
def max_size(self, request):
return request.param
@pytest.fixture
def pool(self, max_size, instances, event_loop):
async def create_instance():
nonlocal instances
obj = TestInstanceBase.Instance()
instances.add(obj)
return obj
return Pool(create_instance, max_size=max_size, loop=event_loop)
class TestInstance(TestInstanceBase):
async def test_close(self, pool, instances, event_loop, max_size):
async def getter():
async with pool.acquire():
await asyncio.sleep(0.05)
assert not pool.is_closed
assert len(instances) == 0
await asyncio.gather(*[getter() for _ in range(200)])
assert len(instances) == max_size
for instance in instances:
assert not instance.closed
await pool.close()
for instance in instances:
assert instance.closed
assert pool.is_closed
async def test_close_context_manager(self, pool, instances):
async def getter():
async with pool.acquire():
await asyncio.sleep(0.05)
async with pool:
assert not pool.is_closed
assert len(instances) == 0
await asyncio.gather(*[getter() for _ in range(200)])
assert len(instances) > 1
for instance in instances:
assert not instance.closed
assert not pool.is_closed
assert pool.is_closed
for instance in instances:
assert instance.closed
class TestCaseNoMaxSize(TestInstance):
async def test_simple(self, pool, event_loop):
call_count = 200
counter = 0
async def getter():
nonlocal counter
async with pool.acquire() as instance:
await asyncio.sleep(1)
assert isinstance(instance, TestInstanceBase.Instance)
counter += 1
return counter
results = await asyncio.gather(*[getter() for _ in range(call_count)])
for result in results:
assert result > -1
assert counter == call_count
class TestCaseItemReuse(TestInstanceBase):
@pytest.fixture
def call_count(self, max_size):
return max_size * 5
async def test_simple(self, pool, call_count, instances):
counter: Counter = Counter()
async def getter():
nonlocal counter
async with pool.acquire() as instance:
await asyncio.sleep(0.05)
counter[instance] += 1
await asyncio.gather(*[getter() for _ in range(call_count)])
assert sum(counter.values()) == call_count
assert set(counter) == set(instances)
assert len(set(counter.values())) == 1
|