File: authorization_server.py

package info (click to toggle)
python-authlib 1.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,016 kB
  • sloc: python: 26,998; makefile: 53; sh: 14
file content (168 lines) | stat: -rw-r--r-- 6,146 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import logging

from flask import Response
from flask import request as flask_req
from werkzeug.utils import import_string

from authlib.common.security import generate_token
from authlib.common.urls import url_encode
from authlib.oauth1 import AuthorizationServer as _AuthorizationServer
from authlib.oauth1 import OAuth1Request

# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)


class AuthorizationServer(_AuthorizationServer):
    """Flask implementation of :class:`authlib.oauth1.AuthorizationServer`.

    Initialize it with a Flask app instance and a client lookup function::

        server = AuthorizationServer(app=app, query_client=query_client)
        # or initialize lazily
        server = AuthorizationServer()
        server.init_app(app, query_client=query_client)

    Persistence is delegated to hooks registered through
    :meth:`register_hook`. Every operation that needs a hook raises
    ``RuntimeError`` when that hook has not been registered.

    :param app: A Flask app instance
    :param query_client: A function to get client by client_id. The client
        model class MUST implement the methods described by
        :class:`~authlib.oauth1.rfc5849.ClientMixin`.
    :param token_generator: A function, called without arguments, to
        generate token. When omitted, one is built from the app config by
        :meth:`create_token_generator` during :meth:`init_app`.
    """

    def __init__(self, app=None, query_client=None, token_generator=None):
        self.app = app
        self.query_client = query_client
        self.token_generator = token_generator

        # Registry of user supplied callbacks; the keys are the only names
        # accepted by ``register_hook``.
        self._hooks = {
            "exists_nonce": None,
            "create_temporary_credential": None,
            "get_temporary_credential": None,
            "delete_temporary_credential": None,
            "create_authorization_verifier": None,
            "create_token_credential": None,
        }
        if app is not None:
            self.init_app(app)

    def init_app(self, app, query_client=None, token_generator=None):
        """Bind this server to a Flask app and read its configuration.

        :param app: A Flask app instance
        :param query_client: Optional replacement for the client lookup
            function given to the constructor.
        :param token_generator: Optional replacement for the token
            generator given to the constructor.
        """
        if query_client is not None:
            self.query_client = query_client
        if token_generator is not None:
            self.token_generator = token_generator

        # Fall back to a generator built from the app config.
        if self.token_generator is None:
            self.token_generator = self.create_token_generator(app)

        # Only a non-empty list/tuple overrides the supported methods.
        methods = app.config.get("OAUTH1_SUPPORTED_SIGNATURE_METHODS")
        if methods and isinstance(methods, (list, tuple)):
            self.SUPPORTED_SIGNATURE_METHODS = methods

        self.app = app

    def register_hook(self, name, func):
        """Register ``func`` as the implementation of hook ``name``.

        :param name: One of the hook names declared in ``__init__``.
        :param func: The callable to invoke for that hook.
        :raises ValueError: If ``name`` is not a known hook name.
        """
        if name not in self._hooks:
            raise ValueError('Invalid "name" of hook')
        self._hooks[name] = func

    def _get_hook(self, name):
        """Return the callable registered for ``name``.

        :raises RuntimeError: If no callable is registered for ``name``.
        """
        func = self._hooks[name]
        if not callable(func):
            raise RuntimeError(f'"{name}" hook is required.')
        return func

    @staticmethod
    def _resolve_generator(configured, length):
        """Turn one generator config value into a zero-argument callable.

        A string is treated as an import path and loaded with
        ``import_string``; a callable is used as is; any other value yields
        a default generator that calls ``generate_token(length)``.
        """
        if isinstance(configured, str):
            return import_string(configured)
        if callable(configured):
            return configured

        # ``length`` is local to this call, so each default generator keeps
        # its own value instead of sharing one rebindable variable.
        def default_generator():
            return generate_token(length)

        return default_generator

    def create_token_generator(self, app):
        """Build the default token generator from the app configuration.

        Reads ``OAUTH1_TOKEN_GENERATOR`` / ``OAUTH1_TOKEN_LENGTH`` (default
        42) for the token and ``OAUTH1_TOKEN_SECRET_GENERATOR`` /
        ``OAUTH1_TOKEN_SECRET_LENGTH`` (default 48) for the secret.

        :param app: A Flask app instance
        :return: A function without arguments returning a dict with the
            keys ``oauth_token`` and ``oauth_token_secret``.
        """
        config = app.config
        token_generator = self._resolve_generator(
            config.get("OAUTH1_TOKEN_GENERATOR"),
            config.get("OAUTH1_TOKEN_LENGTH", 42),
        )
        secret_generator = self._resolve_generator(
            config.get("OAUTH1_TOKEN_SECRET_GENERATOR"),
            config.get("OAUTH1_TOKEN_SECRET_LENGTH", 48),
        )

        def create_token():
            return {
                "oauth_token": token_generator(),
                "oauth_token_secret": secret_generator(),
            }

        return create_token

    def get_client_by_id(self, client_id):
        """Look up a client with the configured ``query_client`` function."""
        return self.query_client(client_id)

    def exists_nonce(self, nonce, request):
        """Call the ``exists_nonce`` hook and return its result.

        The hook receives ``(nonce, timestamp, client_id, token)``, the last
        three taken from ``request``.

        :raises RuntimeError: If the hook is not registered.
        """
        func = self._get_hook("exists_nonce")
        return func(nonce, request.timestamp, request.client_id, request.token)

    def create_temporary_credential(self, request):
        """Generate a token and pass it to ``create_temporary_credential``.

        The hook receives ``(token, client_id, redirect_uri)`` and its
        return value is returned.

        :raises RuntimeError: If the hook is not registered.
        """
        func = self._get_hook("create_temporary_credential")
        token = self.token_generator()
        return func(token, request.client_id, request.redirect_uri)

    def get_temporary_credential(self, request):
        """Return the ``get_temporary_credential`` hook result for
        ``request.token``.

        :raises RuntimeError: If the hook is not registered.
        """
        func = self._get_hook("get_temporary_credential")
        return func(request.token)

    def delete_temporary_credential(self, request):
        """Return the ``delete_temporary_credential`` hook result for
        ``request.token``.

        :raises RuntimeError: If the hook is not registered.
        """
        func = self._get_hook("delete_temporary_credential")
        return func(request.token)

    def create_authorization_verifier(self, request):
        """Generate a verifier, hand it to the hook and return it.

        The hook receives ``(credential, user, verifier)``; its return value
        is ignored.

        :raises RuntimeError: If the hook is not registered.
        """
        func = self._get_hook("create_authorization_verifier")
        verifier = generate_token(36)
        func(request.credential, request.user, verifier)
        return verifier

    def create_token_credential(self, request):
        """Generate a token and pass it to ``create_token_credential``.

        The hook receives ``(token, temporary_credential)`` where the
        temporary credential is ``request.credential``; its return value is
        returned.

        :raises RuntimeError: If the hook is not registered.
        """
        func = self._get_hook("create_token_credential")
        temporary_credential = request.credential
        token = self.token_generator()
        return func(token, temporary_credential)

    def check_authorization_request(self):
        """Validate the current Flask request as an authorization request.

        :return: The OAuth1 request built from the current Flask request.
        """
        req = self.create_oauth1_request(None)
        self.validate_authorization_request(req)
        return req

    def create_authorization_response(self, request=None, grant_user=None):
        """Same as the parent method, with ``request`` made optional."""
        return super().create_authorization_response(request, grant_user)

    def create_token_response(self, request=None):
        """Same as the parent method, with ``request`` made optional."""
        return super().create_token_response(request)

    def create_oauth1_request(self, request):
        """Wrap a Flask request into an :class:`OAuth1Request`.

        :param request: A Flask request; ``None`` selects the current one.
        """
        if request is None:
            request = flask_req
        # Form fields are only passed along for POST and PUT requests.
        if request.method in ("POST", "PUT"):
            body = request.form.to_dict(flat=True)
        else:
            body = None
        return OAuth1Request(request.method, request.url, body, request.headers)

    def handle_response(self, status_code, payload, headers):
        """Return a Flask ``Response`` whose body is ``payload`` passed
        through ``url_encode``."""
        return Response(url_encode(payload), status=status_code, headers=headers)