File: test_x509_adapter.py

package info (click to toggle)
python-requests-toolbelt 1.0.0-4
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 876 kB
  • sloc: python: 3,653; makefile: 166; sh: 7
file content (83 lines) | stat: -rw-r--r-- 3,248 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# -*- coding: utf-8 -*-
import requests
import unittest
import pytest

try:
    import OpenSSL
except ImportError:
    PYOPENSSL_AVAILABLE = False
else:
    PYOPENSSL_AVAILABLE = True
    from requests_toolbelt.adapters.x509 import X509Adapter
    from cryptography import x509
    from cryptography.hazmat.primitives.serialization import (
        Encoding,
        PrivateFormat,
        BestAvailableEncryption,
        load_pem_private_key,
    )
    import trustme

from requests_toolbelt import exceptions as exc
from . import get_betamax

REQUESTS_SUPPORTS_SSL_CONTEXT = requests.__build__ >= 0x021200

pytestmark = pytest.mark.filterwarnings(
    "ignore:'urllib3.contrib.pyopenssl' module is deprecated:DeprecationWarning")


class TestX509Adapter(unittest.TestCase):
    """Tests a simple requests.get() call using a .p12 cert.
    """
    def setUp(self):
        self.pkcs12_password_bytes = "test".encode('utf8')
        self.session = requests.Session()

    @pytest.mark.skipif(not REQUESTS_SUPPORTS_SSL_CONTEXT,
                        reason="Requires Requests v2.12.0 or later")
    @pytest.mark.skipif(not PYOPENSSL_AVAILABLE,
                        reason="Requires OpenSSL")
    def test_x509_pem(self):
        ca = trustme.CA()
        cert = ca.issue_cert(u'pkiprojecttest01.dev.labs.internal')
        cert_bytes = cert.cert_chain_pems[0].bytes()
        pk_bytes = cert.private_key_pem.bytes()

        adapter = X509Adapter(max_retries=3, cert_bytes=cert_bytes, pk_bytes=pk_bytes)
        self.session.mount('https://', adapter)
        recorder = get_betamax(self.session)
        with recorder.use_cassette('test_x509_adapter_pem'):
            r = self.session.get('https://pkiprojecttest01.dev.labs.internal/', verify=False)

        assert r.status_code == 200
        assert r.text

    @pytest.mark.skipif(not REQUESTS_SUPPORTS_SSL_CONTEXT,
                    reason="Requires Requests v2.12.0 or later")
    @pytest.mark.skipif(not PYOPENSSL_AVAILABLE,
                    reason="Requires OpenSSL")
    def test_x509_der_and_password(self):
        ca = trustme.CA()
        cert = ca.issue_cert(u'pkiprojecttest01.dev.labs.internal')
        cert_bytes = x509.load_pem_x509_certificate(
            cert.cert_chain_pems[0].bytes()).public_bytes(Encoding.DER)
        pem_pk = load_pem_private_key(cert.private_key_pem.bytes(), password=None)
        pk_bytes = pem_pk.private_bytes(Encoding.DER, PrivateFormat.PKCS8,
                                        BestAvailableEncryption(self.pkcs12_password_bytes))

        adapter = X509Adapter(max_retries=3, cert_bytes=cert_bytes, pk_bytes=pk_bytes,
                              password=self.pkcs12_password_bytes, encoding=Encoding.DER)
        self.session.mount('https://', adapter)
        recorder = get_betamax(self.session)
        with recorder.use_cassette('test_x509_adapter_der'):
            r = self.session.get('https://pkiprojecttest01.dev.labs.internal/', verify=False)

        assert r.status_code == 200
        assert r.text

    @pytest.mark.skipif(REQUESTS_SUPPORTS_SSL_CONTEXT, reason="Will not raise exc")
    def test_requires_new_enough_requests(self):
        with pytest.raises(exc.VersionMismatchError):
            X509Adapter()