1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
|
# coding: utf-8
from __future__ import unicode_literals, division, absolute_import, print_function
from asn1crypto import pem, x509
from oscrypto import trust_list
from ._errors import pretty_message
from ._types import byte_cls, type_name
from .errors import PathBuildingError, DuplicateCertificateError
from .path import ValidationPath
class CertificateRegistry():
"""
Contains certificate lists used to build validation paths
"""
# A dict with keys being asn1crypto.x509.Certificate.Name.hashable byte
# string. Each value is a list of asn1crypto.x509.Certificate objects.
_subject_map = None
# A dict with keys being asn1crypto.x509.Certificate.key_identifier byte
# string. Each value is an asn1crypto.x509.Certificate object.
_key_identifier_map = None
# A dict with keys being asn1crypto.x509.Certificate.signature byte string.
# Each value is a bool - if the certificate is a CA cert.
_ca_lookup = None
def __init__(self, trust_roots=None, extra_trust_roots=None, other_certs=None):
"""
:param trust_roots:
If the operating system's trust list should not be used, instead
pass a list of byte strings containing DER or PEM-encoded X.509
certificates, or asn1crypto.x509.Certificate objects. These
certificates will be used as the trust roots for the path being
built.
:param extra_trust_roots:
If the operating system's trust list should be used, but augmented
with one or more extra certificates. This should be a list of byte
strings containing DER or PEM-encoded X.509 certificates, or
asn1crypto.x509.Certificate objects.
:param other_certs:
A list of byte strings containing DER or PEM-encoded X.509
certificates, or a list of asn1crypto.x509.Certificate objects.
These other certs are usually provided by the service/item being
validated. In SSL, these would be intermediate chain certs.
"""
if trust_roots is not None and not isinstance(trust_roots, list):
raise TypeError(pretty_message(
'''
trust_roots must be a list of byte strings or
asn1crypto.x509.Certificate objects, not %s
''',
type_name(trust_roots)
))
if extra_trust_roots is not None and not isinstance(extra_trust_roots, list):
raise TypeError(pretty_message(
'''
extra_trust_roots must be a list of byte strings or
asn1crypto.x509.Certificate objects, not %s
''',
type_name(extra_trust_roots)
))
if other_certs is not None and not isinstance(other_certs, list):
raise TypeError(pretty_message(
'''
other_certs must be a list of byte strings or
asn1crypto.x509.Certificate objects, not %s
''',
type_name(other_certs)
))
if other_certs is None:
other_certs = []
else:
other_certs = self._validate_unarmor(other_certs, 'other_certs')
if trust_roots is None:
trust_roots = [e[0] for e in trust_list.get_list()]
else:
trust_roots = self._validate_unarmor(trust_roots, 'trust_roots')
if extra_trust_roots is not None:
trust_roots.extend(self._validate_unarmor(extra_trust_roots, 'extra_trust_roots'))
self._subject_map = {}
self._key_identifier_map = {}
self._ca_lookup = {}
for trust_root in trust_roots:
hashable = trust_root.subject.hashable
if hashable not in self._subject_map:
self._subject_map[hashable] = []
self._subject_map[hashable].append(trust_root)
if trust_root.key_identifier:
self._key_identifier_map[trust_root.key_identifier] = trust_root
self._ca_lookup[trust_root.signature] = True
for other_cert in other_certs:
hashable = other_cert.subject.hashable
if hashable not in self._subject_map:
self._subject_map[hashable] = []
self._subject_map[hashable].append(other_cert)
if other_cert.key_identifier:
self._key_identifier_map[other_cert.key_identifier] = other_cert
def _validate_unarmor(self, certs, var_name):
"""
Takes a list of byte strings or asn1crypto.x509.Certificates objects,
validates and loads them while unarmoring any PEM-encoded contents
:param certs:
A list of byte strings or asn1crypto.x509.Certificate objects
:param var_name:
A unicode variable name to use in any TypeError exceptions
:return:
A list of asn1crypto.x509.Certificate objects
"""
output = []
for cert in certs:
if isinstance(cert, x509.Certificate):
output.append(cert)
else:
if not isinstance(cert, byte_cls):
raise TypeError(pretty_message(
'''
%s must contain only byte strings or
asn1crypto.x509.Certificate objects, not %s
''',
var_name,
type_name(cert)
))
if pem.detect(cert):
_, _, cert = pem.unarmor(cert)
output.append(x509.Certificate.load(cert))
return output
def is_ca(self, cert):
"""
Checks if a certificate is in the list of CA certs in this registry
:param cert:
An asn1crypto.x509.Certificate object
:return:
A boolean - if the certificate is in the CA list
"""
return self._ca_lookup.get(cert.signature, False)
def add_other_cert(self, cert):
"""
Allows adding an "other" cert that is obtained from doing revocation
check via OCSP or CRL, or some other method
:param cert:
An asn1crypto.x509.Certificate object or a byte string of a DER or
PEM-encoded certificate
:return:
A boolean indicating if the certificate was added - will return
False if the certificate was already present
"""
if not isinstance(cert, x509.Certificate):
if not isinstance(cert, byte_cls):
raise TypeError(pretty_message(
'''
cert must be a byte string or an instance of
asn1crypto.x509.Certificate, not %s
''',
type_name(cert)
))
if pem.detect(cert):
_, _, cert = pem.unarmor(cert)
cert = x509.Certificate.load(cert)
hashable = cert.subject.hashable
if hashable not in self._subject_map:
self._subject_map[hashable] = []
# Don't add the cert if we already have it
else:
serial_number = cert.serial_number
for existing_cert in self._subject_map[hashable]:
if existing_cert.serial_number == serial_number:
return False
self._subject_map[hashable].append(cert)
if cert.key_identifier:
self._key_identifier_map[cert.key_identifier] = cert
else:
self._key_identifier_map[cert.public_key.sha1] = cert
return True
def retrieve_by_key_identifier(self, key_identifier):
"""
Retrieves a cert via its key identifier
:param key_identifier:
A byte string of the key identifier
:return:
None or an asn1crypto.x509.Certificate object
"""
if not isinstance(key_identifier, byte_cls):
raise TypeError(pretty_message(
'''
key_identifier must be a byte string, not %s
''',
type_name(key_identifier)
))
return self._key_identifier_map.get(key_identifier)
def retrieve_by_name(self, name, first_certificate=None):
"""
Retrieves a list certs via their subject name
:param name:
An asn1crypto.x509.Name object
:param first_certificate:
An asn1crypto.x509.Certificate object that if found, should be
placed first in the result list
:return:
A list of asn1crypto.x509.Certificate objects
"""
if not isinstance(name, x509.Name):
raise TypeError(pretty_message(
'''
name must be an instance of asn1crypto.x509.Name, not %s
''',
type_name(name)
))
if first_certificate and not isinstance(first_certificate, x509.Certificate):
raise TypeError(pretty_message(
'''
first_certificate must be an instance of
asn1crypto.x509.Certificate, not %s
''',
type_name(first_certificate)
))
hashable = name.hashable
if hashable not in self._subject_map:
return []
certs = self._subject_map[hashable]
first = None
output = []
for cert in certs:
if first_certificate and first_certificate.sha256 == cert.sha256:
first = cert
else:
output.append(cert)
if first:
output.insert(0, first)
return output
def build_paths(self, end_entity_cert):
"""
Builds a list of ValidationPath objects from a certificate in the
operating system trust store to the end-entity certificate
:param end_entity_cert:
A byte string of a DER or PEM-encoded X.509 certificate, or an
instance of asn1crypto.x509.Certificate
:return:
A list of certvalidator.path.ValidationPath objects that represent
the possible paths from the end-entity certificate to one of the CA
certs.
"""
if not isinstance(end_entity_cert, byte_cls) and not isinstance(end_entity_cert, x509.Certificate):
raise TypeError(pretty_message(
'''
end_entity_cert must be a byte string or an instance of
asn1crypto.x509.Certificate, not %s
''',
type_name(end_entity_cert)
))
if isinstance(end_entity_cert, byte_cls):
if pem.detect(end_entity_cert):
_, _, end_entity_cert = pem.unarmor(end_entity_cert)
end_entity_cert = x509.Certificate.load(end_entity_cert)
path = ValidationPath(end_entity_cert)
paths = []
failed_paths = []
self._walk_issuers(path, paths, failed_paths)
if len(paths) == 0:
cert_name = end_entity_cert.subject.human_friendly
missing_issuer_name = failed_paths[0].first.issuer.human_friendly
raise PathBuildingError(pretty_message(
'''
Unable to build a validation path for the certificate "%s" - no
issuer matching "%s" was found
''',
cert_name,
missing_issuer_name
))
return paths
def _walk_issuers(self, path, paths, failed_paths):
"""
Recursively looks through the list of known certificates for the issuer
of the certificate specified, stopping once the certificate in question
is one contained within the CA certs list
:param path:
A ValidationPath object representing the current traversal of
possible paths
:param paths:
A list of completed ValidationPath objects. This is mutated as
results are found.
:param failed_paths:
A list of certvalidator.path.ValidationPath objects that failed due
to no matching issuer before reaching a certificate from the CA
certs list
"""
if path.first.signature in self._ca_lookup:
paths.append(path)
return
new_branches = 0
for issuer in self._possible_issuers(path.first):
try:
self._walk_issuers(path.copy().prepend(issuer), paths, failed_paths)
new_branches += 1
except (DuplicateCertificateError):
pass
if not new_branches:
failed_paths.append(path)
def _possible_issuers(self, cert):
"""
Returns a generator that will list all possible issuers for the cert
:param cert:
An asn1crypto.x509.Certificate object to find the issuer of
"""
issuer_hashable = cert.issuer.hashable
if issuer_hashable not in self._subject_map:
return
for issuer in self._subject_map[issuer_hashable]:
# Info from the authority key identifier extension can be used to
# eliminate possible options when multiple keys with the same
# subject exist, such as during a transition, or with cross-signing.
if cert.authority_key_identifier and issuer.key_identifier:
if cert.authority_key_identifier != issuer.key_identifier:
continue
elif cert.authority_issuer_serial:
if cert.authority_issuer_serial != issuer.issuer_serial:
continue
yield issuer
|