1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
from hashlib import sha256
from six import string_types
from twilio.jwt import Jwt
class ClientValidationJwt(Jwt):
    """JWT attached to a request so that Twilio can verify its authenticity.

    The JWT header carries a fixed content type (``cty``) and the credential
    sid (``kid``). The JWT payload carries the names of the signed request
    headers (``hrh``) and a SHA-256 hex digest of a canonical rendering of
    the request (``rqh``).
    """

    # Value emitted as the 'cty' entry of the JWT header.
    __CTY = 'twilio-pkrv;v=1'

    def __init__(self, account_sid, api_key_sid, credential_sid,
                 private_key, validation_payload):
        """
        Build a ClientValidationJwt for a single request.

        :param str account_sid: A Twilio Account Sid starting with 'AC';
                                passed to the base class as the subject.
        :param str api_key_sid: A Twilio API Key Sid starting with 'SK';
                                passed to the base class as the issuer.
        :param str credential_sid: A Credential Sid starting with 'CR';
                                   sent as the 'kid' header so Twilio can
                                   pick the public key used to verify the JWT.
        :param str private_key: The private key used to sign the JWT.
        :param ValidationPayload validation_payload: the parts of the request
            to sign; this class reads its ``method``, ``path``,
            ``query_string``, ``all_headers``, ``signed_headers`` and ``body``.
        """
        super(ClientValidationJwt, self).__init__(
            secret_key=private_key,
            issuer=api_key_sid,
            subject=account_sid,
            algorithm='RS256',
            ttl=300,  # 5 minute ttl
        )
        self.credential_sid = credential_sid
        self.validation_payload = validation_payload

    def _generate_headers(self):
        """Return the extra JWT header entries: content type and key id.

        NOTE(review): presumably called by the ``Jwt`` base class when the
        token is encoded -- confirm against ``twilio.jwt``.
        """
        extra_headers = {}
        extra_headers['cty'] = ClientValidationJwt.__CTY
        extra_headers['kid'] = self.credential_sid
        return extra_headers

    def _generate_payload(self):
        """Return the JWT claims describing the signed request.

        The request is rendered as newline-separated parts, in this order:
        method, path, sorted query string, one ``name:value`` line per signed
        header that is present (this part is omitted when there are none),
        an empty line, the ';'-joined signed header names, and the hex
        SHA-256 of the body ('' when the body is empty).

        :returns: dict with 'hrh' (the ';'-joined signed header names) and
                  'rqh' (hex SHA-256 of the rendered request).
        """
        request = self.validation_payload

        # Header names are lowercased; list values are sorted and comma-joined.
        normalized = {}
        for name, value in request.all_headers.items():
            normalized[name.lower()] = self._sort_and_join(value, ',')

        # Names of the headers covered by the signature, in sorted order.
        signed_names = sorted(request.signed_headers)

        # One "name:value" line per signed header that the request contains.
        header_lines = []
        for name in signed_names:
            if name in normalized:
                header_lines.append('{}:{}'.format(name, normalized[name]))
        header_block = '\n'.join(header_lines)

        # Query string parameters are sorted as raw "key=value" chunks.
        sorted_query = self._sort_and_join(request.query_string.split('&'), '&')

        # An empty/None body hashes to a falsy value; sign '' in that case.
        body_digest = self._hash(request.body) or ''
        signed_names_str = ';'.join(signed_names)

        canonical_parts = [request.method, request.path, sorted_query]
        if header_block:
            canonical_parts.append(header_block)
        canonical_parts.extend(['', signed_names_str, body_digest])
        canonical_request = '\n'.join(canonical_parts)

        claims = {}
        claims['hrh'] = signed_names_str
        claims['rqh'] = self._hash(canonical_request)
        return claims

    @classmethod
    def _sort_and_join(cls, values, joiner):
        """Sort ``values`` and join them with ``joiner``.

        A plain string is returned unchanged rather than being treated as a
        sequence of characters.
        """
        if not isinstance(values, string_types):
            values = joiner.join(sorted(values))
        return values

    @classmethod
    def _hash(cls, input_str):
        """Return the hex SHA-256 digest of ``input_str``.

        Falsy input (e.g. ``None`` or an empty string) is returned as-is
        without hashing. Non-bytes input is UTF-8 encoded before hashing.
        """
        if input_str:
            raw = input_str if isinstance(input_str, bytes) else input_str.encode('utf-8')
            return sha256(raw).hexdigest()
        return input_str
|