1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
from collections import namedtuple
from requests import Request, Session
from twilio.base.exceptions import TwilioRestException
from urllib.parse import urlparse
from twilio.http import HttpClient
from twilio.http.response import Response
from twilio.jwt.validation import ClientValidationJwt
ValidationPayload = namedtuple(
"ValidationPayload",
["method", "path", "query_string", "all_headers", "signed_headers", "body"],
)
class ValidationClient(HttpClient):
__SIGNED_HEADERS = ["authorization", "host"]
def __init__(
self,
account_sid,
api_key_sid,
credential_sid,
private_key,
pool_connections=True,
):
"""
Build a ValidationClient which signs requests with private_key and allows Twilio to
validate request has not been tampered with.
:param str account_sid: A Twilio Account Sid starting with 'AC'
:param str api_key_sid: A Twilio API Key Sid starting with 'SK'
:param str credential_sid: A Credential Sid starting with 'CR',
corresponds to public key Twilio will use to verify the JWT.
:param str private_key: The private key used to sign the Client Validation JWT.
"""
self.account_sid = account_sid
self.credential_sid = credential_sid
self.api_key_sid = api_key_sid
self.private_key = private_key
self.session = Session() if pool_connections else None
def request(
self,
method,
url,
params=None,
data=None,
headers=None,
auth=None,
timeout=None,
allow_redirects=False,
):
"""
Make a signed HTTP Request
:param str method: The HTTP method to use
:param str url: The URL to request
:param dict params: Query parameters to append to the URL
:param dict data: Parameters to go in the body of the HTTP request
:param dict headers: HTTP Headers to send with the request
:param tuple auth: Basic Auth arguments
:param float timeout: Socket/Read timeout for the request
:param boolean allow_redirects: Whether or not to allow redirects
See the requests documentation for explanation of all these parameters
:return: An http response
:rtype: A :class:`Response <twilio.rest.http.response.Response>` object
"""
session = self.session or Session()
request = Request(
method.upper(), url, params=params, data=data, headers=headers, auth=auth
)
prepared_request = session.prepare_request(request)
if (
"Host" not in prepared_request.headers
and "host" not in prepared_request.headers
):
prepared_request.headers["Host"] = self._get_host(prepared_request)
validation_payload = self._build_validation_payload(prepared_request)
jwt = ClientValidationJwt(
self.account_sid,
self.api_key_sid,
self.credential_sid,
self.private_key,
validation_payload,
)
prepared_request.headers["Twilio-Client-Validation"] = jwt.to_jwt()
response = session.send(
prepared_request,
allow_redirects=allow_redirects,
timeout=timeout,
)
return Response(int(response.status_code), response.text)
def _build_validation_payload(self, request):
"""
Extract relevant information from request to build a ClientValidationJWT
:param PreparedRequest request: request we will extract information from.
:return: ValidationPayload
"""
parsed = urlparse(request.url)
path = parsed.path
query_string = parsed.query or ""
return ValidationPayload(
method=request.method,
path=path,
query_string=query_string,
all_headers=request.headers,
signed_headers=ValidationClient.__SIGNED_HEADERS,
body=request.body or "",
)
def _get_host(self, request):
"""Pull the Host out of the request"""
parsed = urlparse(request.url)
return str(parsed.netloc)
def validate_ssl_certificate(self, client):
"""
Validate that a request to the new SSL certificate is successful
:return: null on success, raise TwilioRestException if the request fails
"""
response = client.request("GET", "https://tls-test.twilio.com:443")
if response.status_code < 200 or response.status_code >= 300:
raise TwilioRestException(
response.status_code,
"https://tls-test.twilio.com:443",
"Failed to validate SSL certificate",
)
|