1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
import asyncio
import gc
import logging
import os
import tracemalloc
import pamqp
import pytest
from aiomisc_pytest import TCPProxy
from yarl import URL
from aiormq import Connection
def cert_path(*args):
return os.path.join(
os.path.abspath(os.path.dirname(__file__)), "certs", *args,
)
AMQP_URL = URL(os.getenv("AMQP_URL", "amqp://guest:guest@localhost/"))
amqp_urls = {
"amqp": AMQP_URL,
"amqp-named": AMQP_URL.update_query(name="pytest"),
"amqps": AMQP_URL.with_scheme("amqps").with_query(
{"cafile": cert_path("ca.pem"), "no_verify_ssl": 1},
),
"amqps-client": AMQP_URL.with_scheme("amqps").with_query(
{
"cafile": cert_path("ca.pem"),
"keyfile": cert_path("client.key"),
"certfile": cert_path("client.pem"),
"no_verify_ssl": 1,
},
),
}
amqp_url_list, amqp_url_ids = [], []
for name, url in amqp_urls.items():
amqp_url_list.append(url)
amqp_url_ids.append(name)
@pytest.fixture(params=amqp_url_list, ids=amqp_url_ids)
async def amqp_url(request):
return request.param
@pytest.fixture
async def amqp_connection(amqp_url, loop):
connection = Connection(amqp_url, loop=loop)
async with connection:
yield connection
channel_params = [
dict(channel_number=None, frame_buffer_size=10, publisher_confirms=True),
dict(channel_number=None, frame_buffer_size=1, publisher_confirms=True),
dict(channel_number=None, frame_buffer_size=10, publisher_confirms=False),
dict(channel_number=None, frame_buffer_size=1, publisher_confirms=False),
]
@pytest.fixture(params=channel_params)
async def amqp_channel(request, amqp_connection):
try:
yield await amqp_connection.channel(**request.param)
finally:
await amqp_connection.close()
skip_when_quick_test = pytest.mark.skipif(
os.getenv("TEST_QUICK") is not None, reason="quick test",
)
@pytest.fixture(autouse=True)
def memory_tracer():
tracemalloc.start()
tracemalloc.clear_traces()
filters = (
tracemalloc.Filter(True, pamqp.__file__),
tracemalloc.Filter(True, asyncio.__file__),
)
snapshot_before = tracemalloc.take_snapshot().filter_traces(filters)
def format_stat(stats):
items = [
"TOP STATS:",
"%-90s %6s %6s %6s" % ("Traceback", "line", "size", "count"),
]
for stat in stats:
fname = stat.traceback[0].filename
lineno = stat.traceback[0].lineno
items.append(
"%-90s %6s %6s %6s"
% (fname, lineno, stat.size_diff, stat.count_diff),
)
return "\n".join(items)
try:
yield
gc.collect()
snapshot_after = tracemalloc.take_snapshot().filter_traces(filters)
top_stats = snapshot_after.compare_to(
snapshot_before, "lineno", cumulative=True,
)
if top_stats:
logging.error(format_stat(top_stats))
raise AssertionError("Possible memory leak")
finally:
tracemalloc.stop()
@pytest.fixture()
async def proxy(tcp_proxy, localhost, amqp_url: URL):
port = amqp_url.port or 5672 if amqp_url.scheme == "amqp" else 5671
async with tcp_proxy(amqp_url.host, port) as proxy:
yield proxy
@pytest.fixture
async def proxy_connection(proxy: TCPProxy, amqp_url: URL, loop):
url = amqp_url.with_host(
"localhost",
).with_port(
proxy.proxy_port,
)
connection = Connection(url, loop=loop)
await connection.connect()
try:
yield connection
finally:
await connection.close()
|