File: client_auth.py

package info (click to toggle)
python-authlib 1.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,016 kB
  • sloc: python: 26,998; makefile: 53; sh: 14
file content (191 lines) | stat: -rw-r--r-- 7,079 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import base64
import hashlib
import time

from authlib.common.encoding import to_native
from authlib.common.security import generate_token
from authlib.common.urls import extract_params

from .parameters import prepare_form_encoded_body
from .parameters import prepare_headers
from .parameters import prepare_request_uri_query
from .signature import SIGNATURE_HMAC_SHA1
from .signature import SIGNATURE_PLAINTEXT
from .signature import SIGNATURE_RSA_SHA1
from .signature import SIGNATURE_TYPE_BODY
from .signature import SIGNATURE_TYPE_HEADER
from .signature import SIGNATURE_TYPE_QUERY
from .signature import sign_hmac_sha1
from .signature import sign_plaintext
from .signature import sign_rsa_sha1
from .wrapper import OAuth1Request

CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded"
CONTENT_TYPE_MULTI_PART = "multipart/form-data"


class ClientAuth:
    SIGNATURE_METHODS = {
        SIGNATURE_HMAC_SHA1: sign_hmac_sha1,
        SIGNATURE_RSA_SHA1: sign_rsa_sha1,
        SIGNATURE_PLAINTEXT: sign_plaintext,
    }

    @classmethod
    def register_signature_method(cls, name, sign):
        """Extend client signature methods.

        :param name: A string to represent signature method.
        :param sign: A function to generate signature.

        The ``sign`` method accept 2 parameters::

            def custom_sign_method(client, request):
                # client is the instance of Client.
                return "your-signed-string"


            Client.register_signature_method("custom-name", custom_sign_method)
        """
        cls.SIGNATURE_METHODS[name] = sign

    def __init__(
        self,
        client_id,
        client_secret=None,
        token=None,
        token_secret=None,
        redirect_uri=None,
        rsa_key=None,
        verifier=None,
        signature_method=SIGNATURE_HMAC_SHA1,
        signature_type=SIGNATURE_TYPE_HEADER,
        realm=None,
        force_include_body=False,
    ):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token = token
        self.token_secret = token_secret
        self.redirect_uri = redirect_uri
        self.signature_method = signature_method
        self.signature_type = signature_type
        self.rsa_key = rsa_key
        self.verifier = verifier
        self.realm = realm
        self.force_include_body = force_include_body

    def get_oauth_signature(self, method, uri, headers, body):
        """Get an OAuth signature to be used in signing a request.

        To satisfy `section 3.4.1.2`_ item 2, if the request argument's
        headers dict attribute contains a Host item, its value will
        replace any netloc part of the request argument's uri attribute
        value.

        .. _`section 3.4.1.2`: https://tools.ietf.org/html/rfc5849#section-3.4.1.2
        """
        sign = self.SIGNATURE_METHODS.get(self.signature_method)
        if not sign:
            raise ValueError("Invalid signature method.")

        request = OAuth1Request(method, uri, body=body, headers=headers)
        return sign(self, request)

    def get_oauth_params(self, nonce, timestamp):
        oauth_params = [
            ("oauth_nonce", nonce),
            ("oauth_timestamp", timestamp),
            ("oauth_version", "1.0"),
            ("oauth_signature_method", self.signature_method),
            ("oauth_consumer_key", self.client_id),
        ]
        if self.token:
            oauth_params.append(("oauth_token", self.token))
        if self.redirect_uri:
            oauth_params.append(("oauth_callback", self.redirect_uri))
        if self.verifier:
            oauth_params.append(("oauth_verifier", self.verifier))
        return oauth_params

    def _render(self, uri, headers, body, oauth_params):
        if self.signature_type == SIGNATURE_TYPE_HEADER:
            headers = prepare_headers(oauth_params, headers, realm=self.realm)
        elif self.signature_type == SIGNATURE_TYPE_BODY:
            if CONTENT_TYPE_FORM_URLENCODED in headers.get("Content-Type", ""):
                decoded_body = extract_params(body) or []
                body = prepare_form_encoded_body(oauth_params, decoded_body)
                headers["Content-Type"] = CONTENT_TYPE_FORM_URLENCODED
        elif self.signature_type == SIGNATURE_TYPE_QUERY:
            uri = prepare_request_uri_query(oauth_params, uri)
        else:
            raise ValueError("Unknown signature type specified.")
        return uri, headers, body

    def sign(self, method, uri, headers, body):
        """Sign the HTTP request, add OAuth parameters and signature.

        :param method: HTTP method of the request.
        :param uri:  URI of the HTTP request.
        :param body: Body payload of the HTTP request.
        :param headers: Headers of the HTTP request.
        :return: uri, headers, body
        """
        nonce = generate_nonce()
        timestamp = generate_timestamp()
        if body is None:
            body = b""

        # transform int to str
        timestamp = str(timestamp)

        if headers is None:
            headers = {}

        oauth_params = self.get_oauth_params(nonce, timestamp)

        # https://datatracker.ietf.org/doc/html/draft-eaton-oauth-bodyhash-00.html
        # include oauth_body_hash
        if body and headers.get("Content-Type") != CONTENT_TYPE_FORM_URLENCODED:
            oauth_body_hash = base64.b64encode(hashlib.sha1(body).digest())
            oauth_params.append(("oauth_body_hash", oauth_body_hash.decode("utf-8")))

        uri, headers, body = self._render(uri, headers, body, oauth_params)

        sig = self.get_oauth_signature(method, uri, headers, body)
        oauth_params.append(("oauth_signature", sig))

        uri, headers, body = self._render(uri, headers, body, oauth_params)
        return uri, headers, body

    def prepare(self, method, uri, headers, body):
        """Add OAuth parameters to the request.

        Parameters may be included from the body if the content-type is
        urlencoded, if no content type is set, a guess is made.
        """
        content_type = to_native(headers.get("Content-Type", ""))
        if self.signature_type == SIGNATURE_TYPE_BODY:
            content_type = CONTENT_TYPE_FORM_URLENCODED
        elif not content_type and extract_params(body):
            content_type = CONTENT_TYPE_FORM_URLENCODED

        if CONTENT_TYPE_FORM_URLENCODED in content_type:
            headers["Content-Type"] = CONTENT_TYPE_FORM_URLENCODED
            uri, headers, body = self.sign(method, uri, headers, body)
        elif self.force_include_body:
            # To allow custom clients to work on non form encoded bodies.
            uri, headers, body = self.sign(method, uri, headers, body)
        else:
            # Omit body data in the signing of non form-encoded requests
            uri, headers, _ = self.sign(method, uri, headers, b"")
            body = b""
        return uri, headers, body


def generate_nonce():
    return generate_token()


def generate_timestamp():
    return str(int(time.time()))