# File: test_connection_params.py
#
# Package info:
#   python-aio-pika 9.5.5-2
#   - links: PTS, VCS
#   - area: main
#   - in suites: forky
#   - size: 1,460 kB
#   - sloc: python: 8,003; makefile: 37; xml: 1
# File content: 149 lines | stat: -rw-r--r-- 4,600 bytes
from typing import Type

from aiormq.connection import parse_bool, parse_int, parse_timeout
from yarl import URL

from aio_pika import connect
from aio_pika.abc import AbstractConnection
from aio_pika.connection import Connection
from aio_pika.robust_connection import RobustConnection, connect_robust


class MockConnection(Connection):
    """``Connection`` subclass whose ``connect`` is replaced by a no-op."""

    async def connect(self, timeout=None, **kwargs):
        # Ignores ``timeout`` and ``kwargs`` and hands back the instance
        # unchanged (presumably so the tests never open a real connection
        # -- confirm against ``Connection.connect``).
        return self


class MockConnectionRobust(RobustConnection):
    """``RobustConnection`` subclass whose ``connect`` is a no-op."""

    async def connect(self, timeout=None, **kwargs):
        # Ignores ``timeout`` and ``kwargs`` and hands back the instance
        # unchanged (presumably so the tests never open a real connection
        # -- confirm against ``RobustConnection.connect``).
        return self


# Expectation table consumed by ``TestCase.test_kwargs_values``.
# Outer key: the parser callable looked up via ``parameter.parser``.
# Inner mapping: raw string example -> value the test expects both from
# ``parameter.parse(example)`` and on the resulting connection instance.
VALUE_GENERATORS = {
    parse_int: {
        "-1": -1,
        "0": 0,
        "43": 43,
        "9999999999999999": 9999999999999999,
        "hello": 0,  # non-numeric input is expected to come back as 0
    },
    parse_bool: {
        "disabled": False,
        "enable": True,
        "yes": True,
        "no": False,
        "": False,  # empty string is expected to be falsy
    },
    parse_timeout: {
        "0": 0,
        "Vasyan": 0,  # non-numeric input is expected to come back as 0
        "0.1": 0.1,
        "0.54": 0.54,
        "1": 1,
        "100": 100,
        "1000:": 0,  # trailing garbage is expected to come back as 0
    },
    float: {
        "0": 0.,
        "0.0": 0.,
        ".0": 0.,
        "0.1": 0.1,
        "1": 1.,
        # ``float("hello")`` itself raises ValueError, so the None here is
        # presumably produced by ``parameter.parse`` -- TODO confirm.
        "hello": None,
    },
}


class TestCase:
    """Checks how connection parameters are filled from URLs and kwargs.

    ``CONNECTION_CLASS`` is passed to the connect function as
    ``connection_class``; its ``PARAMETERS`` attribute drives both tests.
    Subclasses can override ``CONNECTION_CLASS`` and ``get_instance`` to
    run the same checks against another connection type.
    """

    CONNECTION_CLASS: Type[AbstractConnection] = MockConnection

    async def get_instance(self, url, **kwargs) -> AbstractConnection:
        """Return the result of ``connect`` for *url*.

        Extra keyword arguments are forwarded to ``connect`` unchanged,
        alongside ``connection_class=self.CONNECTION_CLASS``.
        """
        return await connect(       # type: ignore
            url, connection_class=self.CONNECTION_CLASS, **kwargs,
        )

    async def test_kwargs(self):
        """Attribute-backed parameters equal their parsed default.

        Parameters flagged ``is_kwarg`` are skipped; every other parameter
        must exist as an attribute on an instance built from a URL without
        a query string.
        """
        instance = await self.get_instance("amqp://localhost/")

        for parameter in self.CONNECTION_CLASS.PARAMETERS:
            if parameter.is_kwarg:
                continue

            assert hasattr(instance, parameter.name)
            # Compare by value, not identity: ``is`` only holds for objects
            # the interpreter happens to share (small ints, bools) and would
            # fail spuriously for equal floats or large ints.
            assert (
                getattr(instance, parameter.name) ==
                parameter.parse(parameter.default)
            )

    async def test_kwargs_values(self):
        """Every example in ``VALUE_GENERATORS`` parses to its expectation.

        For each parameter and each example string the value is supplied
        through the URL query; for attribute-backed parameters it is also
        supplied as a keyword argument and checked again.
        """
        for parameter in self.CONNECTION_CLASS.PARAMETERS:
            positives = VALUE_GENERATORS[parameter.parser]  # type: ignore
            for example, expected in positives.items():  # type: ignore
                instance = await self.get_instance(
                    f"amqp://localhost/?{parameter.name}={example}",
                )

                assert parameter.parse(example) == expected

                if parameter.is_kwarg:
                    # kwarg-style parameters are looked up in
                    # ``instance.kwargs`` rather than as attributes.
                    assert instance.kwargs[parameter.name] == expected
                else:
                    assert hasattr(instance, parameter.name)
                    assert getattr(instance, parameter.name) == expected

                    # Same value passed as a keyword argument instead of
                    # through the URL query string.
                    instance = await self.get_instance(
                        "amqp://localhost", **{parameter.name: example},
                    )
                    assert hasattr(instance, parameter.name)
                    assert getattr(instance, parameter.name) == expected


class TestCaseRobust(TestCase):
    """Runs the inherited ``TestCase`` checks via ``connect_robust``."""

    CONNECTION_CLASS: Type[MockConnectionRobust] = MockConnectionRobust

    async def get_instance(self, url, **kwargs) -> AbstractConnection:
        """Return the result of ``connect_robust`` for *url*.

        Extra keyword arguments are forwarded unchanged, alongside
        ``connection_class=self.CONNECTION_CLASS``.
        """
        robust = await connect_robust(        # type: ignore
            url,
            connection_class=self.CONNECTION_CLASS,  # type: ignore
            **kwargs,
        )
        return robust


def test_connection_interleave(amqp_url: URL):
    """``interleave`` is in ``Connection.kwargs`` only if the URL sets it."""
    # With ``interleave=1`` in the query the kwarg must be present and
    # compare equal to the integer 1.
    conn = Connection(url=amqp_url.update_query(interleave="1"))
    options = conn.kwargs
    assert "interleave" in options
    assert options["interleave"] == 1

    # Without the query option the kwarg must be absent.
    conn = Connection(url=amqp_url)
    assert "interleave" not in conn.kwargs


def test_connection_happy_eyeballs_delay(amqp_url: URL):
    """``happy_eyeballs_delay`` is a ``Connection`` kwarg only if set."""
    # With ``happy_eyeballs_delay=.1`` in the query the kwarg must be
    # present and compare equal to 0.1.
    conn = Connection(url=amqp_url.update_query(happy_eyeballs_delay=".1"))
    options = conn.kwargs
    assert "happy_eyeballs_delay" in options
    assert options["happy_eyeballs_delay"] == 0.1

    # Without the query option the kwarg must be absent.
    conn = Connection(url=amqp_url)
    assert "happy_eyeballs_delay" not in conn.kwargs


def test_robust_connection_interleave(amqp_url: URL):
    """``interleave`` is a ``RobustConnection`` kwarg only if the URL sets it."""
    # With ``interleave=1`` in the query the kwarg must be present and
    # compare equal to the integer 1.
    conn = RobustConnection(url=amqp_url.update_query(interleave="1"))
    options = conn.kwargs
    assert "interleave" in options
    assert options["interleave"] == 1

    # Without the query option the kwarg must be absent.
    conn = RobustConnection(url=amqp_url)
    assert "interleave" not in conn.kwargs


def test_robust_connection_happy_eyeballs_delay(amqp_url: URL):
    """``happy_eyeballs_delay`` is a ``RobustConnection`` kwarg only if set."""
    # With ``happy_eyeballs_delay=.1`` in the query the kwarg must be
    # present and compare equal to 0.1.
    conn = RobustConnection(
        url=amqp_url.update_query(happy_eyeballs_delay=".1"),
    )
    options = conn.kwargs
    assert "happy_eyeballs_delay" in options
    assert options["happy_eyeballs_delay"] == 0.1

    # Without the query option the kwarg must be absent.
    conn = RobustConnection(url=amqp_url)
    assert "happy_eyeballs_delay" not in conn.kwargs