File: validation_client.py

package info (click to toggle)
python-twilio 6.51.0%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 12,260 kB
  • sloc: python: 128,982; makefile: 51
file content (106 lines) | stat: -rw-r--r-- 4,571 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from collections import namedtuple

from requests import Request, Session

from twilio.base.exceptions import TwilioRestException
from twilio.compat import urlparse
from twilio.http import HttpClient
from twilio.http.response import Response
from twilio.jwt.validation import ClientValidationJwt


ValidationPayload = namedtuple('ValidationPayload', ['method', 'path', 'query_string', 'all_headers',
                                                     'signed_headers', 'body'])


class ValidationClient(HttpClient):
    __SIGNED_HEADERS = ['authorization', 'host']

    def __init__(self, account_sid, api_key_sid, credential_sid, private_key, pool_connections=True):
        """
        Build a ValidationClient which signs requests with private_key and allows Twilio to
        validate request has not been tampered with.

        :param str account_sid: A Twilio Account Sid starting with 'AC'
        :param str api_key_sid: A Twilio API Key Sid starting with 'SK'
        :param str credential_sid: A Credential Sid starting with 'CR',
                                   corresponds to public key Twilio will use to verify the JWT.
        :param str private_key: The private key used to sign the Client Validation JWT.
        """
        self.account_sid = account_sid
        self.credential_sid = credential_sid
        self.api_key_sid = api_key_sid
        self.private_key = private_key
        self.session = Session() if pool_connections else None

    def request(self, method, url, params=None, data=None, headers=None, auth=None, timeout=None,
                allow_redirects=False):
        """
        Make a signed HTTP Request

        :param str method: The HTTP method to use
        :param str url: The URL to request
        :param dict params: Query parameters to append to the URL
        :param dict data: Parameters to go in the body of the HTTP request
        :param dict headers: HTTP Headers to send with the request
        :param tuple auth: Basic Auth arguments
        :param float timeout: Socket/Read timeout for the request
        :param boolean allow_redirects: Whether or not to allow redirects
        See the requests documentation for explanation of all these parameters

        :return: An http response
        :rtype: A :class:`Response <twilio.rest.http.response.Response>` object
        """
        session = self.session or Session()
        request = Request(method.upper(), url, params=params, data=data, headers=headers, auth=auth)
        prepared_request = session.prepare_request(request)

        if 'Host' not in prepared_request.headers and 'host' not in prepared_request.headers:
            prepared_request.headers['Host'] = self._get_host(prepared_request)

        validation_payload = self._build_validation_payload(prepared_request)
        jwt = ClientValidationJwt(self.account_sid, self.api_key_sid, self.credential_sid,
                                  self.private_key, validation_payload)
        prepared_request.headers['Twilio-Client-Validation'] = jwt.to_jwt()

        response = session.send(
            prepared_request,
            allow_redirects=allow_redirects,
            timeout=timeout,
        )

        return Response(int(response.status_code), response.text)

    def _build_validation_payload(self, request):
        """
        Extract relevant information from request to build a ClientValidationJWT
        :param PreparedRequest request: request we will extract information from.
        :return: ValidationPayload
        """
        parsed = urlparse(request.url)
        path = parsed.path
        query_string = parsed.query or ''

        return ValidationPayload(
            method=request.method,
            path=path,
            query_string=query_string,
            all_headers=request.headers,
            signed_headers=ValidationClient.__SIGNED_HEADERS,
            body=request.body or ''
        )

    def _get_host(self, request):
        """Pull the Host out of the request"""
        parsed = urlparse(request.url)
        return parsed.netloc

    def validate_ssl_certificate(self, client):
        """
        Validate that a request to the new SSL certificate is successful
        :return: null on success, raise TwilioRestException if the request fails
        """
        response = client.request('GET', 'https://api.twilio.com:8443')

        if response.status_code < 200 or response.status_code >= 300:
            raise TwilioRestException(response.status_code, 'https://api.twilio.com:8443', 'Failed to validate SSL certificate')