1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
|
import asyncio
from typing import Any, List
import aio_pika
from aio_pika.patterns.master import (
CompressedJsonMaster, JsonMaster, Master, NackMessage, RejectMessage,
)
class TestMaster:
MASTER_CLASS = Master
async def test_simple(self, channel: aio_pika.Channel):
master = self.MASTER_CLASS(channel)
event = asyncio.Event()
self.state: List[Any] = []
def worker_func(*, foo, bar):
nonlocal event
self.state.append((foo, bar))
event.set()
worker = await master.create_worker(
"worker.foo", worker_func, auto_delete=True,
)
await master.proxy.worker.foo(foo=1, bar=2)
await event.wait()
assert self.state == [(1, 2)]
await worker.close()
async def test_simple_coro(self, channel: aio_pika.Channel):
master = self.MASTER_CLASS(channel)
event = asyncio.Event()
self.state = []
async def worker_func(*, foo, bar):
nonlocal event
self.state.append((foo, bar))
event.set()
worker = await master.create_worker(
"worker.foo", worker_func, auto_delete=True,
)
await master.proxy.worker.foo(foo=1, bar=2)
await event.wait()
assert self.state == [(1, 2)]
await worker.close()
async def test_simple_many(self, channel: aio_pika.Channel):
master = self.MASTER_CLASS(channel)
tasks = 100
state = []
def worker_func(*, foo):
nonlocal tasks, state
state.append(foo)
tasks -= 1
worker = await master.create_worker(
"worker.foo", worker_func, auto_delete=True,
)
for item in range(100):
await master.proxy.worker.foo(foo=item)
while tasks > 0:
await asyncio.sleep(0)
assert state == list(range(100))
await worker.close()
async def test_exception_classes(self, channel: aio_pika.Channel):
master = self.MASTER_CLASS(channel)
counter = 200
self.state = []
def worker_func(*, foo):
nonlocal counter
counter -= 1
if foo < 50:
raise RejectMessage(requeue=False)
if foo > 100:
raise NackMessage(requeue=False)
self.state.append(foo)
worker = await master.create_worker(
"worker.foo", worker_func, auto_delete=True,
)
for item in range(200):
await master.proxy.worker.foo(foo=item)
while counter > 0:
await asyncio.sleep(0)
assert self.state == list(range(50, 101))
await worker.close()
class TestJsonMaster(TestMaster):
MASTER_CLASS = JsonMaster
class TestCompressedJsonMaster(TestMaster):
MASTER_CLASS = CompressedJsonMaster
|