File: test_master.py

package info (click to toggle)
python-aio-pika 9.5.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 1,460 kB
  • sloc: python: 8,003; makefile: 37; xml: 1
file content (122 lines) | stat: -rw-r--r-- 2,894 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
from typing import Any, List

import aio_pika
from aio_pika.patterns.master import (
    CompressedJsonMaster, JsonMaster, Master, NackMessage, RejectMessage,
)


class TestMaster:
    MASTER_CLASS = Master

    async def test_simple(self, channel: aio_pika.Channel):
        master = self.MASTER_CLASS(channel)
        event = asyncio.Event()

        self.state: List[Any] = []

        def worker_func(*, foo, bar):
            nonlocal event
            self.state.append((foo, bar))
            event.set()

        worker = await master.create_worker(
            "worker.foo", worker_func, auto_delete=True,
        )

        await master.proxy.worker.foo(foo=1, bar=2)

        await event.wait()

        assert self.state == [(1, 2)]

        await worker.close()

    async def test_simple_coro(self, channel: aio_pika.Channel):
        master = self.MASTER_CLASS(channel)
        event = asyncio.Event()

        self.state = []

        async def worker_func(*, foo, bar):
            nonlocal event
            self.state.append((foo, bar))
            event.set()

        worker = await master.create_worker(
            "worker.foo", worker_func, auto_delete=True,
        )

        await master.proxy.worker.foo(foo=1, bar=2)

        await event.wait()

        assert self.state == [(1, 2)]

        await worker.close()

    async def test_simple_many(self, channel: aio_pika.Channel):
        master = self.MASTER_CLASS(channel)
        tasks = 100

        state = []

        def worker_func(*, foo):
            nonlocal tasks, state

            state.append(foo)
            tasks -= 1

        worker = await master.create_worker(
            "worker.foo", worker_func, auto_delete=True,
        )

        for item in range(100):
            await master.proxy.worker.foo(foo=item)

        while tasks > 0:
            await asyncio.sleep(0)

        assert state == list(range(100))

        await worker.close()

    async def test_exception_classes(self, channel: aio_pika.Channel):
        master = self.MASTER_CLASS(channel)
        counter = 200

        self.state = []

        def worker_func(*, foo):
            nonlocal counter
            counter -= 1

            if foo < 50:
                raise RejectMessage(requeue=False)
            if foo > 100:
                raise NackMessage(requeue=False)

            self.state.append(foo)

        worker = await master.create_worker(
            "worker.foo", worker_func, auto_delete=True,
        )

        for item in range(200):
            await master.proxy.worker.foo(foo=item)

        while counter > 0:
            await asyncio.sleep(0)

        assert self.state == list(range(50, 101))

        await worker.close()


class TestJsonMaster(TestMaster):
    MASTER_CLASS = JsonMaster


class TestCompressedJsonMaster(TestMaster):
    MASTER_CLASS = CompressedJsonMaster