File: test_ocsp_client.py

package info (click to toggle)
python-pyhanko-certvalidator 0.26.3-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,956 kB
  • sloc: python: 9,254; sh: 47; makefile: 4
file content (100 lines) | stat: -rw-r--r-- 3,220 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# coding: utf-8

import os

import pytest
from asn1crypto import pem, x509

from pyhanko_certvalidator.context import ValidationContext
from pyhanko_certvalidator.errors import OCSPFetchError
from pyhanko_certvalidator.fetchers import aiohttp_fetchers, requests_fetchers
from pyhanko_certvalidator.registry import (
    CertificateRegistry,
    PathBuilder,
    SimpleTrustManager,
)
from pyhanko_certvalidator.revinfo.validate_ocsp import verify_ocsp_response

from .common import load_cert_object
from .constants import TEST_REQUEST_TIMEOUT


@pytest.mark.asyncio
async def _test_with_fetchers(fetchers):
    intermediate = load_cert_object('digicert-g5-ecc-sha384-2021-ca1.crt')

    trust_roots = [load_cert_object(os.path.join('digicert-root-g5.crt'))]
    path_builder = PathBuilder(
        registry=CertificateRegistry.build(cert_fetcher=fetchers.cert_fetcher),
        trust_manager=SimpleTrustManager.build(trust_roots=trust_roots),
    )
    paths = await path_builder.async_build_paths(intermediate)
    path = paths[0]
    authority = path.find_issuing_authority(intermediate)

    ocsp_response = await fetchers.ocsp_fetcher.fetch(intermediate, authority)
    context = ValidationContext(
        trust_roots=trust_roots, ocsps=[ocsp_response], fetchers=fetchers
    )
    await verify_ocsp_response(intermediate, path, context)


async def _test_fetch_error(fetchers):
    # a cert that doesn't have any OCSP URLs will always throw an error
    cert_file = os.path.join('testing-ca-pss', 'interm.cert.pem')
    intermediate = load_cert_object(cert_file)

    root_file = os.path.join('testing-ca-pss', 'root.cert.pem')
    root = load_cert_object(root_file)

    path_builder = PathBuilder(
        registry=CertificateRegistry.build(cert_fetcher=fetchers.cert_fetcher),
        trust_manager=SimpleTrustManager.build(trust_roots=[root]),
    )
    paths = await path_builder.async_build_paths(intermediate)
    path = paths[0]
    authority = path.find_issuing_authority(intermediate)

    async def fetch_err():
        with pytest.raises(OCSPFetchError):
            await fetchers.ocsp_fetcher.fetch(intermediate, authority)

    # trigger this twice, to make sure we get an error for both jobs
    await fetch_err()
    await fetch_err()


@pytest.mark.asyncio
async def test_fetch_ocsp_aiohttp():
    fb = aiohttp_fetchers.AIOHttpFetcherBackend(
        per_request_timeout=TEST_REQUEST_TIMEOUT
    )
    async with fb as fetchers:
        await _test_with_fetchers(fetchers)


@pytest.mark.asyncio
async def test_fetch_ocsp_err_aiohttp():
    fb = aiohttp_fetchers.AIOHttpFetcherBackend(
        per_request_timeout=TEST_REQUEST_TIMEOUT
    )
    async with fb as fetchers:
        await _test_fetch_error(fetchers)


@pytest.mark.asyncio
async def test_fetch_ocsp_requests():
    fb = requests_fetchers.RequestsFetcherBackend(
        per_request_timeout=TEST_REQUEST_TIMEOUT
    )
    fetchers = fb.get_fetchers()
    await _test_with_fetchers(fetchers)


@pytest.mark.asyncio
async def test_fetch_ocsp_err_requests():
    fb = requests_fetchers.RequestsFetcherBackend(
        per_request_timeout=TEST_REQUEST_TIMEOUT
    )
    fetchers = fb.get_fetchers()
    await _test_fetch_error(fetchers)