1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
|
#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright Amazon.com Inc. or its affiliates
#
import typing
import boto3
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import (
AsymmetricSignatureContext,
utils as asym_utils,
)
from cryptography.hazmat.primitives.asymmetric.padding import (
AsymmetricPadding,
PKCS1v15,
PSS,
)
from cryptography.hazmat.primitives.asymmetric.rsa import (
RSAPrivateKey,
RSAPrivateNumbers,
RSAPublicKey,
)
class _RSAPrivateKeyInKMS(RSAPrivateKey):
def __init__(self, arn):
self.arn = arn
self.client = boto3.client('kms')
response = self.client.get_public_key(KeyId=self.arn)
# Parse public key
self.public_key = serialization.load_der_public_key(
response['PublicKey'])
@property
def key_size(self):
return self.public_key.key_size
def public_key(self) -> RSAPublicKey:
return self.public_key
def sign(self, data: bytes, padding: AsymmetricPadding,
algorithm: typing.Union[asym_utils.Prehashed,
hashes.HashAlgorithm]
) -> bytes:
if isinstance(algorithm, asym_utils.Prehashed):
message_type = 'DIGEST'
else:
message_type = 'RAW'
if isinstance(padding, PSS):
signing_alg = 'RSASSA_PSS_'
elif isinstance(padding, PKCS1v15):
signing_alg = 'RSASSA_PKCS1_V1_5_'
else:
raise TypeError("Unsupported padding")
if (isinstance(algorithm._algorithm, hashes.SHA256) or
isinstance(algorithm, hashes.SHA256)):
signing_alg += 'SHA_256'
elif (isinstance(algorithm._algorithm, hashes.SHA384) or
isinstance(algorithm, hashes.SHA384)):
signing_alg += 'SHA_384'
elif (isinstance(algorithm._algorithm, hashes.SHA512) or
isinstance(algorithm, hashes.SHA512)):
signing_alg += 'SHA_512'
else:
raise TypeError("Unsupported hashing algorithm")
response = self.client.sign(
KeyId=self.arn, Message=data,
MessageType=message_type,
SigningAlgorithm=signing_alg)
return response['Signature']
# No need to implement these functions so we raise an exception
def signer(
self, padding: AsymmetricPadding, algorithm: hashes.HashAlgorithm
) -> AsymmetricSignatureContext:
raise NotImplementedError
def decrypt(self, ciphertext: bytes, padding: AsymmetricPadding) -> bytes:
raise NotImplementedError
def private_numbers(self) -> RSAPrivateNumbers:
raise NotImplementedError
def private_bytes(
self,
encoding: serialization.Encoding,
format: serialization.PrivateFormat,
encryption_algorithm: serialization.KeySerializationEncryption
) -> bytes:
raise NotImplementedError
|