File: http_message_security.py

package info (click to toggle)
python-azure 20181112%2Bgit-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 407,300 kB
  • sloc: python: 717,190; makefile: 201; sh: 76
file content (192 lines) | stat: -rw-r--r-- 8,604 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#---------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
#---------------------------------------------------------------------------------------------

import json
import time
import os
from ._internal import _a128cbc_hs256_encrypt, _a128cbc_hs256_decrypt, _JwsHeader, _JwsObject, \
    _JweHeader, _JweObject, _str_to_b64url, _bstr_to_b64url, _b64_to_bstr, _RsaKey


def generate_pop_key():
    """
    Generates a key which can be used for Proof Of Possession token authentication.
    :return:
    """
    return _RsaKey.generate()


class HttpMessageSecurity(object):
    """
    Used for message authorization, encryption and decrtyption.

    This class is intended for internal use only.  Details are subject to non-compatible changes, consumers of the
    azure-keyvault module should not take dependencies on this class or its current implementation.
    """
    def __init__(self, client_security_token=None,
                 client_signature_key=None,
                 client_encryption_key=None,
                 server_signature_key=None,
                 server_encryption_key=None):
        self.client_security_token = client_security_token
        self.client_signature_key = client_signature_key
        self.client_encryption_key = client_encryption_key
        self.server_signature_key = server_signature_key
        self.server_encryption_key = server_encryption_key

    def protect_request(self, request):
        """
        Adds authorization header, and encrypts and signs the request if supported on the specific request.
        :param request: unprotected request to apply security protocol
        :return: protected request with appropriate security protocal applied
        """
        # Setup the auth header on the request
        # Due to limitations in the service we hard code the auth scheme to 'Bearer' as the service will fail with any
        # other scheme or a different casing such as 'bearer', once this is fixed the following line should be replaced:
        # request.headers['Authorization'] = '{} {}'.format(auth[0], auth[1])
        request.headers['Authorization'] = '{} {}'.format('Bearer', self.client_security_token)

        # if the current message security doesn't support message protection, or the body is empty
        # skip protection and return the original request
        if not self.supports_protection() or len(request.body) == 0:
            return request

        plain_text = request.body

        # if the client encryption key is specified add it to the body of the request
        if self.client_encryption_key:
            # note that this assumes that the body is already json and not simple string content
            # this is true for all requests which currently support message encryption, but might
            # need to be revisited when the types of
            body_dict = json.loads(plain_text)
            body_dict['rek'] = {'jwk': self.client_encryption_key.to_jwk().serialize()}
            plain_text = json.dumps(body_dict).encode(encoding='utf8')

        # build the header for the jws body
        jws_header = _JwsHeader()
        jws_header.alg = 'RS256'
        jws_header.kid = self.client_signature_key.kid
        jws_header.at = self.client_security_token
        jws_header.ts = int(time.time())
        jws_header.typ = 'PoP'

        jws = _JwsObject()

        jws.protected = jws_header.to_compact_header()
        jws.payload = self._protect_payload(plain_text)
        data = (jws.protected + '.' + jws.payload).encode('ascii')
        jws.signature = _bstr_to_b64url(self.client_signature_key.sign(data))

        request.headers['Content-Type'] = 'application/jose+json'

        request.prepare_body(data=jws.to_flattened_jws(), files=None)

        return request

    def unprotect_response(self, response, **kwargs):
        """
        Removes protection from the specified response
        :param request: response from the key vault service
        :return: unprotected response with any security protocal encryption removed
        """
        body = response.content
        # if the current message security doesn't support message protection, the body is empty, or the request failed
        # skip protection and return the original response
        if not self.supports_protection() or len(response.content) == 0 or response.status_code != 200:
            return response

        # ensure the content-type is application/jose+json
        if 'application/jose+json' not in response.headers.get('content-type', '').lower():
            raise ValueError('Invalid protected response')

        # deserialize the response into a JwsObject, using response.text so requests handles the encoding
        jws = _JwsObject().deserialize(body)

        # deserialize the protected header
        jws_header = _JwsHeader.from_compact_header(jws.protected)

        # ensure the jws signature kid matches the key from original challenge
        # and the alg matches expected signature alg
        if jws_header.kid != self.server_signature_key.kid \
                or jws_header.alg != 'RS256':
            raise ValueError('Invalid protected response')

        # validate the signature of the jws
        data = (jws.protected + '.' + jws.payload).encode('ascii')
        # verify will raise an InvalidSignature exception if the signature doesn't match
        self.server_signature_key.verify(signature=_b64_to_bstr(jws.signature), data=data)

        # get the unprotected response body
        decrypted = self._unprotect_payload(jws.payload)

        response._content = decrypted
        response.headers['Content-Type'] = 'application/json'

        return response

    def supports_protection(self):
        """
        Determines if the the current HttpMessageSecurity object supports the message protection protocol.
        :return: True if the current object supports protection, otherwise False
        """
        return self.client_signature_key \
               and self.client_encryption_key \
               and self.server_signature_key \
               and self.server_encryption_key

    def _protect_payload(self, plaintext):
        # create the jwe header for the payload
        kek = self.server_encryption_key
        jwe_header = _JweHeader()
        jwe_header.alg = 'RSA-OAEP'
        jwe_header.kid = kek.kid
        jwe_header.enc = 'A128CBC-HS256'

        # create the jwe object
        jwe = _JweObject()
        jwe.protected = jwe_header.to_compact_header()

        # generate the content encryption key and iv
        cek = os.urandom(32)
        iv = os.urandom(16)
        jwe.iv = _bstr_to_b64url(iv)
        # wrap the cek using the server encryption key
        wrapped = _bstr_to_b64url(kek.encrypt(cek))
        jwe.encrypted_key = wrapped

        # encrypt the plaintext body with the cek using the protected header
        # as the authdata to get the ciphertext and the authtag
        ciphertext, tag = _a128cbc_hs256_encrypt(cek, iv, plaintext, jwe.protected.encode('ascii'))

        jwe.ciphertext = _bstr_to_b64url(ciphertext)
        jwe.tag = _bstr_to_b64url(tag)

        # flatten and encode the jwe for the final jws payload content
        flat = jwe.to_flattened_jwe()
        return _str_to_b64url(flat)

    def _unprotect_payload(self, payload):
        # deserialize the payload
        jwe = _JweObject().deserialize_b64(payload)

        # deserialize the payload header
        jwe_header = _JweHeader.from_compact_header(jwe.protected)

        # ensure the kid matches the specified client encryption key
        # and the key wrap alg and the data encryption enc match the expected
        if self.client_encryption_key.kid != jwe_header.kid \
                or jwe_header.alg != 'RSA-OAEP' \
                or jwe_header.enc != 'A128CBC-HS256':
            raise ValueError('Invalid protected response')

        # unwrap the cek using the client encryption key
        cek = self.client_encryption_key.decrypt(_b64_to_bstr(jwe.encrypted_key))

        # decrypt the cipher text to get the unprotected body content
        return _a128cbc_hs256_decrypt(cek,
                                      _b64_to_bstr(jwe.iv),
                                      _b64_to_bstr(jwe.ciphertext),
                                      jwe.protected.encode('ascii'),
                                      _b64_to_bstr(jwe.tag))