1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
import base64
import hmac
from hashlib import sha1, sha256
from urllib.parse import urlparse, parse_qs
def compare(string1, string2):
"""Compare two strings while protecting against timing attacks
:param str string1: the first string
:param str string2: the second string
:returns: True if the strings are equal, False if not
:rtype: :obj:`bool`
"""
if len(string1) != len(string2):
return False
result = True
for c1, c2 in zip(string1, string2):
result &= c1 == c2
return result
def remove_port(uri):
"""Remove the port number from a URI
:param uri: parsed URI that Twilio requested on your server
:returns: full URI without a port number
:rtype: str
"""
if not uri.port:
return uri.geturl()
new_netloc = uri.netloc.split(":")[0]
new_uri = uri._replace(netloc=new_netloc)
return new_uri.geturl()
def add_port(uri):
"""Add the port number to a URI
:param uri: parsed URI that Twilio requested on your server
:returns: full URI with a port number
:rtype: str
"""
if uri.port:
return uri.geturl()
port = 443 if uri.scheme == "https" else 80
new_netloc = uri.netloc + ":" + str(port)
new_uri = uri._replace(netloc=new_netloc)
return new_uri.geturl()
class RequestValidator(object):
def __init__(self, token):
self.token = token.encode("utf-8")
def compute_signature(self, uri, params):
"""Compute the signature for a given request
:param uri: full URI that Twilio requested on your server
:param params: post vars that Twilio sent with the request
:returns: The computed signature
"""
s = uri
if params:
for param_name in sorted(set(params)):
values = self.get_values(params, param_name)
for value in sorted(set(values)):
s += param_name + value
# compute signature and compare signatures
mac = hmac.new(self.token, s.encode("utf-8"), sha1)
computed = base64.b64encode(mac.digest())
computed = computed.decode("utf-8")
return computed.strip()
def get_values(self, param_dict, param_name):
try:
# Support MultiDict used by Flask.
return param_dict.getall(param_name)
except AttributeError:
try:
# Support QueryDict used by Django.
return param_dict.getlist(param_name)
except AttributeError:
# Fallback to a standard dict.
return [param_dict[param_name]]
def compute_hash(self, body):
computed = sha256(body.encode("utf-8")).hexdigest()
return computed.strip()
def validate(self, uri, params, signature):
"""Validate a request from Twilio
:param uri: full URI that Twilio requested on your server
:param params: dictionary of POST variables or string of POST body for JSON requests
:param signature: expected signature in HTTP X-Twilio-Signature header
:returns: True if the request passes validation, False if not
"""
if params is None:
params = {}
parsed_uri = urlparse(uri)
uri_with_port = add_port(parsed_uri)
uri_without_port = remove_port(parsed_uri)
valid_body_hash = True # May not receive body hash, so default succeed
query = parse_qs(parsed_uri.query)
if "bodySHA256" in query and isinstance(params, str):
valid_body_hash = compare(self.compute_hash(params), query["bodySHA256"][0])
params = {}
# check signature of uri with and without port,
# since sig generation on back end is inconsistent
valid_signature = compare(
self.compute_signature(uri_without_port, params), signature
)
valid_signature_with_port = compare(
self.compute_signature(uri_with_port, params), signature
)
return valid_body_hash and (valid_signature or valid_signature_with_port)
|