File: test_https.py

package info (click to toggle)
python-httplib2 0.22.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 1,448 kB
  • sloc: python: 6,629; javascript: 3,563; makefile: 56
file content (206 lines) | stat: -rw-r--r-- 7,748 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import platform
import socket
import ssl
import sys

import pytest
from six.moves import urllib

import httplib2
import tests


def test_get_via_https():
    # Test that we can handle HTTPS
    http = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_const_http(tls=True) as uri:
        response, _ = http.request(uri, "GET")
        assert response.status == 200


def test_get_301_via_https():
    http = httplib2.Http(ca_certs=tests.CA_CERTS)
    glocation = [""]  # nonlocal kind of trick, maybe redundant

    def handler(request):
        if request.uri == "/final":
            return tests.http_response_bytes(body=b"final")
        return tests.http_response_bytes(status="301 goto", headers={"location": glocation[0]})

    with tests.server_request(handler, request_count=2, tls=True) as uri:
        glocation[0] = urllib.parse.urljoin(uri, "/final")
        response, content = http.request(uri, "GET")
        assert response.status == 200
        assert content == b"final"
        assert response.previous.status == 301
        assert response.previous["location"] == glocation[0]


def test_get_301_via_https_spec_violation_on_location():
    # Test that we follow redirects through HTTPS
    # even if they violate the spec by including
    # a relative Location: header instead of an absolute one.
    http = httplib2.Http(ca_certs=tests.CA_CERTS)

    def handler(request):
        if request.uri == "/final":
            return tests.http_response_bytes(body=b"final")
        return tests.http_response_bytes(status="301 goto", headers={"location": "/final"})

    with tests.server_request(handler, request_count=2, tls=True) as uri:
        response, content = http.request(uri, "GET")
        assert response.status == 200
        assert content == b"final"
        assert response.previous.status == 301


def test_invalid_ca_certs_path():
    http = httplib2.Http(ca_certs="/nosuchfile")
    with tests.server_const_http(request_count=0, tls=True) as uri:
        with tests.assert_raises(IOError):
            http.request(uri, "GET")


def test_not_trusted_ca():
    # Test that we get a SSLHandshakeError if we try to access
    # server using a CA cert file that doesn't contain server's CA.
    http = httplib2.Http(ca_certs=tests.CA_UNUSED_CERTS)
    with tests.server_const_http(tls=True) as uri:
        try:
            http.request(uri, "GET")
            assert False, "expected CERTIFICATE_VERIFY_FAILED"
        except ssl.SSLError as e:
            assert e.reason == "CERTIFICATE_VERIFY_FAILED"
        except httplib2.SSLHandshakeError:  # Python2
            pass


ssl_context_accept_version = (
    hasattr(tests.ssl_context(), "maximum_version")
    and hasattr(tests.ssl_context(), "minimum_version")
    and not (platform.python_implementation().lower() == "pypy" and sys.version_info < (3,))
)
tls_minmax_versions = (
    (None, "TLSv1_2", ssl.TLSVersion.TLSv1_2)
    # TODO remove pypy2 workaround `and hasattr` clause
    if ssl_context_accept_version and hasattr(ssl, "TLSVersion") else (None,)
)


@pytest.mark.skipif(not ssl_context_accept_version, reason="ssl doesn't support TLS min/max")
@pytest.mark.parametrize("attr", ("maximum_version", "minimum_version"))
@pytest.mark.parametrize("version", tls_minmax_versions)
def test_set_tls_version(attr, version):
    ctx = tests.ssl_context()
    # We expect failure on Python < 3.7 or OpenSSL < 1.1
    expect_success = hasattr(ctx, attr)
    kwargs = {"tls_" + attr: version}
    http = httplib2.Http(**kwargs)
    try:
        http.request(tests.DUMMY_HTTPS_URL)
    except RuntimeError:
        assert not expect_success
    except socket.error:
        assert expect_success


@pytest.mark.skipif(not ssl_context_accept_version, reason="ssl doesn't support TLS min/max")
def test_max_tls_version():
    http = httplib2.Http(ca_certs=tests.CA_CERTS, tls_maximum_version="TLSv1_2")
    with tests.server_const_http(tls=True) as uri:
        http.request(uri)
        _, tls_ver, _ = http.connections.popitem()[1].sock.cipher()
        assert "TLSv1.0" <= tls_ver <= "TLSv1.2"


def test_client_cert_verified():
    cert_log = []

    def setup_tls(context, server, skip_errors):
        context.load_verify_locations(cafile=tests.CA_CERTS)
        context.verify_mode = ssl.CERT_REQUIRED
        return context.wrap_socket(server, server_side=True)

    def handler(request):
        cert_log.append(request.client_sock.getpeercert())
        return tests.http_response_bytes()

    http = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_request(handler, tls=setup_tls) as uri:
        uri_parsed = urllib.parse.urlparse(uri)
        http.add_certificate(tests.CLIENT_PEM, tests.CLIENT_PEM, uri_parsed.netloc)
        http.request(uri)

    assert len(cert_log) == 1
    expect_serial = tests.x509_serial(tests.CLIENT_PEM)
    assert int(cert_log[0]["serialNumber"], base=16) == expect_serial


def test_client_cert_password_verified():
    cert_log = []

    def setup_tls(context, server, skip_errors):
        context.load_verify_locations(cafile=tests.CA_CERTS)
        context.verify_mode = ssl.CERT_REQUIRED
        return context.wrap_socket(server, server_side=True)

    def handler(request):
        cert_log.append(request.client_sock.getpeercert())
        return tests.http_response_bytes()

    http = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_request(handler, tls=setup_tls) as uri:
        uri_parsed = urllib.parse.urlparse(uri)
        http.add_certificate(tests.CLIENT_ENCRYPTED_PEM, tests.CLIENT_ENCRYPTED_PEM, uri_parsed.netloc, password="12345")
        http.request(uri)

    assert len(cert_log) == 1
    expect_serial = tests.x509_serial(tests.CLIENT_ENCRYPTED_PEM)
    assert int(cert_log[0]["serialNumber"], base=16) == expect_serial


@pytest.mark.skipif(
    not hasattr(tests.ssl_context(), "set_servername_callback"),
    reason="SSLContext.set_servername_callback is not available",
)
def test_sni_set_servername_callback():
    sni_log = []

    def setup_tls(context, server, skip_errors):
        context.set_servername_callback(lambda _sock, hostname, _context: sni_log.append(hostname))
        return context.wrap_socket(server, server_side=True)

    http = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_const_http(tls=setup_tls) as uri:
        uri_parsed = urllib.parse.urlparse(uri)
        http.request(uri)
        assert sni_log == [uri_parsed.hostname]


def test_http_redirect_https():
    http = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_const_http(tls=True) as uri_https:
        with tests.server_const_http(status=301, headers={"location": uri_https}) as uri_http:
            response, _ = http.request(uri_http, "GET")
            assert response.status == 200
            assert response["content-location"] == uri_https
            assert response.previous.status == 301
            assert response.previous["content-location"] == uri_http


def test_https_redirect_http():
    http = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_const_http() as uri_http:
        with tests.server_const_http(tls=True, status=301, headers={"location": uri_http}) as uri_https:
            response, _ = http.request(uri_https, "GET")
            assert response.status == 200
            assert response["content-location"] == uri_http
            assert response.previous.status == 301
            assert response.previous["content-location"] == uri_https


def test_disable_ssl_certificate_validation():
    http = httplib2.Http(disable_ssl_certificate_validation=True)
    with tests.server_const_http(tls=True) as uri:
        response, _ = http.request(uri, "GET")
        assert response.status == 200