File: conftest.py

package info (click to toggle)
python-truststore 0.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 236 kB
  • sloc: python: 1,835; makefile: 13; sh: 6
file content (175 lines) | stat: -rw-r--r-- 5,293 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import asyncio
import logging
import pathlib
import ssl
import typing
from dataclasses import dataclass
from tempfile import TemporaryDirectory

import pytest
import pytest_asyncio
from aiohttp import web

# Substring looked for in mkcert's output when issuing certificates; its
# presence is treated as a failure to issue them.
MKCERT_CA_NOT_INSTALLED = b"local CA is not installed in the system trust store"
# Substring looked for in the output of `mkcert -install`. NOTE: despite the
# constant's name, the text reads "is *now* installed"; the mkcert fixture
# uses a match to decide that it must run `mkcert -uninstall` on teardown.
MKCERT_CA_ALREADY_INSTALLED = b"local CA is now installed in the system trust store"
# Timeout, in seconds, passed to asyncio.wait_for() around mkcert subprocesses.
SUBPROCESS_TIMEOUT = 5

# To avoid getting the SSLContext injected by truststore.
original_SSLContext = ssl.SSLContext


def decorator_requires_internet(decorator):
    """Combine ``decorator`` with the "internet" pytest mark.

    Returns a new decorator that first applies ``decorator`` to its target
    and then passes the result through ``pytest.mark.internet``.
    """

    def mark_with_internet(test_func):
        decorated = decorator(test_func)
        return pytest.mark.internet(decorated)

    return mark_with_internet


# Decorator that parametrizes a test over the `host` values below and also
# tags it with the "internet" mark (via decorator_requires_internet).
successful_hosts = decorator_requires_internet(
    pytest.mark.parametrize("host", ["example.com", "1.1.1.1"])
)

# Handle on the "aiohttp.web" logger.
# NOTE(review): not referenced by anything else visible in this file.
logger = logging.getLogger("aiohttp.web")


@pytest_asyncio.fixture
async def mkcert() -> typing.AsyncIterator[None]:
    """Make sure the mkcert root CA is installed while a test runs.

    The requesting test is skipped when the ``mkcert`` executable cannot be
    started or ``mkcert -help`` exits with a non-zero status. Otherwise
    ``mkcert -install`` is run; if its output contains
    ``MKCERT_CA_ALREADY_INSTALLED`` (the "is now installed" message),
    ``mkcert -uninstall`` is run on teardown so the fixture leaves no trace.

    Raises:
        AssertionError: if ``mkcert -install`` exits with a non-zero status.
    """

    async def is_mkcert_available() -> bool:
        """Return True when ``mkcert -help`` can be run and exits with 0."""
        try:
            p = await asyncio.create_subprocess_exec(
                "mkcert",
                "-help",
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
        except FileNotFoundError:
            return False
        await asyncio.wait_for(p.wait(), timeout=SUBPROCESS_TIMEOUT)
        return p.returncode == 0

    # Checks to see if mkcert is available at all.
    if not await is_mkcert_available():
        pytest.skip("Install mkcert to run custom CA tests")

    # Now we attempt to install the root certificate
    # to the system trust store. Keep track if we should
    # call mkcert -uninstall at the end.
    should_mkcert_uninstall = False
    try:
        p = await asyncio.create_subprocess_exec(
            "mkcert",
            "-install",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        # Use communicate() rather than wait(): with stdout=PIPE, wait() can
        # deadlock if the child fills the pipe buffer before exiting, whereas
        # communicate() drains the pipe while waiting for the exit status.
        stdout, _ = await p.communicate()
        assert p.returncode == 0, stdout.decode(errors="replace")

        # See if the root cert was installed for the first
        # time, if so we want to leave no trace.
        should_mkcert_uninstall = MKCERT_CA_ALREADY_INSTALLED in stdout

        yield

    finally:
        # Only uninstall mkcert root cert if it wasn't
        # installed before our attempt to install.
        if should_mkcert_uninstall:
            p = await asyncio.create_subprocess_exec(
                "mkcert",
                "-uninstall",
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
            await p.wait()


@dataclass
class CertFiles:
    """Filesystem locations of a private key and its certificate."""

    # Path of the private key file (passed to mkcert as -key-file).
    key_file: pathlib.Path
    # Path of the certificate file (passed to mkcert as -cert-file).
    cert_file: pathlib.Path


@pytest_asyncio.fixture
async def mkcert_certs(mkcert: None) -> typing.AsyncIterator[CertFiles]:
    """Issue a ``localhost`` certificate with mkcert into a temporary directory.

    Depends on the ``mkcert`` fixture. Yields a ``CertFiles`` pointing at the
    generated ``localhost.pem`` / ``localhost-key.pem`` files; the temporary
    directory holding them is removed on teardown.

    Raises:
        RuntimeError: if mkcert exits with a non-zero status or its output
            contains ``MKCERT_CA_NOT_INSTALLED``.
        asyncio.TimeoutError: if mkcert does not finish within
            ``SUBPROCESS_TIMEOUT`` seconds (the child is killed first).
    """
    with TemporaryDirectory() as tmp_dir:
        # Create the structure we'll eventually return
        # as long as mkcert succeeds in creating the certs.
        tmpdir_path = pathlib.Path(tmp_dir)
        certs = CertFiles(
            cert_file=tmpdir_path / "localhost.pem",
            key_file=tmpdir_path / "localhost-key.pem",
        )

        # Pass an argument vector instead of a shell command line so that
        # temporary paths containing spaces or shell metacharacters are
        # handed to mkcert unmodified.
        p = await asyncio.create_subprocess_exec(
            "mkcert",
            "-cert-file",
            str(certs.cert_file),
            "-key-file",
            str(certs.key_file),
            "localhost",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        try:
            # communicate() drains the pipe while waiting; wait() on a
            # process with stdout=PIPE can deadlock on a full pipe buffer.
            stdout, _ = await asyncio.wait_for(
                p.communicate(), timeout=SUBPROCESS_TIMEOUT
            )
        except asyncio.TimeoutError:
            # Don't leave a hung mkcert process behind.
            try:
                p.kill()
            except ProcessLookupError:
                # The process exited on its own in the meantime.
                pass
            await p.wait()
            raise

        # Check for any signs that mkcert wasn't able to issue certs
        # or that the CA isn't installed
        if MKCERT_CA_NOT_INSTALLED in stdout or p.returncode != 0:
            raise RuntimeError(
                f"mkcert couldn't issue certificates "
                f"(exited with {p.returncode}): {stdout.decode()}"
            )

        yield certs


@dataclass
class Server:
    """Host name and port of an HTTPS server."""

    host: str
    port: int

    @property
    def base_url(self) -> str:
        """Return the ``https://host:port`` URL for this server."""
        return "https://{}:{}".format(self.host, self.port)


@pytest_asyncio.fixture(scope="function")
async def server(mkcert_certs: CertFiles) -> typing.AsyncIterator[Server]:
    """Run a local aiohttp HTTPS server using the mkcert-issued certificate.

    The server answers ``GET /`` with an empty 200 response and listens on
    port 9999. Yields a ``Server`` describing how to reach it
    (``localhost:9999``); the site and runner are shut down on teardown.
    """

    async def handler(request: web.Request) -> web.Response:
        # Check the request was served over HTTPS.
        assert request.scheme == "https"

        return web.Response(status=200)

    app = web.Application()
    app.add_routes([web.get("/", handler)])

    ctx = original_SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # We use str(pathlib.Path) here because PyPy doesn't accept Path objects.
    # TODO: This is a bug in PyPy and should be reported to them, but their
    # GitLab instance was offline when we found this bug. :'(
    ctx.load_cert_chain(
        certfile=str(mkcert_certs.cert_file),
        keyfile=str(mkcert_certs.key_file),
    )

    # we need keepalive_timeout=0
    # see https://github.com/aio-libs/aiohttp/issues/5426
    runner = web.AppRunner(app, keepalive_timeout=0)
    await runner.setup()
    try:
        port = 9999  # Arbitrary choice.
        site = web.TCPSite(runner, ssl_context=ctx, port=port)

        # Started inside the outer try so that a failure to bind (e.g. the
        # port is already in use) still cleans up the runner set up above.
        await site.start()
        try:
            yield Server(host="localhost", port=port)
        finally:
            await site.stop()
    finally:
        await runner.cleanup()