1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
|
import asyncio
import base64
import os
import socket
import ssl
import sys
from hashlib import md5, sha1, sha256
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Generator
from unittest import mock
from uuid import uuid4
import pytest
from aiohttp.client_proto import ResponseHandler
from aiohttp.http import WS_KEY
from aiohttp.test_utils import get_unused_port_socket, loop_context
try:
import trustme
# Check if the CA is available in runtime, MacOS on Py3.10 fails somehow
trustme.CA()
TRUSTME: bool = True
except ImportError:
TRUSTME = False
pytest_plugins = ["aiohttp.pytest_plugin", "pytester"]
IS_HPUX = sys.platform.startswith("hp-ux")
IS_LINUX = sys.platform.startswith("linux")
@pytest.fixture
def tls_certificate_authority():
if not TRUSTME:
pytest.xfail("trustme is not supported")
return trustme.CA()
@pytest.fixture
def tls_certificate(tls_certificate_authority):
return tls_certificate_authority.issue_cert(
"localhost",
"xn--prklad-4va.localhost",
"127.0.0.1",
"::1",
)
@pytest.fixture
def ssl_ctx(tls_certificate):
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
tls_certificate.configure_cert(ssl_ctx)
return ssl_ctx
@pytest.fixture
def client_ssl_ctx(tls_certificate_authority):
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
tls_certificate_authority.configure_trust(ssl_ctx)
return ssl_ctx
@pytest.fixture
def tls_ca_certificate_pem_path(tls_certificate_authority):
with tls_certificate_authority.cert_pem.tempfile() as ca_cert_pem:
yield ca_cert_pem
@pytest.fixture
def tls_certificate_pem_path(tls_certificate):
with tls_certificate.private_key_and_cert_chain_pem.tempfile() as cert_pem:
yield cert_pem
@pytest.fixture
def tls_certificate_pem_bytes(tls_certificate):
return tls_certificate.cert_chain_pems[0].bytes()
@pytest.fixture
def tls_certificate_fingerprint_sha256(tls_certificate_pem_bytes):
tls_cert_der = ssl.PEM_cert_to_DER_cert(tls_certificate_pem_bytes.decode())
return sha256(tls_cert_der).digest()
@pytest.fixture
def unix_sockname(tmp_path, tmp_path_factory):
"""Generate an fs path to the UNIX domain socket for testing.
N.B. Different OS kernels have different fs path length limitations
for it. For Linux, it's 108, for HP-UX it's 92 (or higher) depending
on its version. For for most of the BSDs (Open, Free, macOS) it's
mostly 104 but sometimes it can be down to 100.
Ref: https://github.com/aio-libs/aiohttp/issues/3572
"""
if not hasattr(socket, "AF_UNIX"):
pytest.skip("requires UNIX sockets")
max_sock_len = 92 if IS_HPUX else 108 if IS_LINUX else 100
"""Amount of bytes allocated for the UNIX socket path by OS kernel.
Ref: https://unix.stackexchange.com/a/367012/27133
"""
sock_file_name = "unix.sock"
unique_prefix = f"{uuid4()!s}-"
unique_prefix_len = len(unique_prefix.encode())
root_tmp_dir = Path("/tmp").resolve()
os_tmp_dir = Path(os.getenv("TMPDIR", "/tmp")).resolve()
original_base_tmp_path = Path(
str(tmp_path_factory.getbasetemp()),
).resolve()
original_base_tmp_path_hash = md5(
str(original_base_tmp_path).encode(),
).hexdigest()
def make_tmp_dir(base_tmp_dir):
return TemporaryDirectory(
dir=str(base_tmp_dir),
prefix="pt-",
suffix=f"-{original_base_tmp_path_hash!s}",
)
def assert_sock_fits(sock_path):
sock_path_len = len(sock_path.encode())
# exit-check to verify that it's correct and simplify debugging
# in the future
assert sock_path_len <= max_sock_len, (
"Suggested UNIX socket ({sock_path}) is {sock_path_len} bytes "
"long but the current kernel only has {max_sock_len} bytes "
"allocated to hold it so it must be shorter. "
"See https://github.com/aio-libs/aiohttp/issues/3572 "
"for more info."
).format_map(locals())
paths = original_base_tmp_path, os_tmp_dir, root_tmp_dir
unique_paths = [p for n, p in enumerate(paths) if p not in paths[:n]]
paths_num = len(unique_paths)
for num, tmp_dir_path in enumerate(paths, 1):
with make_tmp_dir(tmp_dir_path) as tmpd:
tmpd = Path(tmpd).resolve()
sock_path = str(tmpd / sock_file_name)
sock_path_len = len(sock_path.encode())
if num >= paths_num:
# exit-check to verify that it's correct and simplify
# debugging in the future
assert_sock_fits(sock_path)
if sock_path_len <= max_sock_len:
if max_sock_len - sock_path_len >= unique_prefix_len:
# If we're lucky to have extra space in the path,
# let's also make it more unique
sock_path = str(tmpd / "".join((unique_prefix, sock_file_name)))
# Double-checking it:
assert_sock_fits(sock_path)
yield sock_path
return
@pytest.fixture
def pipe_name():
name = rf"\\.\pipe\{uuid4().hex}"
return name
@pytest.fixture
def create_mocked_conn(loop: Any):
def _proto_factory(conn_closing_result=None, **kwargs):
proto = mock.create_autospec(ResponseHandler, **kwargs)
proto.closed = loop.create_future()
proto.closed.set_result(conn_closing_result)
return proto
yield _proto_factory
@pytest.fixture
def selector_loop():
policy = asyncio.WindowsSelectorEventLoopPolicy()
asyncio.set_event_loop_policy(policy)
with loop_context(policy.new_event_loop) as _loop:
asyncio.set_event_loop(_loop)
yield _loop
@pytest.fixture
def netrc_contents(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
request: pytest.FixtureRequest,
):
"""
Prepare :file:`.netrc` with given contents.
Monkey-patches :envvar:`NETRC` to point to created file.
"""
netrc_contents = getattr(request, "param", None)
netrc_file_path = tmp_path / ".netrc"
if netrc_contents is not None:
netrc_file_path.write_text(netrc_contents)
monkeypatch.setenv("NETRC", str(netrc_file_path))
return netrc_file_path
@pytest.fixture
def start_connection():
with mock.patch(
"aiohttp.connector.aiohappyeyeballs.start_connection",
autospec=True,
spec_set=True,
return_value=mock.create_autospec(socket.socket, spec_set=True, instance=True),
) as start_connection_mock:
yield start_connection_mock
@pytest.fixture
def key_data():
return os.urandom(16)
@pytest.fixture
def key(key_data: Any):
return base64.b64encode(key_data)
@pytest.fixture
def ws_key(key: Any):
return base64.b64encode(sha1(key + WS_KEY).digest()).decode()
@pytest.fixture
def enable_cleanup_closed() -> Generator[None, None, None]:
"""Fixture to override the NEEDS_CLEANUP_CLOSED flag.
On Python 3.12.7+ and 3.13.1+ enable_cleanup_closed is not needed,
however we still want to test that it works.
"""
with mock.patch("aiohttp.connector.NEEDS_CLEANUP_CLOSED", True):
yield
@pytest.fixture
def unused_port_socket() -> Generator[socket.socket, None, None]:
"""Return a socket that is unused on the current host.
Unlike aiohttp_used_port, the socket is yielded so there is no
race condition between checking if the port is in use and
binding to it later in the test.
"""
s = get_unused_port_socket("127.0.0.1")
try:
yield s
finally:
s.close()
|