File: cert_fetch_client.py

package info (click to toggle)
python-pyhanko-certvalidator 0.26.3-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,956 kB
  • sloc: python: 9,254; sh: 47; makefile: 4
file content (148 lines) | stat: -rw-r--r-- 4,955 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import logging
from typing import Iterable, Union

import aiohttp
from asn1crypto import cms, x509

from ...errors import CertificateFetchError
from ..api import CertificateFetcher
from ..common_utils import (
    ACCEPTABLE_CERT_DER_ALIASES,
    ACCEPTABLE_CERT_PEM_ALIASES,
    ACCEPTABLE_PKCS7_DER_ALIASES,
    ACCEPTABLE_STRICT_CERT_CONTENT_TYPES,
    complete_certificate_fetch_jobs,
    gather_aia_issuer_urls,
    unpack_cert_content,
)
from .util import AIOHttpMixin, LazySession

logger = logging.getLogger(__name__)


class AIOHttpCertificateFetcher(CertificateFetcher, AIOHttpMixin):
    def __init__(
        self,
        session: Union[aiohttp.ClientSession, LazySession],
        user_agent=None,
        per_request_timeout=10,
        permit_pem=True,
    ):
        super().__init__(session, user_agent, per_request_timeout)
        self.permit_pem = permit_pem

    async def fetch_certs(self, url, url_origin_type):
        """
        Fetch one or more certificates from a URL.

        :param url:
            URL to fetch.
        :param url_origin_type:
            Parameter indicating where the URL came from (e.g. 'CRL'),
            for error reporting purposes.
        :raises:
            CertificateFetchError - when a network I/O or decoding error occurs
        :return:
            An iterable of asn1crypto.x509.Certificate objects.
        """

        async def task():
            try:
                logger.info(f"Fetching certificates from {url}...")
                return await _grab_certs(
                    url,
                    permit_pem=self.permit_pem,
                    timeout=self.per_request_timeout,
                    user_agent=self.user_agent,
                    session=await self.get_session(),
                    url_origin_type=url_origin_type,
                )
            except (ValueError, aiohttp.ClientError) as e:
                msg = f"Failed to fetch certificate(s) from url {url}."
                logger.debug(msg, exc_info=e)
                raise CertificateFetchError(msg)

        return await self._post_fetch_task(url, task)

    def fetch_cert_issuers(
        self, cert: Union[x509.Certificate, cms.AttributeCertificateV2]
    ):
        fetch_jobs = [
            self.fetch_certs(url, url_origin_type='certificate')
            for url in gather_aia_issuer_urls(cert)
        ]

        if isinstance(cert, x509.Certificate):
            target = cert.subject.human_friendly
        else:
            # TODO log audit ID
            target = "attribute certificate"
        logger.info(f"Retrieving issuer certs for {target}...")
        return complete_certificate_fetch_jobs(fetch_jobs)

    def fetch_crl_issuers(self, certificate_list):
        fetch_jobs = [
            self.fetch_certs(url, url_origin_type='CRL')
            for url in certificate_list.issuer_cert_urls
        ]
        return complete_certificate_fetch_jobs(fetch_jobs)

    def fetched_certs(self) -> Iterable[x509.Certificate]:
        return self.get_results()


async def _grab_certs(
    url,
    *,
    user_agent,
    session: aiohttp.ClientSession,
    url_origin_type,
    timeout,
    permit_pem=True,
):
    """
    Grab one or more certificates from a caIssuers URL.

    We accept two types of content in the response:
      - A single DER-encoded X.509 certificate
      - A PKCS#7 'certs-only' SignedData message
      - PEM-encoded certificates (if permit_pem=True)

    Note: strictly speaking, you're not supposed to use PEM to serve certs for
    AIA purposes in PEM format, but people do it anyway.
    """

    if permit_pem:
        acceptable_cts = (
            ACCEPTABLE_STRICT_CERT_CONTENT_TYPES
            | ACCEPTABLE_CERT_PEM_ALIASES
            | ACCEPTABLE_CERT_DER_ALIASES
            | ACCEPTABLE_PKCS7_DER_ALIASES
        )
    else:
        acceptable_cts = ACCEPTABLE_STRICT_CERT_CONTENT_TYPES

    headers = {'Accept': ','.join(acceptable_cts), 'User-Agent': user_agent}
    cl_timeout = aiohttp.ClientTimeout(timeout)
    async with session.get(
        url=url, headers=headers, timeout=cl_timeout, raise_for_status=True
    ) as response:
        response_data = await response.read()
        try:
            content_type = response.headers['Content-Type'].strip()
            if content_type not in acceptable_cts:
                ct_err = (
                    f"Unacceptable content type '{repr(content_type)}' "
                    f"when fetching issuer certificate for {url_origin_type} "
                    f"from URL {url}."
                )
                raise aiohttp.ContentTypeError(
                    response.request_info,
                    response.history,
                    message=ct_err,
                    headers=response.headers,
                )
        except KeyError:
            content_type = None
    certs = unpack_cert_content(response_data, content_type, url, permit_pem)
    return list(certs)