1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
|
#---------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
#---------------------------------------------------------------------------------------------
import json
import time
import os
from ._internal import _a128cbc_hs256_encrypt, _a128cbc_hs256_decrypt, _JwsHeader, _JwsObject, \
_JweHeader, _JweObject, _str_to_b64url, _bstr_to_b64url, _b64_to_bstr, _RsaKey
def generate_pop_key():
"""
Generates a key which can be used for Proof Of Possession token authentication.
:return:
"""
return _RsaKey.generate()
class HttpMessageSecurity(object):
"""
Used for message authorization, encryption and decrtyption.
This class is intended for internal use only. Details are subject to non-compatible changes, consumers of the
azure-keyvault module should not take dependencies on this class or its current implementation.
"""
def __init__(self, client_security_token=None,
client_signature_key=None,
client_encryption_key=None,
server_signature_key=None,
server_encryption_key=None):
self.client_security_token = client_security_token
self.client_signature_key = client_signature_key
self.client_encryption_key = client_encryption_key
self.server_signature_key = server_signature_key
self.server_encryption_key = server_encryption_key
def protect_request(self, request):
"""
Adds authorization header, and encrypts and signs the request if supported on the specific request.
:param request: unprotected request to apply security protocol
:return: protected request with appropriate security protocal applied
"""
# Setup the auth header on the request
# Due to limitations in the service we hard code the auth scheme to 'Bearer' as the service will fail with any
# other scheme or a different casing such as 'bearer', once this is fixed the following line should be replaced:
# request.headers['Authorization'] = '{} {}'.format(auth[0], auth[1])
request.headers['Authorization'] = '{} {}'.format('Bearer', self.client_security_token)
# if the current message security doesn't support message protection, or the body is empty
# skip protection and return the original request
if not self.supports_protection() or len(request.body) == 0:
return request
plain_text = request.body
# if the client encryption key is specified add it to the body of the request
if self.client_encryption_key:
# note that this assumes that the body is already json and not simple string content
# this is true for all requests which currently support message encryption, but might
# need to be revisited when the types of
body_dict = json.loads(plain_text)
body_dict['rek'] = {'jwk': self.client_encryption_key.to_jwk().serialize()}
plain_text = json.dumps(body_dict).encode(encoding='utf8')
# build the header for the jws body
jws_header = _JwsHeader()
jws_header.alg = 'RS256'
jws_header.kid = self.client_signature_key.kid
jws_header.at = self.client_security_token
jws_header.ts = int(time.time())
jws_header.typ = 'PoP'
jws = _JwsObject()
jws.protected = jws_header.to_compact_header()
jws.payload = self._protect_payload(plain_text)
data = (jws.protected + '.' + jws.payload).encode('ascii')
jws.signature = _bstr_to_b64url(self.client_signature_key.sign(data))
request.headers['Content-Type'] = 'application/jose+json'
request.prepare_body(data=jws.to_flattened_jws(), files=None)
return request
def unprotect_response(self, response, **kwargs):
"""
Removes protection from the specified response
:param request: response from the key vault service
:return: unprotected response with any security protocal encryption removed
"""
body = response.content
# if the current message security doesn't support message protection, the body is empty, or the request failed
# skip protection and return the original response
if not self.supports_protection() or len(response.content) == 0 or response.status_code != 200:
return response
# ensure the content-type is application/jose+json
if 'application/jose+json' not in response.headers.get('content-type', '').lower():
raise ValueError('Invalid protected response')
# deserialize the response into a JwsObject, using response.text so requests handles the encoding
jws = _JwsObject().deserialize(body)
# deserialize the protected header
jws_header = _JwsHeader.from_compact_header(jws.protected)
# ensure the jws signature kid matches the key from original challenge
# and the alg matches expected signature alg
if jws_header.kid != self.server_signature_key.kid \
or jws_header.alg != 'RS256':
raise ValueError('Invalid protected response')
# validate the signature of the jws
data = (jws.protected + '.' + jws.payload).encode('ascii')
# verify will raise an InvalidSignature exception if the signature doesn't match
self.server_signature_key.verify(signature=_b64_to_bstr(jws.signature), data=data)
# get the unprotected response body
decrypted = self._unprotect_payload(jws.payload)
response._content = decrypted
response.headers['Content-Type'] = 'application/json'
return response
def supports_protection(self):
"""
Determines if the the current HttpMessageSecurity object supports the message protection protocol.
:return: True if the current object supports protection, otherwise False
"""
return self.client_signature_key \
and self.client_encryption_key \
and self.server_signature_key \
and self.server_encryption_key
def _protect_payload(self, plaintext):
# create the jwe header for the payload
kek = self.server_encryption_key
jwe_header = _JweHeader()
jwe_header.alg = 'RSA-OAEP'
jwe_header.kid = kek.kid
jwe_header.enc = 'A128CBC-HS256'
# create the jwe object
jwe = _JweObject()
jwe.protected = jwe_header.to_compact_header()
# generate the content encryption key and iv
cek = os.urandom(32)
iv = os.urandom(16)
jwe.iv = _bstr_to_b64url(iv)
# wrap the cek using the server encryption key
wrapped = _bstr_to_b64url(kek.encrypt(cek))
jwe.encrypted_key = wrapped
# encrypt the plaintext body with the cek using the protected header
# as the authdata to get the ciphertext and the authtag
ciphertext, tag = _a128cbc_hs256_encrypt(cek, iv, plaintext, jwe.protected.encode('ascii'))
jwe.ciphertext = _bstr_to_b64url(ciphertext)
jwe.tag = _bstr_to_b64url(tag)
# flatten and encode the jwe for the final jws payload content
flat = jwe.to_flattened_jwe()
return _str_to_b64url(flat)
def _unprotect_payload(self, payload):
# deserialize the payload
jwe = _JweObject().deserialize_b64(payload)
# deserialize the payload header
jwe_header = _JweHeader.from_compact_header(jwe.protected)
# ensure the kid matches the specified client encryption key
# and the key wrap alg and the data encryption enc match the expected
if self.client_encryption_key.kid != jwe_header.kid \
or jwe_header.alg != 'RSA-OAEP' \
or jwe_header.enc != 'A128CBC-HS256':
raise ValueError('Invalid protected response')
# unwrap the cek using the client encryption key
cek = self.client_encryption_key.decrypt(_b64_to_bstr(jwe.encrypted_key))
# decrypt the cipher text to get the unprotected body content
return _a128cbc_hs256_decrypt(cek,
_b64_to_bstr(jwe.iv),
_b64_to_bstr(jwe.ciphertext),
jwe.protected.encode('ascii'),
_b64_to_bstr(jwe.tag))
|