1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
|
import asyncio
import logging
import pathlib
import ssl
import typing
from dataclasses import dataclass
from tempfile import TemporaryDirectory
import pytest
import pytest_asyncio
from aiohttp import web
# Byte fragments searched for in mkcert's combined stdout/stderr output.
# When this one appears after issuing certificates, it is treated as a failure.
MKCERT_CA_NOT_INSTALLED = b"local CA is not installed in the system trust store"
# NOTE(review): despite "ALREADY" in the name, this matches the "is now
# installed" message. The mkcert fixture treats a match as "our -install run
# added the CA" and therefore uninstalls it again on teardown.
MKCERT_CA_ALREADY_INSTALLED = b"local CA is now installed in the system trust store"
# Timeout in seconds passed to asyncio.wait_for() around mkcert subprocess waits.
SUBPROCESS_TIMEOUT = 5
# To avoid getting the SSLContext injected by truststore.
# The server fixture builds its TLS context from this saved reference.
original_SSLContext = ssl.SSLContext
def decorator_requires_internet(decorator):
    """Wrap *decorator* so its targets also receive the "internet" mark.

    Returns a new decorator that first applies *decorator* to the target and
    then applies ``pytest.mark.internet`` to the result.
    """

    def apply_with_internet_mark(target):
        decorated = decorator(target)
        return pytest.mark.internet(decorated)

    return apply_with_internet_mark
# Test decorator: parametrizes the "host" argument over the hosts listed
# below and, via decorator_requires_internet, adds the "internet" mark.
successful_hosts = decorator_requires_internet(
pytest.mark.parametrize("host", ["example.com", "1.1.1.1"])
)
# NOTE(review): handle on the "aiohttp.web" logger; it is not referenced by
# any of the fixtures visible in this file.
logger = logging.getLogger("aiohttp.web")
@pytest_asyncio.fixture
async def mkcert() -> typing.AsyncIterator[None]:
    """Make sure the mkcert root CA is installed while a test runs.

    Skips the requesting test when the ``mkcert`` executable cannot be run.
    Otherwise runs ``mkcert -install`` and, on teardown, runs
    ``mkcert -uninstall`` only when the install output shows that this
    fixture is what added the CA to the trust store.

    Raises:
        AssertionError: if ``mkcert -install`` exits with a non-zero status.
    """

    async def is_mkcert_available() -> bool:
        """Return True if ``mkcert -help`` can be started and exits with 0."""
        try:
            probe = await asyncio.create_subprocess_exec(
                "mkcert",
                "-help",
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
        except FileNotFoundError:
            return False
        await asyncio.wait_for(probe.wait(), timeout=SUBPROCESS_TIMEOUT)
        return probe.returncode == 0

    # Checks to see if mkcert is available at all.
    if not await is_mkcert_available():
        pytest.skip("Install mkcert to run custom CA tests")

    # Now we attempt to install the root certificate
    # to the system trust store. Keep track if we should
    # call mkcert -uninstall at the end.
    should_mkcert_uninstall = False
    try:
        p = await asyncio.create_subprocess_exec(
            "mkcert",
            "-install",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        # communicate() drains the pipe while waiting for exit. Calling
        # wait() first on a process with stdout=PIPE can deadlock once the
        # child fills the OS pipe buffer.
        stdout, _ = await p.communicate()
        assert p.returncode == 0, stdout.decode(errors="replace")

        # See if the root cert was installed for the first
        # time, if so we want to leave no trace.
        should_mkcert_uninstall = MKCERT_CA_ALREADY_INSTALLED in stdout

        yield
    finally:
        # Only uninstall mkcert root cert if it wasn't
        # installed before our attempt to install.
        if should_mkcert_uninstall:
            p = await asyncio.create_subprocess_exec(
                "mkcert",
                "-uninstall",
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
            await p.wait()
@dataclass
class CertFiles:
    """Locations of a private key and its matching certificate on disk."""

    # Path to the PEM private key file.
    key_file: pathlib.Path
    # Path to the PEM certificate file.
    cert_file: pathlib.Path
@pytest_asyncio.fixture
async def mkcert_certs(mkcert: None) -> typing.AsyncIterator[CertFiles]:
    """Issue a ``localhost`` certificate and key with mkcert.

    The files are written into a temporary directory that is removed when
    the fixture is torn down, so the yielded paths are only valid while the
    requesting test runs.

    Yields:
        CertFiles pointing at the generated certificate and key.

    Raises:
        RuntimeError: if mkcert exits with a non-zero status or reports that
            the local CA is not installed.
        asyncio.TimeoutError: if mkcert does not finish within
            ``SUBPROCESS_TIMEOUT`` seconds.
    """
    with TemporaryDirectory() as tmp_dir:
        # Create the structure we'll eventually return
        # as long as mkcert succeeds in creating the certs.
        tmpdir_path = pathlib.Path(tmp_dir)
        certs = CertFiles(
            cert_file=tmpdir_path / "localhost.pem",
            key_file=tmpdir_path / "localhost-key.pem",
        )

        # Pass arguments as a list rather than a shell string so temporary
        # paths containing spaces or shell metacharacters are handled safely.
        p = await asyncio.create_subprocess_exec(
            "mkcert",
            "-cert-file",
            str(certs.cert_file),
            "-key-file",
            str(certs.key_file),
            "localhost",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        try:
            # communicate() reads the pipe while waiting; wait() alone on a
            # piped process can deadlock once the pipe buffer fills up.
            stdout, _ = await asyncio.wait_for(
                p.communicate(), timeout=SUBPROCESS_TIMEOUT
            )
        except asyncio.TimeoutError:
            # Don't leave a stuck mkcert process behind.
            try:
                p.kill()
            except ProcessLookupError:
                # The process exited between the timeout and the kill.
                pass
            await p.wait()
            raise

        # Check for any signs that mkcert wasn't able to issue certs
        # or that the CA isn't installed
        if MKCERT_CA_NOT_INSTALLED in stdout or p.returncode != 0:
            raise RuntimeError(
                f"mkcert couldn't issue certificates "
                f"(exited with {p.returncode}): {stdout.decode()}"
            )

        yield certs
@dataclass
class Server:
    """Address of the HTTPS test server handed to tests."""

    # Hostname clients should connect to.
    host: str
    # TCP port the server listens on.
    port: int

    @property
    def base_url(self) -> str:
        """Return the ``https://host:port`` URL for this server."""
        authority = f"{self.host}:{self.port}"
        return f"https://{authority}"
@pytest_asyncio.fixture(scope="function")
async def server(mkcert_certs: CertFiles) -> typing.AsyncIterator[Server]:
    """Run a minimal aiohttp HTTPS server for the duration of a test.

    The app answers ``GET /`` with an empty 200 response and serves TLS with
    the certificate and key from ``mkcert_certs``.

    Yields:
        Server describing where the app can be reached (localhost:9999).
    """

    async def handler(request: web.Request) -> web.Response:
        # Check the request was served over HTTPS.
        assert request.scheme == "https"

        return web.Response(status=200)

    app = web.Application()
    app.add_routes([web.get("/", handler)])

    ctx = original_SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # We use str(pathlib.Path) here because PyPy doesn't accept Path objects.
    # TODO: This is a bug in PyPy and should be reported to them, but their
    # GitLab instance was offline when we found this bug. :'(
    ctx.load_cert_chain(
        certfile=str(mkcert_certs.cert_file),
        keyfile=str(mkcert_certs.key_file),
    )

    # we need keepalive_timeout=0
    # see https://github.com/aio-libs/aiohttp/issues/5426
    runner = web.AppRunner(app, keepalive_timeout=0)
    await runner.setup()
    try:
        port = 9999  # Arbitrary choice.
        site = web.TCPSite(runner, ssl_context=ctx, port=port)
        # If start() fails (for example, the port is already in use) the
        # outer finally still cleans up the runner.
        await site.start()
        try:
            yield Server(host="localhost", port=port)
        finally:
            await site.stop()
    finally:
        await runner.cleanup()
|