from __future__ import annotations

import concurrent.futures
import contextlib
import datetime
import os.path
import shutil
import ssl
import sys
import tempfile
import time
import typing
import warnings
from pathlib import Path
from test import (
    LONG_TIMEOUT,
    SHORT_TIMEOUT,
    TARPIT_HOST,
    requires_network,
    resolvesLocalhostFQDN,
)
from test.conftest import ServerConfig
from unittest import mock

import pytest
import trustme

import urllib3.http2
import urllib3.http2.probe as http2_probe
import urllib3.util as util
import urllib3.util.ssl_
from dummyserver.socketserver import (
    DEFAULT_CA,
    DEFAULT_CA_KEY,
    DEFAULT_CERTS,
    encrypt_key_pem,
)
from dummyserver.testcase import HTTPSHypercornDummyServerTestCase
from urllib3 import HTTPSConnectionPool
from urllib3.connection import RECENT_DATE, HTTPSConnection, VerifiedHTTPSConnection
from urllib3.exceptions import (
    ConnectTimeoutError,
    InsecureRequestWarning,
    MaxRetryError,
    ProtocolError,
    SSLError,
    SystemTimeWarning,
)
from urllib3.util.ssl_match_hostname import CertificateError
from urllib3.util.timeout import Timeout

TLSv1_CERTS = DEFAULT_CERTS.copy()
TLSv1_CERTS["ssl_version"] = getattr(ssl, "PROTOCOL_TLSv1", None)

TLSv1_1_CERTS = DEFAULT_CERTS.copy()
TLSv1_1_CERTS["ssl_version"] = getattr(ssl, "PROTOCOL_TLSv1_1", None)

TLSv1_2_CERTS = DEFAULT_CERTS.copy()
TLSv1_2_CERTS["ssl_version"] = getattr(ssl, "PROTOCOL_TLSv1_2", None)

TLSv1_3_CERTS = DEFAULT_CERTS.copy()
TLSv1_3_CERTS["ssl_version"] = getattr(ssl, "PROTOCOL_TLS", None)


CLIENT_INTERMEDIATE_PEM = "client_intermediate.pem"
CLIENT_NO_INTERMEDIATE_PEM = "client_no_intermediate.pem"
CLIENT_INTERMEDIATE_KEY = "client_intermediate.key"
PASSWORD_CLIENT_KEYFILE = "client_password.key"
CLIENT_CERT = CLIENT_INTERMEDIATE_PEM


class BaseTestHTTPS(HTTPSHypercornDummyServerTestCase):
    tls_protocol_name: str | None = None

    def tls_protocol_not_default(self) -> bool:
        return self.tls_protocol_name in {"TLSv1", "TLSv1.1"}

    def tls_version(self) -> ssl.TLSVersion:
        if self.tls_protocol_name is None:
            return pytest.skip("Skipping base test class")
        try:
            from ssl import TLSVersion
        except ImportError:
            return pytest.skip("ssl.TLSVersion isn't available")
        return TLSVersion[self.tls_protocol_name.replace(".", "_")]

    def ssl_version(self) -> int:
        if self.tls_protocol_name is None:
            return pytest.skip("Skipping base test class")

        if self.tls_protocol_name == "TLSv1.3" and ssl.HAS_TLSv1_3:
            return ssl.PROTOCOL_TLS_CLIENT
        if self.tls_protocol_name == "TLSv1.2" and ssl.HAS_TLSv1_2:
            return ssl.PROTOCOL_TLSv1_2
        if self.tls_protocol_name == "TLSv1.1" and ssl.HAS_TLSv1_1:
            return ssl.PROTOCOL_TLSv1_1
        if self.tls_protocol_name == "TLSv1" and ssl.HAS_TLSv1:
            return ssl.PROTOCOL_TLSv1
        else:
            return pytest.skip(f"{self.tls_protocol_name} isn't available")

    @classmethod
    def setup_class(cls) -> None:
        super().setup_class()

        cls.certs_dir = tempfile.mkdtemp()
        # Start from existing root CA as we don't want to change the server certificate yet
        with open(DEFAULT_CA, "rb") as crt, open(DEFAULT_CA_KEY, "rb") as key:
            root_ca = trustme.CA.from_pem(crt.read(), key.read())

        # Generate another CA to test verification failure
        bad_ca = trustme.CA()
        cls.bad_ca_path = os.path.join(cls.certs_dir, "ca_bad.pem")
        bad_ca.cert_pem.write_to_path(cls.bad_ca_path)

        # client cert chain
        intermediate_ca = root_ca.create_child_ca()
        cert = intermediate_ca.issue_cert("example.com")
        encrypted_key = encrypt_key_pem(cert.private_key_pem, b"letmein")

        cert.private_key_pem.write_to_path(
            os.path.join(cls.certs_dir, CLIENT_INTERMEDIATE_KEY)
        )
        encrypted_key.write_to_path(
            os.path.join(cls.certs_dir, PASSWORD_CLIENT_KEYFILE)
        )
        # Write the client cert and the intermediate CA
        client_cert = os.path.join(cls.certs_dir, CLIENT_INTERMEDIATE_PEM)
        cert.cert_chain_pems[0].write_to_path(client_cert)
        cert.cert_chain_pems[1].write_to_path(client_cert, append=True)
        # Write only the client cert
        cert.cert_chain_pems[0].write_to_path(
            os.path.join(cls.certs_dir, CLIENT_NO_INTERMEDIATE_PEM)
        )

    @classmethod
    def teardown_class(cls) -> None:
        super().teardown_class()

        shutil.rmtree(cls.certs_dir)

    def test_simple(self, http_version: str) -> None:
        with HTTPSConnectionPool(
            self.host,
            self.port,
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            r = https_pool.request("GET", "/")
            assert r.status == 200, r.data
            assert r.headers["server"] == f"hypercorn-{http_version}"
            assert r.data == b"Dummy server!"

    def test_default_port(self) -> None:
        conn = HTTPSConnection(self.host, port=None)
        assert conn.port == 443

    @resolvesLocalhostFQDN()
    def test_dotted_fqdn(self) -> None:
        with HTTPSConnectionPool(
            self.host + ".",
            self.port,
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as pool:
            r = pool.request("GET", "/")
            assert r.status == 200, r.data

    def test_client_intermediate(self) -> None:
        """Check that certificate chains work well with client certs

        We generate an intermediate CA from the root CA, and issue a client certificate
        from that intermediate CA. Since the server only knows about the root CA, we
        need to send it the certificate *and* the intermediate CA, so that it can check
        the whole chain.
        """
        with HTTPSConnectionPool(
            self.host,
            self.port,
            key_file=os.path.join(self.certs_dir, CLIENT_INTERMEDIATE_KEY),
            cert_file=os.path.join(self.certs_dir, CLIENT_INTERMEDIATE_PEM),
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            r = https_pool.request("GET", "/certificate")
            subject = r.json()
            assert subject["organizationalUnitName"].startswith("Testing cert")

    def test_client_no_intermediate(self) -> None:
        """Check that missing links in certificate chains indeed break

        The only difference with test_client_intermediate is that we don't send the
        intermediate CA to the server, only the client cert.
        """
        with HTTPSConnectionPool(
            self.host,
            self.port,
            cert_file=os.path.join(self.certs_dir, CLIENT_NO_INTERMEDIATE_PEM),
            key_file=os.path.join(self.certs_dir, CLIENT_INTERMEDIATE_KEY),
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            with pytest.raises((SSLError, ProtocolError)):
                https_pool.request("GET", "/certificate", retries=False)

    def test_client_key_password(self) -> None:
        with HTTPSConnectionPool(
            self.host,
            self.port,
            ca_certs=DEFAULT_CA,
            key_file=os.path.join(self.certs_dir, PASSWORD_CLIENT_KEYFILE),
            cert_file=os.path.join(self.certs_dir, CLIENT_CERT),
            key_password="letmein",
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            r = https_pool.request("GET", "/certificate")
            subject = r.json()
            assert subject["organizationalUnitName"].startswith("Testing cert")

    def test_client_encrypted_key_requires_password(self) -> None:
        with HTTPSConnectionPool(
            self.host,
            self.port,
            key_file=os.path.join(self.certs_dir, PASSWORD_CLIENT_KEYFILE),
            cert_file=os.path.join(self.certs_dir, CLIENT_CERT),
            key_password=None,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            with pytest.raises(MaxRetryError, match="password is required") as e:
                https_pool.request("GET", "/certificate")

            assert type(e.value.reason) is SSLError

    def test_verified(self) -> None:
        with HTTPSConnectionPool(
            self.host,
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            with contextlib.closing(https_pool._new_conn()) as conn:
                assert conn.__class__ == VerifiedHTTPSConnection

            with warnings.catch_warnings(record=True) as w:
                r = https_pool.request("GET", "/")
                assert r.status == 200

            assert [str(wm) for wm in w] == []

    def test_verified_with_context(self) -> None:
        ctx = util.ssl_.create_urllib3_context(
            cert_reqs=ssl.CERT_REQUIRED, ssl_minimum_version=self.tls_version()
        )
        ctx.load_verify_locations(cafile=DEFAULT_CA)
        with HTTPSConnectionPool(self.host, self.port, ssl_context=ctx) as https_pool:
            with contextlib.closing(https_pool._new_conn()) as conn:
                assert conn.__class__ == VerifiedHTTPSConnection

            with mock.patch("warnings.warn") as warn:
                r = https_pool.request("GET", "/")
                assert r.status == 200
                assert not warn.called, warn.call_args_list

    def test_context_combines_with_ca_certs(self) -> None:
        ctx = util.ssl_.create_urllib3_context(
            cert_reqs=ssl.CERT_REQUIRED, ssl_minimum_version=self.tls_version()
        )
        with HTTPSConnectionPool(
            self.host, self.port, ca_certs=DEFAULT_CA, ssl_context=ctx
        ) as https_pool:
            with contextlib.closing(https_pool._new_conn()) as conn:
                assert conn.__class__ == VerifiedHTTPSConnection

            with mock.patch("warnings.warn") as warn:
                r = https_pool.request("GET", "/")
                assert r.status == 200
                assert not warn.called, warn.call_args_list

    def test_ca_dir_verified(self, tmp_path: Path) -> None:
        # OpenSSL looks up certificates by the hash for their name, see c_rehash
        # TODO infer the bytes using `cryptography.x509.Name.public_bytes`.
        # https://github.com/pyca/cryptography/pull/3236
        shutil.copyfile(DEFAULT_CA, str(tmp_path / "81deb5f7.0"))

        with HTTPSConnectionPool(
            self.host,
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_cert_dir=str(tmp_path),
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            with contextlib.closing(https_pool._new_conn()) as conn:
                assert conn.__class__ == VerifiedHTTPSConnection

            with warnings.catch_warnings(record=True) as w:
                r = https_pool.request("GET", "/")
                assert r.status == 200

            assert [str(wm) for wm in w] == []

    def test_invalid_common_name(self) -> None:
        with HTTPSConnectionPool(
            "127.0.0.1",
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            with pytest.raises(MaxRetryError) as e:
                https_pool.request("GET", "/", retries=0)
            assert type(e.value.reason) is SSLError
            assert "doesn't match" in str(
                e.value.reason
            ) or "certificate verify failed" in str(e.value.reason)

    def test_verified_with_bad_ca_certs(self) -> None:
        with HTTPSConnectionPool(
            self.host,
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=self.bad_ca_path,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            with pytest.raises(MaxRetryError) as e:
                https_pool.request("GET", "/")
            assert type(e.value.reason) is SSLError
            assert (
                "certificate verify failed" in str(e.value.reason)
                # PyPy is more specific
                or "self signed certificate in certificate chain" in str(e.value.reason)
            ), f"Expected 'certificate verify failed', instead got: {e.value.reason!r}"

    def test_wrap_socket_failure_resource_leak(self) -> None:
        with HTTPSConnectionPool(
            self.host,
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=self.bad_ca_path,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            with contextlib.closing(https_pool._get_conn()) as conn:
                with pytest.raises(ssl.SSLError):
                    conn.connect()

                assert conn.sock is not None  # type: ignore[attr-defined]

    def test_verified_without_ca_certs(self) -> None:
        # default is cert_reqs=None which is ssl.CERT_NONE
        with HTTPSConnectionPool(
            self.host,
            self.port,
            cert_reqs="CERT_REQUIRED",
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            with pytest.raises(MaxRetryError) as e:
                https_pool.request("GET", "/")
            assert type(e.value.reason) is SSLError
            # there is a different error message depending on whether or
            # not pyopenssl is injected
            assert (
                "No root certificates specified" in str(e.value.reason)
                # PyPy is more specific
                or "self signed certificate in certificate chain" in str(e.value.reason)
                # PyPy sometimes uses all-caps here
                or "certificate verify failed" in str(e.value.reason).lower()
                or "invalid certificate chain" in str(e.value.reason)
            ), (
                "Expected 'No root certificates specified',  "
                "'certificate verify failed', or "
                "'invalid certificate chain', "
                "instead got: %r" % e.value.reason
            )

    def test_no_ssl(self) -> None:
        with HTTPSConnectionPool(self.host, self.port) as pool:
            pool.ConnectionCls = None  # type: ignore[assignment]
            with pytest.raises(ImportError):
                pool._new_conn()
            with pytest.raises(ImportError):
                pool.request("GET", "/", retries=0)

    def test_unverified_ssl(self) -> None:
        """Test that bare HTTPSConnection can connect, make requests"""
        with HTTPSConnectionPool(
            self.host,
            self.port,
            cert_reqs=ssl.CERT_NONE,
            ssl_minimum_version=self.tls_version(),
        ) as pool:
            with mock.patch("warnings.warn") as warn:
                r = pool.request("GET", "/")
                assert r.status == 200
                assert warn.called

                # Modern versions of Python, or systems using PyOpenSSL, only emit
                # the unverified warning. Older systems may also emit other
                # warnings, which we want to ignore here.
                calls = warn.call_args_list
                assert InsecureRequestWarning in [x[0][1] for x in calls]

    def test_ssl_unverified_with_ca_certs(self) -> None:
        with HTTPSConnectionPool(
            self.host,
            self.port,
            cert_reqs="CERT_NONE",
            ca_certs=self.bad_ca_path,
            ssl_minimum_version=self.tls_version(),
        ) as pool:
            with mock.patch("warnings.warn") as warn:
                r = pool.request("GET", "/")
                assert r.status == 200
                assert warn.called

                # Modern versions of Python, or systems using PyOpenSSL, only emit
                # the unverified warning. Older systems may also emit other
                # warnings, which we want to ignore here.
                calls = warn.call_args_list

                category = calls[0][0][1]
                assert category == InsecureRequestWarning

    def test_assert_hostname_false(self) -> None:
        with HTTPSConnectionPool(
            "localhost",
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            https_pool.assert_hostname = False
            https_pool.request("GET", "/")

    def test_assert_specific_hostname(self) -> None:
        with HTTPSConnectionPool(
            "localhost",
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            https_pool.assert_hostname = "localhost"
            https_pool.request("GET", "/")

    def test_server_hostname(self) -> None:
        with HTTPSConnectionPool(
            "127.0.0.1",
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=DEFAULT_CA,
            server_hostname="localhost",
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            conn = https_pool._new_conn()
            conn.request("GET", "/")

            # Assert the wrapping socket is using the passed-through SNI name.
            # pyopenssl doesn't let you pull the server_hostname back off the
            # socket, so only add this assertion if the attribute is there (i.e.
            # the python ssl module).
            if hasattr(conn.sock, "server_hostname"):  # type: ignore[attr-defined]
                assert conn.sock.server_hostname == "localhost"  # type: ignore[attr-defined]
            conn.getresponse().close()
            conn.close()

    def test_assert_fingerprint_md5(self) -> None:
        with HTTPSConnectionPool(
            "localhost",
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=DEFAULT_CA,
            assert_fingerprint=("55:39:BF:70:05:12:43:FA:1F:D1:BF:4E:E8:1B:07:1D"),
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            https_pool.request("GET", "/")

    def test_assert_fingerprint_sha1(self) -> None:
        with HTTPSConnectionPool(
            "localhost",
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=DEFAULT_CA,
            assert_fingerprint=(
                "72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A"
            ),
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            https_pool.request("GET", "/")

    def test_assert_fingerprint_sha256(self) -> None:
        with HTTPSConnectionPool(
            "localhost",
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=DEFAULT_CA,
            assert_fingerprint=(
                "E3:59:8E:69:FF:C5:9F:C7:88:87:44:58:22:7F:90:8D:D9:BC:12:C4:90:79:D5:"
                "DC:A8:5D:4F:60:40:1E:A6:D2"
            ),
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            https_pool.request("GET", "/")

    def test_assert_invalid_fingerprint(self) -> None:
        def _test_request(pool: HTTPSConnectionPool) -> SSLError:
            with pytest.raises(MaxRetryError) as cm:
                pool.request("GET", "/", retries=0)
            assert type(cm.value.reason) is SSLError
            return cm.value.reason

        with HTTPSConnectionPool(
            self.host,
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            https_pool.assert_fingerprint = (
                "AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA"
            )
            e = _test_request(https_pool)
            expected = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
            got = "728b554c9afc1e88a11cad1bb2e7cc3edbc8f98a"
            assert (
                str(e)
                == f'Fingerprints did not match. Expected "{expected}", got "{got}"'
            )

            # Uneven length
            https_pool.assert_fingerprint = "AA:A"
            e = _test_request(https_pool)
            assert "Fingerprint of invalid length:" in str(e)

            # Invalid length
            https_pool.assert_fingerprint = "AA"
            e = _test_request(https_pool)
            assert "Fingerprint of invalid length:" in str(e)

    def test_verify_none_and_bad_fingerprint(self) -> None:
        with HTTPSConnectionPool(
            "127.0.0.1",
            self.port,
            cert_reqs="CERT_NONE",
            assert_hostname=False,
            assert_fingerprint=(
                "AA:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A"
            ),
        ) as https_pool:
            with pytest.raises(MaxRetryError) as cm:
                https_pool.request("GET", "/", retries=0)
            assert type(cm.value.reason) is SSLError

    def test_verify_none_and_good_fingerprint(self) -> None:
        with HTTPSConnectionPool(
            "127.0.0.1",
            self.port,
            cert_reqs="CERT_NONE",
            assert_hostname=False,
            assert_fingerprint=(
                "72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A"
            ),
        ) as https_pool:
            https_pool.request("GET", "/")

    def test_good_fingerprint_and_hostname_mismatch(self) -> None:
        with HTTPSConnectionPool(
            "127.0.0.1",
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=DEFAULT_CA,
            assert_fingerprint=(
                "72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A"
            ),
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            https_pool.request("GET", "/")

    @requires_network()
    def test_https_timeout(self) -> None:
        timeout = Timeout(total=None, connect=SHORT_TIMEOUT)
        with HTTPSConnectionPool(
            TARPIT_HOST,
            self.port,
            timeout=timeout,
            retries=False,
            cert_reqs="CERT_REQUIRED",
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            with pytest.raises(ConnectTimeoutError):
                https_pool.request("GET", "/")

        timeout = Timeout(read=0.01)
        with HTTPSConnectionPool(
            self.host,
            self.port,
            timeout=timeout,
            retries=False,
            cert_reqs="CERT_REQUIRED",
            ca_certs=DEFAULT_CA,
            assert_fingerprint=(
                "72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A"
            ),
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            # TODO This was removed in https://github.com/urllib3/urllib3/pull/703/files
            # We need to put something back or remove this block.
            pass

        timeout = Timeout(total=None)
        with HTTPSConnectionPool(
            self.host,
            self.port,
            timeout=timeout,
            cert_reqs="CERT_NONE",
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            with pytest.warns(InsecureRequestWarning):
                https_pool.request("GET", "/")

    def test_tunnel(self, http_version: str) -> None:
        """test the _tunnel behavior"""
        timeout = Timeout(total=None)
        with HTTPSConnectionPool(
            self.host,
            self.port,
            timeout=timeout,
            cert_reqs="CERT_NONE",
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            with contextlib.closing(https_pool._new_conn()) as conn:
                if http_version == "h2":
                    with pytest.raises(NotImplementedError) as e:
                        conn.set_tunnel(self.host, self.port)
                    assert (
                        str(e.value)
                        == "HTTP/2 does not support setting up a tunnel through a proxy"
                    )
                else:
                    conn.set_tunnel(self.host, self.port)
                    with mock.patch.object(
                        conn, "_tunnel", create=True, return_value=None
                    ) as conn_tunnel:
                        with pytest.warns(InsecureRequestWarning):
                            https_pool._make_request(conn, "GET", "/")
                    conn_tunnel.assert_called_once_with()

    @requires_network()
    def test_enhanced_timeout(self) -> None:
        with HTTPSConnectionPool(
            TARPIT_HOST,
            self.port,
            timeout=Timeout(connect=SHORT_TIMEOUT),
            retries=False,
            cert_reqs="CERT_REQUIRED",
        ) as https_pool:
            with contextlib.closing(https_pool._new_conn()) as conn:
                with pytest.raises(ConnectTimeoutError):
                    https_pool.request("GET", "/")
                with pytest.raises(ConnectTimeoutError):
                    https_pool._make_request(conn, "GET", "/")

        with HTTPSConnectionPool(
            TARPIT_HOST,
            self.port,
            timeout=Timeout(connect=LONG_TIMEOUT),
            retries=False,
            cert_reqs="CERT_REQUIRED",
        ) as https_pool:
            with pytest.raises(ConnectTimeoutError):
                https_pool.request("GET", "/", timeout=Timeout(connect=SHORT_TIMEOUT))

        with HTTPSConnectionPool(
            TARPIT_HOST,
            self.port,
            timeout=Timeout(total=None),
            retries=False,
            cert_reqs="CERT_REQUIRED",
        ) as https_pool:
            with contextlib.closing(https_pool._new_conn()) as conn:
                with pytest.raises(ConnectTimeoutError):
                    https_pool.request(
                        "GET", "/", timeout=Timeout(total=None, connect=SHORT_TIMEOUT)
                    )

    def test_enhanced_ssl_connection(self) -> None:
        fingerprint = "72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A"

        with HTTPSConnectionPool(
            self.host,
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=DEFAULT_CA,
            assert_fingerprint=fingerprint,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            r = https_pool.request("GET", "/")
            assert r.status == 200

    def test_ssl_correct_system_time(self) -> None:
        with HTTPSConnectionPool(
            self.host,
            self.port,
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            https_pool.cert_reqs = "CERT_REQUIRED"
            https_pool.ca_certs = DEFAULT_CA

            w = self._request_without_resource_warnings("GET", "/")
            assert [] == w

    def test_ssl_wrong_system_time(self) -> None:
        with HTTPSConnectionPool(
            self.host,
            self.port,
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            https_pool.cert_reqs = "CERT_REQUIRED"
            https_pool.ca_certs = DEFAULT_CA
            with mock.patch("urllib3.connection.datetime") as mock_date:
                mock_date.date.today.return_value = datetime.date(1970, 1, 1)

                w = self._request_without_resource_warnings("GET", "/")

                assert len(w) == 1
                warning = w[0]

                assert SystemTimeWarning == warning.category
                assert isinstance(warning.message, Warning)
                assert str(RECENT_DATE) in warning.message.args[0]

    def _request_without_resource_warnings(
        self, method: str, url: str
    ) -> list[warnings.WarningMessage]:
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always")
            with HTTPSConnectionPool(
                self.host,
                self.port,
                ca_certs=DEFAULT_CA,
                ssl_minimum_version=self.tls_version(),
            ) as https_pool:
                https_pool.request(method, url)

        w = [x for x in w if not isinstance(x.message, ResourceWarning)]

        return w

    def test_set_ssl_version_to_tls_version(self) -> None:
        if self.tls_protocol_name is None:
            pytest.skip("Skipping base test class")

        with HTTPSConnectionPool(
            self.host, self.port, ca_certs=DEFAULT_CA
        ) as https_pool:
            https_pool.ssl_version = ssl_version = self.certs["ssl_version"]
            if ssl_version is getattr(ssl, "PROTOCOL_TLS", object()):
                cmgr: contextlib.AbstractContextManager[object] = (
                    contextlib.nullcontext()
                )
            else:
                cmgr = pytest.warns(
                    DeprecationWarning,
                    match=r"'ssl_version' option is deprecated and will be removed "
                    r"in urllib3 v2\.6\.0\. Instead use 'ssl_minimum_version'",
                )
            with cmgr:
                r = https_pool.request("GET", "/")
            assert r.status == 200, r.data

    def test_set_cert_default_cert_required(self) -> None:
        conn = VerifiedHTTPSConnection(self.host, self.port)
        with pytest.warns(DeprecationWarning) as w:
            conn.set_cert()
        assert conn.cert_reqs == ssl.CERT_REQUIRED
        assert len(w) == 1 and str(w[0].message) == (
            "HTTPSConnection.set_cert() is deprecated and will be removed in urllib3 v2.1.0. "
            "Instead provide the parameters to the HTTPSConnection constructor."
        )

    @pytest.mark.parametrize("verify_mode", [ssl.CERT_NONE, ssl.CERT_REQUIRED])
    def test_set_cert_inherits_cert_reqs_from_ssl_context(
        self, verify_mode: int
    ) -> None:
        ssl_context = urllib3.util.ssl_.create_urllib3_context(cert_reqs=verify_mode)
        assert ssl_context.verify_mode == verify_mode

        conn = HTTPSConnection(self.host, self.port, ssl_context=ssl_context)
        with pytest.warns(DeprecationWarning) as w:
            conn.set_cert()

        assert conn.cert_reqs == verify_mode
        assert (
            conn.ssl_context is not None and conn.ssl_context.verify_mode == verify_mode
        )
        assert len(w) == 1 and str(w[0].message) == (
            "HTTPSConnection.set_cert() is deprecated and will be removed in urllib3 v2.1.0. "
            "Instead provide the parameters to the HTTPSConnection constructor."
        )

    def test_tls_protocol_name_of_socket(self) -> None:
        if self.tls_protocol_name is None:
            pytest.skip("Skipping base test class")

        with HTTPSConnectionPool(
            self.host,
            self.port,
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
            ssl_maximum_version=self.tls_version(),
        ) as https_pool:
            with contextlib.closing(https_pool._get_conn()) as conn:
                conn.connect()
                if not hasattr(conn.sock, "version"):  # type: ignore[attr-defined]
                    pytest.skip("SSLSocket.version() not available")
                assert conn.sock.version() == self.tls_protocol_name  # type: ignore[attr-defined]

    def test_ssl_version_is_deprecated(self) -> None:
        if self.tls_protocol_name is None:
            pytest.skip("Skipping base test class")
        if self.ssl_version() == ssl.PROTOCOL_TLS_CLIENT:
            pytest.skip(
                "Skipping because ssl_version=ssl.PROTOCOL_TLS_CLIENT is not deprecated"
            )

        with HTTPSConnectionPool(
            self.host, self.port, ca_certs=DEFAULT_CA, ssl_version=self.ssl_version()
        ) as https_pool:
            with contextlib.closing(https_pool._get_conn()) as conn:
                with pytest.warns(DeprecationWarning) as w:
                    conn.connect()

        assert len(w) >= 1
        assert any(x.category == DeprecationWarning for x in w)
        assert any(
            str(x.message)
            == (
                "'ssl_version' option is deprecated and will be removed in "
                "urllib3 v2.6.0. Instead use 'ssl_minimum_version'"
            )
            for x in w
        )

    @pytest.mark.parametrize(
        "ssl_version", [None, ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_CLIENT]
    )
    def test_ssl_version_with_protocol_tls_or_client_not_deprecated(
        self, ssl_version: int | None
    ) -> None:
        if self.tls_protocol_name is None:
            pytest.skip("Skipping base test class")
        if self.tls_protocol_not_default():
            pytest.skip(
                f"Skipping because '{self.tls_protocol_name}' isn't set by default"
            )

        with HTTPSConnectionPool(
            self.host, self.port, ca_certs=DEFAULT_CA, ssl_version=ssl_version
        ) as https_pool:
            with contextlib.closing(https_pool._get_conn()) as conn:
                with warnings.catch_warnings(record=True) as w:
                    conn.connect()

        assert [str(wm) for wm in w if wm.category != ResourceWarning] == []

    def test_no_tls_version_deprecation_with_ssl_context(self) -> None:
        if self.tls_protocol_name is None:
            pytest.skip("Skipping base test class")

        ctx = util.ssl_.create_urllib3_context(ssl_minimum_version=self.tls_version())

        with HTTPSConnectionPool(
            self.host,
            self.port,
            ca_certs=DEFAULT_CA,
            ssl_context=ctx,
        ) as https_pool:
            with contextlib.closing(https_pool._get_conn()) as conn:
                with warnings.catch_warnings(record=True) as w:
                    conn.connect()

        assert [str(wm) for wm in w if wm.category != ResourceWarning] == []

    def test_tls_version_maximum_and_minimum(self) -> None:
        if self.tls_protocol_name is None:
            pytest.skip("Skipping base test class")

        from ssl import TLSVersion

        min_max_versions = [
            (self.tls_version(), self.tls_version()),
            (TLSVersion.MINIMUM_SUPPORTED, self.tls_version()),
            (TLSVersion.MINIMUM_SUPPORTED, TLSVersion.MAXIMUM_SUPPORTED),
        ]

        for minimum_version, maximum_version in min_max_versions:
            with HTTPSConnectionPool(
                self.host,
                self.port,
                ca_certs=DEFAULT_CA,
                ssl_minimum_version=minimum_version,
                ssl_maximum_version=maximum_version,
            ) as https_pool:
                conn = https_pool._get_conn()
                try:
                    conn.connect()
                    if maximum_version == TLSVersion.MAXIMUM_SUPPORTED:
                        # A higher protocol than tls_protocol_name could be negotiated
                        assert conn.sock.version() >= self.tls_protocol_name  # type: ignore[attr-defined]
                    else:
                        assert conn.sock.version() == self.tls_protocol_name  # type: ignore[attr-defined]
                finally:
                    conn.close()

    @pytest.mark.parametrize("use_env_var_expansion", [True, False])
    def test_sslkeylogfile(
        self,
        tmp_path: Path,
        monkeypatch: pytest.MonkeyPatch,
        use_env_var_expansion: bool,
    ) -> None:
        if not hasattr(util.SSLContext, "keylog_filename"):
            pytest.skip("requires OpenSSL 1.1.1+")

        keylog_file = tmp_path / "keylogfile.txt"
        if use_env_var_expansion:
            monkeypatch.setenv("FILEPATH", str(keylog_file))
            if sys.platform == "win32":
                monkeypatch.setenv("SSLKEYLOGFILE", "%FILEPATH%")
            else:
                monkeypatch.setenv("SSLKEYLOGFILE", "${FILEPATH}")
        else:
            monkeypatch.setenv("SSLKEYLOGFILE", str(keylog_file))

        with HTTPSConnectionPool(
            self.host,
            self.port,
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            r = https_pool.request("GET", "/")
            assert r.status == 200, r.data
            assert keylog_file.is_file(), "keylogfile '%s' should exist" % str(
                keylog_file
            )
            assert keylog_file.read_text().startswith(
                "# TLS secrets log file"
            ), "keylogfile '%s' should start with '# TLS secrets log file'" % str(
                keylog_file
            )

    @pytest.mark.parametrize("sslkeylogfile", [None, ""])
    def test_sslkeylogfile_empty(
        self, monkeypatch: pytest.MonkeyPatch, sslkeylogfile: str | None
    ) -> None:
        # Assert that an HTTPS connection doesn't error out when given
        # no SSLKEYLOGFILE or an empty value (ie 'SSLKEYLOGFILE=')
        if sslkeylogfile is not None:
            monkeypatch.setenv("SSLKEYLOGFILE", sslkeylogfile)
        else:
            monkeypatch.delenv("SSLKEYLOGFILE", raising=False)
        with HTTPSConnectionPool(
            self.host,
            self.port,
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as pool:
            r = pool.request("GET", "/")
            assert r.status == 200, r.data

    def test_alpn_default(self, http_version: str) -> None:
        """Default ALPN protocols are sent by default."""
        with HTTPSConnectionPool(
            self.host,
            self.port,
            ca_certs=DEFAULT_CA,
            ssl_minimum_version=self.tls_version(),
        ) as pool:
            r = pool.request("GET", "/alpn_protocol", retries=0)
            assert r.status == 200
            assert r.data.decode("utf-8") == util.ALPN_PROTOCOLS[0]
            assert (
                r.data.decode("utf-8") == {"h11": "http/1.1", "h2": "h2"}[http_version]
            )

    def test_http2_probe_result_is_cached(self, http_version: str) -> None:
        assert http2_probe._values() == {}

        for i in range(2):  # Do this twice to exercise the cache path
            with HTTPSConnectionPool(
                self.host,
                self.port,
                ca_certs=DEFAULT_CA,
            ) as pool:
                r = pool.request("GET", "/alpn_protocol", retries=0)
                assert r.status == 200

            if http_version == "h2":
                # This means the probe was successful.
                assert http2_probe._values() == {(self.host, self.port): True}
            else:
                # This means the probe wasn't attempted, otherwise would have a value.
                assert http_version == "h11"
                assert http2_probe._values() == {}

    @pytest.mark.xfail(reason="Hypercorn always supports both HTTP/2 and HTTP/1.1")
    def test_http2_probe_result_failed(self, http_version: str) -> None:
        if http_version == "h2":
            pytest.skip("Test must have server in HTTP/1.1 mode")
        assert http2_probe._values() == {}

        urllib3.http2.inject_into_urllib3()
        try:
            with HTTPSConnectionPool(
                self.host,
                self.port,
                ca_certs=DEFAULT_CA,
            ) as pool:
                r = pool.request("GET", "/", retries=0)
                assert r.status == 200

            # The probe was a failure because Hypercorn didn't support HTTP/2.
            assert http2_probe._values() == {(self.host, self.port): False}
        finally:
            urllib3.http2.extract_from_urllib3()

    def test_http2_probe_no_result_in_connect_error(self) -> None:
        assert http2_probe._values() == {}

        urllib3.http2.inject_into_urllib3()
        try:
            with HTTPSConnectionPool(
                TARPIT_HOST,
                self.port,
                ca_certs=DEFAULT_CA,
                timeout=SHORT_TIMEOUT,
            ) as pool:
                with pytest.raises(ConnectTimeoutError):
                    pool.request("GET", "/", retries=False)

            # The probe was inconclusive since an error occurred during connection.
            assert http2_probe._values() == {(TARPIT_HOST, self.port): None}
        finally:
            urllib3.http2.extract_from_urllib3()

    def test_http2_probe_no_result_in_ssl_error(self) -> None:
        urllib3.http2.inject_into_urllib3()
        try:
            with HTTPSConnectionPool(
                self.host,
                self.port,
                ca_certs=None,
                timeout=LONG_TIMEOUT,
            ) as pool:
                with pytest.raises(SSLError):
                    pool.request("GET", "/", retries=False)

            # The probe was inconclusive since an error occurred during connection.
            assert http2_probe._values() == {(self.host, self.port): None}
        finally:
            urllib3.http2.extract_from_urllib3()

    @requires_network()
    def test_http2_probe_blocked_per_thread(self) -> None:
        state, current_thread, last_action = None, None, time.perf_counter()

        def connect_callback(label: str, thread_id: int, **kwargs: typing.Any) -> None:
            nonlocal state, current_thread, last_action

            if label in ("before connect", "after connect failure"):
                # We don't know if the target supports HTTP/2 as connections fail
                assert kwargs["target_supports_http2"] is None

            # Since we're trying to connect to TARPIT_HOST, all connections will
            # fail, but they should be tried one after the other
            now = time.perf_counter()
            assert now >= last_action
            last_action = now

            if label == "before connect":
                assert state is None
                state = "connect"
                assert current_thread != thread_id
                current_thread = thread_id
            elif label == "after connect failure":
                assert state == "connect"
                assert current_thread == thread_id
                state = None

        assert http2_probe._values() == {}

        connect_timeout = LONG_TIMEOUT
        total_threads = 3
        urllib3.http2.inject_into_urllib3()
        try:

            def try_connect(_: typing.Any) -> tuple[float, float]:
                with HTTPSConnectionPool(
                    TARPIT_HOST,
                    self.port,
                    ca_certs=DEFAULT_CA,
                    timeout=connect_timeout,
                ) as pool:
                    start_time = time.time()
                    conn = pool._get_conn()
                    assert isinstance(conn, HTTPSConnection)
                    conn._connect_callback = connect_callback
                    with pytest.raises(ConnectTimeoutError):
                        conn.connect()
                    end_time = time.time()
                    return start_time, end_time

            threadpool = concurrent.futures.ThreadPoolExecutor(total_threads)
            list(threadpool.map(try_connect, range(total_threads)))

            # The probe was inconclusive since an error occurred during connection.
            assert http2_probe._values() == {(TARPIT_HOST, self.port): None}
        finally:
            urllib3.http2.extract_from_urllib3()

    def test_default_ssl_context_ssl_min_max_versions(self) -> None:
        ctx = urllib3.util.ssl_.create_urllib3_context()
        assert ctx.minimum_version == ssl.TLSVersion.TLSv1_2
        # urllib3 sets a default maximum version only when it is
        # injected with PyOpenSSL SSL-support.
        # Otherwise, the default maximum version is set by Python's
        # `ssl.SSLContext`. The value respects OpenSSL configuration and
        # can be different from `ssl.TLSVersion.MAXIMUM_SUPPORTED`.
        # https://github.com/urllib3/urllib3/issues/2477#issuecomment-1151452150
        if util.IS_PYOPENSSL:
            expected_maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
        else:
            expected_maximum_version = ssl.SSLContext(
                ssl.PROTOCOL_TLS_CLIENT
            ).maximum_version
        assert ctx.maximum_version == expected_maximum_version

    def test_ssl_context_ssl_version_uses_ssl_min_max_versions(self) -> None:
        if self.ssl_version() == ssl.PROTOCOL_TLS_CLIENT:
            pytest.skip(
                "Skipping because ssl_version=ssl.PROTOCOL_TLS_CLIENT is not deprecated"
            )

        with pytest.warns(
            DeprecationWarning,
            match=r"'ssl_version' option is deprecated and will be removed in "
            r"urllib3 v2\.6\.0\. Instead use 'ssl_minimum_version'",
        ):
            ctx = urllib3.util.ssl_.create_urllib3_context(
                ssl_version=self.ssl_version()
            )
        assert ctx.minimum_version == self.tls_version()
        assert ctx.maximum_version == self.tls_version()

    def test_default_ssl_context_verify_flags(self) -> None:
        ctx = urllib3.util.ssl_.create_urllib3_context()
        ssl_ctx = ssl.create_default_context()
        assert ctx.verify_flags == ssl_ctx.verify_flags

    def test_assert_missing_hashfunc(self, monkeypatch: pytest.MonkeyPatch) -> None:
        fingerprint = "55:39:BF:70:05:12:43:FA:1F:D1:BF:4E:E8:1B:07:1D"
        with HTTPSConnectionPool(
            "localhost",
            self.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=DEFAULT_CA,
            assert_fingerprint=(fingerprint),
            ssl_minimum_version=self.tls_version(),
        ) as https_pool:
            digest_length = len(fingerprint.replace(":", "").lower())
            monkeypatch.setitem(urllib3.util.ssl_.HASHFUNC_MAP, digest_length, None)
            with pytest.raises(MaxRetryError) as cm:
                https_pool.request("GET", "/", retries=0)
            assert type(cm.value.reason) is SSLError
            assert (
                f"Hash function implementation unavailable for fingerprint length: {digest_length}"
                == str(cm.value.reason)
            )


@pytest.mark.usefixtures("requires_tlsv1")
class TestHTTPS_TLSv1(BaseTestHTTPS):
    tls_protocol_name = "TLSv1"
    certs = TLSv1_CERTS


@pytest.mark.usefixtures("requires_tlsv1_1")
class TestHTTPS_TLSv1_1(BaseTestHTTPS):
    tls_protocol_name = "TLSv1.1"
    certs = TLSv1_1_CERTS


@pytest.mark.usefixtures("requires_tlsv1_2")
class TestHTTPS_TLSv1_2(BaseTestHTTPS):
    tls_protocol_name = "TLSv1.2"
    certs = TLSv1_2_CERTS


@pytest.mark.usefixtures("requires_tlsv1_3")
class TestHTTPS_TLSv1_3(BaseTestHTTPS):
    tls_protocol_name = "TLSv1.3"
    certs = TLSv1_3_CERTS


class TestHTTPS_Hostname:
    def test_can_validate_san(self, san_server: ServerConfig) -> None:
        """Ensure that urllib3 can validate SANs with IP addresses in them."""
        with HTTPSConnectionPool(
            san_server.host,
            san_server.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=san_server.ca_certs,
        ) as https_pool:
            r = https_pool.request("GET", "/")
            assert r.status == 200

    def test_common_name_without_san_fails(self, no_san_server: ServerConfig) -> None:
        with HTTPSConnectionPool(
            no_san_server.host,
            no_san_server.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=no_san_server.ca_certs,
        ) as https_pool:
            with pytest.raises(
                MaxRetryError,
            ) as e:
                https_pool.request("GET", "/")
            assert (
                "mismatch, certificate is not valid" in str(e.value)
                or "no appropriate subjectAltName" in str(e.value)
                or "Empty Subject Alternative Name extension" in str(e.value)
            )

    def test_common_name_without_san_with_different_common_name(
        self, no_san_server_with_different_commmon_name: ServerConfig
    ) -> None:
        ctx = urllib3.util.ssl_.create_urllib3_context(verify_flags=0)
        try:
            ctx.hostname_checks_common_name = True
        except AttributeError:
            pytest.skip("Couldn't set 'SSLContext.hostname_checks_common_name'")

        with HTTPSConnectionPool(
            no_san_server_with_different_commmon_name.host,
            no_san_server_with_different_commmon_name.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=no_san_server_with_different_commmon_name.ca_certs,
            ssl_context=ctx,
        ) as https_pool:
            with pytest.raises(MaxRetryError) as e:
                https_pool.request("GET", "/")
            assert "mismatch, certificate is not valid for 'localhost'" in str(
                e.value
            ) or "hostname 'localhost' doesn't match 'example.com'" in str(e.value)

    @pytest.mark.parametrize("use_assert_hostname", [True, False])
    def test_hostname_checks_common_name_respected(
        self, no_san_server: ServerConfig, use_assert_hostname: bool
    ) -> None:
        ctx = urllib3.util.ssl_.create_urllib3_context(verify_flags=0)
        if not hasattr(ctx, "hostname_checks_common_name"):
            pytest.skip("Test requires 'SSLContext.hostname_checks_common_name'")
        ctx.load_verify_locations(no_san_server.ca_certs)
        try:
            ctx.hostname_checks_common_name = True
        except AttributeError:
            pytest.skip("Couldn't set 'SSLContext.hostname_checks_common_name'")

        err: MaxRetryError | None
        try:
            with HTTPSConnectionPool(
                no_san_server.host,
                no_san_server.port,
                cert_reqs="CERT_REQUIRED",
                ssl_context=ctx,
                assert_hostname=no_san_server.host if use_assert_hostname else None,
            ) as https_pool:
                https_pool.request("GET", "/")
        except MaxRetryError as e:
            err = e
        else:
            err = None

        # commonName is only valid for DNS names, not IP addresses.
        if no_san_server.host == "localhost":
            assert err is None

        # IP addresses should fail for commonName.
        else:
            assert err is not None
            assert type(err.reason) is SSLError
            assert isinstance(
                err.reason.args[0], (ssl.SSLCertVerificationError, CertificateError)
            )

    def test_assert_hostname_invalid_san(
        self, no_localhost_san_server: ServerConfig
    ) -> None:
        """Ensure SAN errors are not raised while assert_hostname is false"""
        with HTTPSConnectionPool(
            no_localhost_san_server.host,
            no_localhost_san_server.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=no_localhost_san_server.ca_certs,
            assert_hostname=False,
        ) as https_pool:
            https_pool.request("GET", "/")

    def test_assert_hostname_invalid_cn(
        self, no_san_server_with_different_commmon_name: ServerConfig
    ) -> None:
        """Ensure CN errors are not raised while assert_hostname is false"""
        ctx = urllib3.util.ssl_.create_urllib3_context(verify_flags=0)
        with HTTPSConnectionPool(
            no_san_server_with_different_commmon_name.host,
            no_san_server_with_different_commmon_name.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=no_san_server_with_different_commmon_name.ca_certs,
            ssl_context=ctx,
            assert_hostname=False,
        ) as https_pool:
            https_pool.request("GET", "/")


class TestHTTPS_IPV4SAN:
    def test_can_validate_ip_san(self, ipv4_san_server: ServerConfig) -> None:
        """Ensure that urllib3 can validate SANs with IP addresses in them."""
        with HTTPSConnectionPool(
            ipv4_san_server.host,
            ipv4_san_server.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=ipv4_san_server.ca_certs,
        ) as https_pool:
            r = https_pool.request("GET", "/")
            assert r.status == 200


class TestHTTPS_IPV6SAN:
    @pytest.mark.parametrize("host", ["::1", "[::1]"])
    def test_can_validate_ipv6_san(
        self, ipv6_san_server: ServerConfig, host: str, http_version: str
    ) -> None:
        """Ensure that urllib3 can validate SANs with IPv6 addresses in them."""
        with HTTPSConnectionPool(
            host,
            ipv6_san_server.port,
            cert_reqs="CERT_REQUIRED",
            ca_certs=ipv6_san_server.ca_certs,
        ) as https_pool:
            r = https_pool.request("GET", "/")
            assert r.status == 200
            assert r.headers["server"] == f"hypercorn-{http_version}"
