1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Support certificate validation."""
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509, exceptions as cryptography_exceptions
from oslo_log import log as logging
from oslo_utils import timeutils
from cursive import exception
from cursive import signature_utils
from cursive import verifiers
LOG = logging.getLogger(__name__)
def is_within_valid_dates(certificate):
"""Determine if the certificate is outside its valid date range.
:param certificate: the cryptography certificate object
:return: False if the certificate valid time range does not include
now, True otherwise.
"""
# Get now in UTC, since certificate returns times in UTC
now = timeutils.utcnow()
# Confirm the certificate valid time range includes now
if now < certificate.not_valid_before:
return False
elif now > certificate.not_valid_after:
return False
return True
def is_issuer(issuing_certificate, issued_certificate):
"""Determine if the issuing cert is the parent of the issued cert.
Determine if the issuing certificate is the parent of the issued
certificate by:
* conducting subject and issuer name matching, and
* verifying the signature of the issued certificate with the issuing
certificate's public key
:param issuing_certificate: the cryptography certificate object that
is the potential parent of the issued certificate
:param issued_certificate: the cryptography certificate object that
is the potential child of the issuing certificate
:return: True if the issuing certificate is the parent of the issued
certificate, False otherwise.
"""
if (issuing_certificate is None) or (issued_certificate is None):
return False
elif issuing_certificate.subject != issued_certificate.issuer:
return False
else:
try:
verify_certificate_signature(
issuing_certificate,
issued_certificate
)
except cryptography_exceptions.InvalidSignature:
# If verification fails, an exception is expected.
return False
return True
def can_sign_certificates(certificate, certificate_uuid=''):
"""Determine if the certificate can sign other certificates.
:param certificate: the cryptography certificate object
:param certificate_uuid: the uuid of the certificate
:return: False if the certificate cannot sign other certificates,
True otherwise.
"""
try:
basic_constraints = certificate.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.BASIC_CONSTRAINTS
).value
except x509.extensions.ExtensionNotFound:
LOG.debug(
"Certificate '%s' does not have a basic constraints extension.",
certificate_uuid)
return False
try:
key_usage = certificate.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.KEY_USAGE
).value
except x509.extensions.ExtensionNotFound:
LOG.debug(
"Certificate '%s' does not have a key usage extension.",
certificate_uuid)
return False
if basic_constraints.ca and key_usage.key_cert_sign:
return True
if not basic_constraints.ca:
LOG.debug(
"Certificate '%s' is not marked as a CA in its basic constraints "
"extension.",
certificate_uuid)
if not key_usage.key_cert_sign:
LOG.debug(
"Certificate '%s' is not marked for verifying certificate "
"signatures in its key usage extension.",
certificate_uuid)
return False
def verify_certificate_signature(signing_certificate, certificate):
"""Verify that the certificate was signed correctly.
:param signing_certificate: the cryptography certificate object used to
sign the certificate
:param certificate: the cryptography certificate object that was signed
by the signing certificate
:raises: cryptography.exceptions.InvalidSignature if certificate signature
verification fails.
"""
signature_hash_algorithm = certificate.signature_hash_algorithm
signature_bytes = certificate.signature
signer_public_key = signing_certificate.public_key()
if isinstance(signer_public_key, rsa.RSAPublicKey):
verifier = verifiers.RSAVerifier(
signature_bytes, signature_hash_algorithm,
signer_public_key, padding.PKCS1v15(),
)
elif isinstance(signer_public_key, ec.EllipticCurvePublicKey):
verifier = verifiers.ECCVerifier(
signature_bytes, signature_hash_algorithm,
signer_public_key,
)
else:
verifier = verifiers.DSAVerifier(
signature_bytes, signature_hash_algorithm,
signer_public_key,
)
verifier.update(certificate.tbs_certificate_bytes)
verifier.verify()
def verify_certificate(context, certificate_uuid,
trusted_certificate_uuids,
enforce_valid_dates=True,
enforce_signing_extensions=True,
enforce_path_length=True):
"""Validate a certificate against a set of trusted certificates.
From the key manager, load the set of trusted certificates and the
certificate to validate. Store the trusted certificates in a certificate
verification context. Use the context to verify that the certificate is
cryptographically linked to at least one of the trusted certificates.
:param context: the user context for authentication
:param certificate_uuid: the uuid of a certificate to validate, stored in
the key manager
:param trusted_certificate_uuids: a list containing the uuids of trusted
certificates stored in the key manager
:param enforce_valid_dates: a boolean indicating whether date checking
should be enforced during certificate verification, defaults to
True
:param enforce_signing_extensions: a boolean indicating whether extension
checking should be enforced during certificate verification,
defaults to True
:param enforce_path_length: a boolean indicating whether path length
constraints should be enforced during certificate verification,
defaults to True
:raises: SignatureVerificationError if the certificate verification fails
for any reason.
"""
trusted_certificates = list()
for uuid in trusted_certificate_uuids:
try:
trusted_certificates.append(
(uuid, signature_utils.get_certificate(context, uuid))
)
except exception.SignatureVerificationError:
LOG.warning("Skipping trusted certificate: %(id)s" % {'id': uuid})
certificate = signature_utils.get_certificate(context, certificate_uuid)
certificate_context = CertificateVerificationContext(
trusted_certificates,
enforce_valid_dates=enforce_valid_dates,
enforce_signing_extensions=enforce_signing_extensions,
enforce_path_length=enforce_path_length
)
certificate_context.update(certificate)
certificate_context.verify()
class CertificateVerificationContext(object):
"""A collection of signing certificates.
A collection of signing certificates that may be used to verify the
signatures of other certificates.
"""
def __init__(self, certificate_tuples, enforce_valid_dates=True,
enforce_signing_extensions=True,
enforce_path_length=True):
self._signing_certificates = []
for certificate_tuple in certificate_tuples:
certificate_uuid, certificate = certificate_tuple
if not isinstance(certificate, x509.Certificate):
LOG.error(
"A signing certificate must be an x509.Certificate object."
)
continue
if enforce_valid_dates:
if not is_within_valid_dates(certificate):
LOG.warning(
"Certificate '%s' is outside its valid date range and "
"cannot be used as a signing certificate.",
certificate_uuid)
continue
if enforce_signing_extensions:
if not can_sign_certificates(certificate, certificate_uuid):
LOG.warning(
"Certificate '%s' is not configured to act as a "
"signing certificate. It will not be used as a "
"signing certificate.",
certificate_uuid)
continue
self._signing_certificates.append(certificate_tuple)
self._signed_certificate = None
self._enforce_valid_dates = enforce_valid_dates
self._enforce_path_length = enforce_path_length
def update(self, certificate):
"""Process the certificate to be verified.
Raises an exception if the certificate is invalid. Stores it
otherwise.
:param certificate: the cryptography certificate to be verified
:raises: SignatureVerificationError if the certificate is not of the
right type or if it is outside its valid date range.
"""
if not isinstance(certificate, x509.Certificate):
raise exception.SignatureVerificationError(
"The certificate must be an x509.Certificate object."
)
if self._enforce_valid_dates:
if not is_within_valid_dates(certificate):
raise exception.SignatureVerificationError(
"The certificate is outside its valid date range."
)
self._signed_certificate = certificate
def verify(self):
"""Locate the certificate's signing certificate and verify it.
Locate the certificate's signing certificate in the context
certificate cache, using both subject/issuer name matching and
signature verification. If the certificate is self-signed, verify that
it is also located in the context's certificate cache. Construct the
certificate chain from certificates in the context certificate cache.
Verify that the signing certificate can have a sufficient number of
child certificates to support the chain.
:raises: SignatureVerificationError if certificate validation fails
for any reason, including mismatched signatures or a failure
to find the required signing certificate.
"""
signed_certificate = self._signed_certificate
certificate_chain = [('base', signed_certificate)]
# Build the certificate chain.
while True:
signing_certificate_tuple = None
# Search for the signing certificate
for certificate_tuple in self._signing_certificates:
_, candidate = certificate_tuple
if is_issuer(candidate, signed_certificate):
signing_certificate_tuple = certificate_tuple
break
# If a valid signing certificate is found, prepare to find the
# next link in the certificate chain. Otherwise, raise an error.
if signing_certificate_tuple:
# If the certificate is self-signed, the root of the
# certificate chain has been found. Otherwise, repeat the
# verification process using the newly found signing
# certificate.
if signed_certificate == signing_certificate_tuple[1]:
break
else:
certificate_chain.insert(0, signing_certificate_tuple)
signed_certificate = signing_certificate_tuple[1]
else:
uuid = certificate_chain[0][0]
raise exception.SignatureVerificationError(
"Certificate chain building failed. Could not locate the "
"signing certificate for %s in the set of trusted "
"certificates." %
"the base certificate" if uuid == 'base'
else "certificate '%s'" % uuid
)
if self._enforce_path_length:
# Verify that each certificate's path length constraint allows
# for it to support the rest of the certificate chain.
for i in range(len(certificate_chain)):
certificate = certificate_chain[i][1]
# No need to check the last certificate in the chain.
if certificate == certificate_chain[-1][1]:
break
try:
constraints = certificate.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.BASIC_CONSTRAINTS
).value
except x509.extensions.ExtensionNotFound:
raise exception.SignatureVerificationError(
"Certificate validation failed. The signing "
"certificate '%s' does not have a basic constraints "
"extension." % certificate_chain[i][0]
)
# Path length only applies to non-self-issued intermediate
# certificates. Do not include the current or end certificates
# when computing path length.
chain_length = len(certificate_chain[i:])
chain_length = (chain_length - 2) if chain_length > 2 else 0
if constraints.path_length < chain_length:
raise exception.SignatureVerificationError(
"Certificate validation failed. The signing "
"certificate '%s' is not configured to support "
"certificate chains of sufficient "
"length." % certificate_chain[i][0]
)
|