1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
import logging
from typing import Iterable, Union
import requests
from asn1crypto import cms, x509
from ...errors import CertificateFetchError
from ..api import CertificateFetcher
from ..common_utils import (
ACCEPTABLE_CERT_DER_ALIASES,
ACCEPTABLE_CERT_PEM_ALIASES,
ACCEPTABLE_PKCS7_DER_ALIASES,
ACCEPTABLE_STRICT_CERT_CONTENT_TYPES,
complete_certificate_fetch_jobs,
gather_aia_issuer_urls,
unpack_cert_content,
)
from .util import RequestsFetcherMixin
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class RequestsCertificateFetcher(CertificateFetcher, RequestsFetcherMixin):
    """
    Implementation of the async CertificateFetcher API on top of ``requests``,
    provided for backwards compatibility.

    This class does not require resource management.
    """

    def __init__(
        self, user_agent=None, per_request_timeout=10, permit_pem=True
    ):
        """
        :param user_agent:
            Forwarded as-is to the parent initialiser.
        :param per_request_timeout:
            Forwarded as-is to the parent initialiser.
        :param permit_pem:
            Whether PEM-encoded responses (and the non-strict content type
            aliases) are accepted when fetching certificates.
        """
        super().__init__(user_agent, per_request_timeout)
        self.permit_pem = permit_pem

    async def fetch_certs(self, url, url_origin_type):
        """
        Fetch one or more certificates from a URL.

        :param url:
            URL to fetch.
        :param url_origin_type:
            Parameter indicating where the URL came from (e.g. 'CRL'),
            for error reporting purposes.
        :raises:
            CertificateFetchError - when a network I/O or decoding error occurs
        :return:
            An iterable of asn1crypto.x509.Certificate objects.
        """

        async def task():
            try:
                logger.info(f"Fetching certificates from {url}...")
                results = await self._grab_certs(
                    url, url_origin_type=url_origin_type
                )
            except (ValueError, requests.RequestException) as e:
                msg = f"Failed to fetch certificate(s) from url {url}."
                logger.debug(msg, exc_info=e)
                # Chain explicitly so the underlying cause is preserved
                # as __cause__ on the error callers receive.
                raise CertificateFetchError(msg) from e
            return results

        # The mixin's _perform_fetch receives the URL together with the
        # (not yet started) task and is responsible for running it.
        return await self._perform_fetch(url, task)

    def fetch_cert_issuers(
        self, cert: Union[x509.Certificate, cms.AttributeCertificateV2]
    ):
        """
        Set up issuer certificate retrieval for a (attribute) certificate.

        One ``fetch_certs`` job is created for every URL returned by
        ``gather_aia_issuer_urls`` for ``cert``.

        :param cert:
            The certificate or attribute certificate whose issuer
            certificates should be retrieved.
        :return:
            The result of ``complete_certificate_fetch_jobs`` applied to
            the fetch jobs.
        """
        fetch_jobs = [
            self.fetch_certs(url, url_origin_type='certificate')
            for url in gather_aia_issuer_urls(cert)
        ]
        # Only public-key certificates get a subject-based label in the log.
        if isinstance(cert, x509.Certificate):
            target = cert.subject.human_friendly
        else:
            # TODO log audit ID
            target = "attribute certificate"
        logger.info(f"Retrieving issuer certs for {target}...")
        return complete_certificate_fetch_jobs(fetch_jobs)

    def fetch_crl_issuers(self, certificate_list):
        """
        Set up issuer certificate retrieval for a CRL.

        One ``fetch_certs`` job is created for every entry in
        ``certificate_list.issuer_cert_urls``.

        :param certificate_list:
            Object exposing an ``issuer_cert_urls`` iterable.
        :return:
            The result of ``complete_certificate_fetch_jobs`` applied to
            the fetch jobs.
        """
        fetch_jobs = [
            self.fetch_certs(url, url_origin_type='CRL')
            for url in certificate_list.issuer_cert_urls
        ]
        return complete_certificate_fetch_jobs(fetch_jobs)

    def fetched_certs(self) -> Iterable[x509.Certificate]:
        """
        Return whatever ``get_results()`` reports for this fetcher.
        """
        return self.get_results()

    async def _grab_certs(self, url, *, url_origin_type):
        """
        Grab one or more certificates from a caIssuers URL.

        We accept the following types of content in the response:

         - A single DER-encoded X.509 certificate
         - A PKCS#7 'certs-only' SignedData message
         - PEM-encoded certificates (if permit_pem=True)

        Note: strictly speaking, you're not supposed to serve certs for
        AIA purposes in PEM format, but people do it anyway.

        :param url:
            URL to fetch.
        :param url_origin_type:
            Where the URL came from; only used in the error message.
        :raises:
            requests.RequestException - when the response declares a
            content type that is not acceptable.
        :return:
            A list built from the output of ``unpack_cert_content``.
        """
        permit_pem = self.permit_pem
        if permit_pem:
            # Lenient mode: also accept the PEM, DER and PKCS#7 aliases.
            acceptable_cts = (
                ACCEPTABLE_STRICT_CERT_CONTENT_TYPES
                | ACCEPTABLE_CERT_PEM_ALIASES
                | ACCEPTABLE_CERT_DER_ALIASES
                | ACCEPTABLE_PKCS7_DER_ALIASES
            )
        else:
            acceptable_cts = ACCEPTABLE_STRICT_CERT_CONTENT_TYPES

        response = await self._get(url, acceptable_content_types=acceptable_cts)

        try:
            content_type = response.headers['Content-Type'].strip()
        except KeyError:
            # A missing Content-Type header is tolerated; the decoder is
            # handed None and has to work from the payload alone.
            content_type = None
        else:
            if content_type not in acceptable_cts:
                ct_err = (
                    f"Unacceptable content type {content_type!r} "
                    f"when fetching issuer certificate for {url_origin_type} "
                    f"from URL {url}."
                )
                raise requests.RequestException(ct_err)

        certs = unpack_cert_content(
            response.content, content_type, url, permit_pem
        )
        return list(certs)
|