1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
|
import ssl
import typing
from pathlib import Path
import certifi
import pytest
import httpx
def test_load_ssl_config():
    """A context created with default arguments verifies peers and hostnames."""
    ssl_context = httpx.create_ssl_context()
    mode = ssl_context.verify_mode
    hostname_checked = ssl_context.check_hostname
    assert mode == ssl.VerifyMode.CERT_REQUIRED
    assert hostname_checked is True
def test_load_ssl_config_verify_non_existing_file():
    """Loading CA certificates from a missing file raises ``IOError``."""
    # Create the context outside the ``raises`` block so that only the
    # ``load_verify_locations`` call may produce the expected error; an
    # ``IOError`` raised during setup must fail the test rather than pass it.
    context = httpx.create_ssl_context()
    with pytest.raises(IOError):
        context.load_verify_locations(cafile="/path/to/nowhere")
def test_load_ssl_with_keylog(monkeypatch: typing.Any) -> None:
    """``SSLKEYLOGFILE`` from the environment becomes the keylog filename."""
    monkeypatch.setenv("SSLKEYLOGFILE", "test")
    ssl_context = httpx.create_ssl_context()
    keylog_target = ssl_context.keylog_filename
    assert keylog_target == "test"
def test_load_ssl_config_verify_existing_file():
    """Loading CA certificates from an existing bundle file keeps verification on."""
    context = httpx.create_ssl_context()
    # ``certifi.where()`` is the path of a CA bundle *file*, so it belongs in
    # ``cafile``. ``capath`` expects a directory of certificates; passing the
    # file there would not exercise the "existing file" case this test names.
    context.load_verify_locations(cafile=certifi.where())
    assert context.verify_mode == ssl.VerifyMode.CERT_REQUIRED
    assert context.check_hostname is True
def test_load_ssl_config_verify_directory():
    """Loading verify locations from a directory leaves verification enabled."""
    ssl_context = httpx.create_ssl_context()
    # Use the parent directory of the path reported by certifi as ``capath``.
    ca_directory = Path(certifi.where()).parent
    ssl_context.load_verify_locations(capath=ca_directory)
    assert ssl_context.verify_mode == ssl.VerifyMode.CERT_REQUIRED
    assert ssl_context.check_hostname is True
def test_load_ssl_config_cert_and_key(cert_pem_file, cert_private_key_file):
    """Loading a certificate chain with its key keeps verification enabled."""
    ssl_context = httpx.create_ssl_context()
    ssl_context.load_cert_chain(cert_pem_file, cert_private_key_file)
    mode = ssl_context.verify_mode
    hostname_checked = ssl_context.check_hostname
    assert mode == ssl.VerifyMode.CERT_REQUIRED
    assert hostname_checked is True
@pytest.mark.parametrize("password", [b"password", "password"])
def test_load_ssl_config_cert_and_encrypted_key(
    cert_pem_file, cert_encrypted_private_key_file, password
):
    """An encrypted key loads with the password given as bytes or as str."""
    ssl_context = httpx.create_ssl_context()
    chain_args = (cert_pem_file, cert_encrypted_private_key_file, password)
    ssl_context.load_cert_chain(*chain_args)
    assert ssl_context.verify_mode == ssl.VerifyMode.CERT_REQUIRED
    assert ssl_context.check_hostname is True
def test_load_ssl_config_cert_and_key_invalid_password(
    cert_pem_file, cert_encrypted_private_key_file
):
    """Loading an encrypted key with the wrong password raises ``ssl.SSLError``."""
    # Create the context outside the ``raises`` block so that only the
    # ``load_cert_chain`` call may produce the expected error.
    context = httpx.create_ssl_context()
    with pytest.raises(ssl.SSLError):
        context.load_cert_chain(
            cert_pem_file, cert_encrypted_private_key_file, "password1"
        )
def test_load_ssl_config_cert_without_key_raises(cert_pem_file):
    """Loading a certificate chain without a private key raises ``ssl.SSLError``."""
    # Create the context outside the ``raises`` block so that only the
    # ``load_cert_chain`` call may produce the expected error.
    context = httpx.create_ssl_context()
    with pytest.raises(ssl.SSLError):
        context.load_cert_chain(cert_pem_file)
def test_load_ssl_config_no_verify():
    """``verify=False`` turns off certificate verification and hostname checks."""
    unverified = httpx.create_ssl_context(verify=False)
    mode = unverified.verify_mode
    hostname_checked = unverified.check_hostname
    assert mode == ssl.VerifyMode.CERT_NONE
    assert hostname_checked is False
def test_SSLContext_with_get_request(server, cert_pem_file):
    """A GET request accepts a prepared SSL context through ``verify``."""
    ssl_context = httpx.create_ssl_context()
    ssl_context.load_verify_locations(cert_pem_file)
    reply = httpx.get(server.url, verify=ssl_context)
    status = reply.status_code
    assert status == 200
def test_limits_repr():
    """``repr`` of ``Limits`` lists all three settings, including defaults."""
    rendered = repr(httpx.Limits(max_connections=100))
    assert rendered == (
        "Limits(max_connections=100, max_keepalive_connections=None,"
        " keepalive_expiry=5.0)"
    )
def test_limits_eq():
    """Two ``Limits`` built from the same arguments compare equal."""
    first = httpx.Limits(max_connections=100)
    second = httpx.Limits(max_connections=100)
    assert first == second
def test_timeout_eq():
    """Two ``Timeout`` objects built from the same value compare equal."""
    first = httpx.Timeout(timeout=5.0)
    second = httpx.Timeout(timeout=5.0)
    assert first == second
def test_timeout_all_parameters_set():
    """Setting every phase to one value equals a single default of that value."""
    per_phase = httpx.Timeout(connect=5.0, read=5.0, write=5.0, pool=5.0)
    uniform = httpx.Timeout(timeout=5.0)
    assert per_phase == uniform
def test_timeout_from_nothing():
    """``Timeout(None)`` leaves connect, read, write and pool all unset."""
    unlimited = httpx.Timeout(None)
    # Check the phases in the same order as the individual assertions would.
    for phase in ("connect", "read", "write", "pool"):
        assert getattr(unlimited, phase) is None
def test_timeout_from_none():
    """``timeout=None`` by keyword equals ``None`` passed positionally."""
    by_keyword = httpx.Timeout(timeout=None)
    by_position = httpx.Timeout(None)
    assert by_keyword == by_position
def test_timeout_from_one_none_value():
    """An explicit ``read=None`` on top of a ``None`` default changes nothing."""
    with_read_none = httpx.Timeout(None, read=None)
    plain_none = httpx.Timeout(None)
    assert with_read_none == plain_none
def test_timeout_from_one_value():
    """A ``None`` default with ``read=5.0`` equals the matching 4-tuple form."""
    read_only = httpx.Timeout(None, read=5.0)
    expected_tuple = (None, 5.0, None, None)
    assert read_only == httpx.Timeout(timeout=expected_tuple)
def test_timeout_from_one_value_and_default():
    """A default of 5.0 with ``pool=60.0`` equals the matching 4-tuple form."""
    mixed = httpx.Timeout(5.0, pool=60.0)
    expected_tuple = (5.0, 5.0, 5.0, 60.0)
    assert mixed == httpx.Timeout(timeout=expected_tuple)
def test_timeout_missing_default():
    """Giving only ``pool`` with no default timeout raises ``ValueError``."""
    partial_settings = {"pool": 60.0}
    with pytest.raises(ValueError):
        httpx.Timeout(**partial_settings)
def test_timeout_from_tuple():
    """A 4-tuple of identical values equals a single default of that value."""
    uniform_tuple = (5.0, 5.0, 5.0, 5.0)
    from_tuple = httpx.Timeout(timeout=uniform_tuple)
    assert from_tuple == httpx.Timeout(timeout=5.0)
def test_timeout_from_config_instance():
    """Building a ``Timeout`` from another ``Timeout`` yields an equal object."""
    original = httpx.Timeout(timeout=5.0)
    rebuilt = httpx.Timeout(original)
    assert rebuilt == httpx.Timeout(timeout=5.0)
def test_timeout_repr():
    """``repr`` is compact for a uniform timeout and per-phase otherwise."""
    uniform_repr = repr(httpx.Timeout(timeout=5.0))
    assert uniform_repr == "Timeout(timeout=5.0)"

    read_only_repr = repr(httpx.Timeout(None, read=5.0))
    assert read_only_repr == "Timeout(connect=None, read=5.0, write=None, pool=None)"
def test_proxy_from_url():
    """A proxy built from a bare URL has no auth and no headers."""
    proxy_url = "https://example.com"
    plain_proxy = httpx.Proxy(proxy_url)
    assert str(plain_proxy.url) == "https://example.com"
    assert plain_proxy.auth is None
    assert plain_proxy.headers == {}
    rendered = repr(plain_proxy)
    assert rendered == "Proxy('https://example.com')"
def test_proxy_with_auth_from_url():
    """URL credentials move into ``auth`` and the password is masked in ``repr``."""
    authed_proxy = httpx.Proxy("https://username:password@example.com")
    assert str(authed_proxy.url) == "https://example.com"
    assert authed_proxy.auth == ("username", "password")
    assert authed_proxy.headers == {}
    expected_repr = "Proxy('https://example.com', auth=('username', '********'))"
    assert repr(authed_proxy) == expected_repr
def test_invalid_proxy_scheme():
    """A proxy URL with the ``invalid`` scheme is rejected with ``ValueError``."""
    bad_url = "invalid://example.com"
    with pytest.raises(ValueError):
        httpx.Proxy(bad_url)
|