1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
from typing import Type
from aiormq.connection import parse_bool, parse_int, parse_timeout
from yarl import URL
from aio_pika import connect
from aio_pika.abc import AbstractConnection
from aio_pika.connection import Connection
from aio_pika.robust_connection import RobustConnection, connect_robust
class MockConnection(Connection):
async def connect(self, timeout=None, **kwargs):
return self
class MockConnectionRobust(RobustConnection):
async def connect(self, timeout=None, **kwargs):
return self
VALUE_GENERATORS = {
parse_int: {
"-1": -1,
"0": 0,
"43": 43,
"9999999999999999": 9999999999999999,
"hello": 0,
},
parse_bool: {
"disabled": False,
"enable": True,
"yes": True,
"no": False,
"": False,
},
parse_timeout: {
"0": 0,
"Vasyan": 0,
"0.1": 0.1,
"0.54": 0.54,
"1": 1,
"100": 100,
"1000:": 0,
},
float: {
"0": 0.,
"0.0": 0.,
".0": 0.,
"0.1": 0.1,
"1": 1.,
"hello": None,
},
}
class TestCase:
CONNECTION_CLASS: Type[AbstractConnection] = MockConnection
async def get_instance(self, url, **kwargs) -> AbstractConnection:
return await connect( # type: ignore
url, connection_class=self.CONNECTION_CLASS, **kwargs,
)
async def test_kwargs(self):
instance = await self.get_instance("amqp://localhost/")
for parameter in self.CONNECTION_CLASS.PARAMETERS:
if parameter.is_kwarg:
continue
assert hasattr(instance, parameter.name)
assert (
getattr(instance, parameter.name) is
parameter.parse(parameter.default)
)
async def test_kwargs_values(self):
for parameter in self.CONNECTION_CLASS.PARAMETERS:
positives = VALUE_GENERATORS[parameter.parser] # type: ignore
for example, expected in positives.items(): # type: ignore
instance = await self.get_instance(
f"amqp://localhost/?{parameter.name}={example}",
)
assert parameter.parse(example) == expected
if parameter.is_kwarg:
assert instance.kwargs[parameter.name] == expected
else:
assert hasattr(instance, parameter.name)
assert getattr(instance, parameter.name) == expected
instance = await self.get_instance(
"amqp://localhost", **{parameter.name: example},
)
assert hasattr(instance, parameter.name)
assert getattr(instance, parameter.name) == expected
class TestCaseRobust(TestCase):
CONNECTION_CLASS: Type[MockConnectionRobust] = MockConnectionRobust
async def get_instance(self, url, **kwargs) -> AbstractConnection:
return await connect_robust( # type: ignore
url,
connection_class=self.CONNECTION_CLASS, # type: ignore
**kwargs,
)
def test_connection_interleave(amqp_url: URL):
url = amqp_url.update_query(interleave="1")
connection = Connection(url=url)
assert "interleave" in connection.kwargs
assert connection.kwargs["interleave"] == 1
connection = Connection(url=amqp_url)
assert "interleave" not in connection.kwargs
def test_connection_happy_eyeballs_delay(amqp_url: URL):
url = amqp_url.update_query(happy_eyeballs_delay=".1")
connection = Connection(url=url)
assert "happy_eyeballs_delay" in connection.kwargs
assert connection.kwargs["happy_eyeballs_delay"] == 0.1
connection = Connection(url=amqp_url)
assert "happy_eyeballs_delay" not in connection.kwargs
def test_robust_connection_interleave(amqp_url: URL):
url = amqp_url.update_query(interleave="1")
connection = RobustConnection(url=url)
assert "interleave" in connection.kwargs
assert connection.kwargs["interleave"] == 1
connection = RobustConnection(url=amqp_url)
assert "interleave" not in connection.kwargs
def test_robust_connection_happy_eyeballs_delay(amqp_url: URL):
url = amqp_url.update_query(happy_eyeballs_delay=".1")
connection = RobustConnection(url=url)
assert "happy_eyeballs_delay" in connection.kwargs
assert connection.kwargs["happy_eyeballs_delay"] == 0.1
connection = RobustConnection(url=amqp_url)
assert "happy_eyeballs_delay" not in connection.kwargs
|