"""
Asynchronous API for fetching OCSP responses, CRLs and certificates.
"""
import abc
from dataclasses import dataclass
from typing import AsyncGenerator, Iterable, Union
from asn1crypto import cms, crl, ocsp, x509
from pyhanko_certvalidator.authority import Authority
from pyhanko_certvalidator.version import __version__
# Public API of this module: these are the names exported by
# ``from <this module> import *``.
__all__ = [
'OCSPFetcher',
'CRLFetcher',
'CertificateFetcher',
'Fetchers',
'FetcherBackend',
'DEFAULT_USER_AGENT',
]
# Library name followed by the value of ``__version__`` imported from
# ``pyhanko_certvalidator.version``.
# NOTE(review): presumably used as the HTTP User-Agent by concrete fetcher
# implementations -- confirm against the backends.
DEFAULT_USER_AGENT = 'pyhanko_certvalidator %s' % __version__
class OCSPFetcher(abc.ABC):
    """
    Utility interface to fetch and cache OCSP responses.

    None of the methods are marked abstract; each one raises
    :class:`NotImplementedError` until a subclass overrides it.
    """

    async def fetch(
        self,
        cert: Union[x509.Certificate, cms.AttributeCertificateV2],
        authority: Authority,
    ) -> ocsp.OCSPResponse:
        """
        Fetch an OCSP response for a certificate.

        :param cert:
            The certificate for which an OCSP response has to be fetched.
        :param authority:
            The issuing authority.
        :raises:
            OCSPFetchError - Raised if an OCSP response could not be obtained.
        :return:
            An OCSP response.
        """
        raise NotImplementedError

    def fetched_responses(self) -> Iterable[ocsp.OCSPResponse]:
        """
        Return all responses fetched by this OCSP fetcher.

        :return:
            An iterable of OCSP responses.
        """
        raise NotImplementedError

    def fetched_responses_for_cert(
        self, cert: Union[x509.Certificate, cms.AttributeCertificateV2]
    ) -> Iterable[ocsp.OCSPResponse]:
        """
        Return all responses fetched by this OCSP fetcher that are relevant
        to determine the revocation status of the given certificate.

        :param cert:
            The certificate whose revocation status is of interest.
        :return:
            An iterable of OCSP responses.
        """
        raise NotImplementedError
class CRLFetcher(abc.ABC):
    """
    Utility interface to fetch and cache CRLs.

    None of the methods are marked abstract; each one raises
    :class:`NotImplementedError` until a subclass overrides it.
    """

    async def fetch(
        self,
        cert: Union[x509.Certificate, cms.AttributeCertificateV2],
        *,
        use_deltas: Union[bool, None] = None,
    ) -> Iterable[crl.CertificateList]:
        """
        Fetches the CRLs for a certificate.

        :param cert:
            The certificate (or attribute certificate) to get the CRLs for.
        :param use_deltas:
            A boolean indicating if delta CRLs should be fetched.
            Keyword-only; defaults to ``None``.
            NOTE(review): how ``None`` is interpreted is up to the
            implementation -- confirm against the concrete fetchers.
        :raises:
            CRLFetchError - when a network/IO error or decoding error occurs
        :return:
            An iterable of CRLs fetched.
        """
        # side note: we don't want this to be a generator, because in principle,
        #  we always need to consider CRLs from all distribution points together
        #  anyway, so there's no "stream processing" to speak of.
        #  (this is currently not 100% efficient in the default implementation,
        #  see comments below)
        raise NotImplementedError

    def fetched_crls(self) -> Iterable[crl.CertificateList]:
        """
        Return all CRLs fetched by this CRL fetcher.

        :return:
            An iterable of CRLs.
        """
        raise NotImplementedError

    def fetched_crls_for_cert(
        self, cert: Union[x509.Certificate, cms.AttributeCertificateV2]
    ) -> Iterable[crl.CertificateList]:
        """
        Return all relevant fetched CRLs for the given certificate.

        :param cert:
            A certificate.
        :return:
            An iterable of CRLs
        :raise KeyError:
            if no fetch operations have been performed for this certificate
        """
        raise NotImplementedError
class CertificateFetcher(abc.ABC):
    """
    Utility interface to fetch and cache certificates.

    None of the methods are marked abstract; each one raises
    :class:`NotImplementedError` until a subclass overrides it.
    """

    def fetch_cert_issuers(
        self, cert: Union[x509.Certificate, cms.AttributeCertificateV2]
    ) -> AsyncGenerator[x509.Certificate, None]:
        """
        Fetches certificates from the authority information access extension of
        a certificate.

        :param cert:
            A certificate
        :raises:
            CertificateFetchError - when a network I/O or decoding error occurs
        :return:
            An asynchronous generator yielding asn1crypto.x509.Certificate
            objects that were fetched.
        """
        raise NotImplementedError

    def fetch_crl_issuers(
        self, certificate_list: crl.CertificateList
    ) -> AsyncGenerator[x509.Certificate, None]:
        """
        Fetches certificates from the authority information access extension of
        an asn1crypto.crl.CertificateList.

        :param certificate_list:
            An asn1crypto.crl.CertificateList object
        :raises:
            CertificateFetchError - when a network I/O or decoding error occurs
        :return:
            An asynchronous generator yielding asn1crypto.x509.Certificate
            objects that were fetched.
        """
        raise NotImplementedError

    def fetched_certs(self) -> Iterable[x509.Certificate]:
        """
        Return all certificates retrieved by this certificate fetcher.

        :return:
            An iterable of certificates.
        """
        raise NotImplementedError
@dataclass(frozen=True)
class Fetchers:
    """
    Models a collection of fetchers to be used by a validation context.

    The intention is that these can share resources (like a connection pool)
    in a unified, controlled manner. See also :class:`.FetcherBackend`.

    Instances are frozen: fields cannot be reassigned after construction.
    """

    # Fetcher used to obtain OCSP responses.
    ocsp_fetcher: OCSPFetcher

    # Fetcher used to obtain CRLs.
    crl_fetcher: CRLFetcher

    # Fetcher used to obtain certificates.
    cert_fetcher: CertificateFetcher
class FetcherBackend(abc.ABC):
    """
    Generic, bare-bones interface to help abstract away instantiation logic for
    fetcher implementations.

    Intended to operate as an asynchronous context manager, with
    `async with backend_obj as fetchers: ...` putting the resulting
    :class:`.Fetchers` object into the variable named `fetchers`.

    .. note::
        The initialisation part of the API is necessarily synchronous,
        for backwards compatibility with the old ``ValidationContext`` API.
        If you need asynchronous resource management, handle it elsewhere,
        or use some form of lazy resource provisioning.

        Alternatively, you can pass :class:`Fetchers` objects to the validation
        context yourself, and forgo use of the :class:`.FetcherBackend`
        API altogether.
    """

    def get_fetchers(self) -> Fetchers:
        """
        Set up fetchers synchronously.

        .. note::
            This is a synchronous method

        :return:
            The :class:`.Fetchers` object handed out by ``__aenter__``.
        """
        raise NotImplementedError

    async def close(self) -> None:
        """
        Clean up the resources associated with this fetcher backend,
        asynchronously.

        The base implementation does nothing.
        """
        pass

    async def __aenter__(self) -> Fetchers:
        """Enter the context: return the result of :meth:`get_fetchers`."""
        return self.get_fetchers()

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Leave the context: await :meth:`close`.

        Always returns ``None`` so that an exception raised inside the
        ``async with`` body is never suppressed.
        """
        # Deliberately discard close()'s result: a truthy return value from
        # __aexit__ tells the interpreter to swallow the in-flight exception,
        # which a cleanup hook should never do by accident.
        await self.close()
|