File: authorization_server.py

package info (click to toggle)
python-authlib 1.6.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,024 kB
  • sloc: python: 27,412; makefile: 53; sh: 14
file content (122 lines) | stat: -rw-r--r-- 4,463 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from django.conf import settings
from django.http import HttpResponse
from django.utils.module_loading import import_string

from authlib.common.encoding import json_dumps
from authlib.common.security import generate_token as _generate_token
from authlib.oauth2 import AuthorizationServer as _AuthorizationServer
from authlib.oauth2.rfc6750 import BearerTokenGenerator

from .requests import DjangoJsonRequest
from .requests import DjangoOAuth2Request
from .signals import client_authenticated
from .signals import token_revoked


class AuthorizationServer(_AuthorizationServer):
    """Django implementation of :class:`authlib.oauth2.rfc6749.AuthorizationServer`.
    Initialize it with client model and token model::

        from authlib.integrations.django_oauth2 import AuthorizationServer
        from your_project.models import OAuth2Client, OAuth2Token

        server = AuthorizationServer(OAuth2Client, OAuth2Token)
    """

    def __init__(self, client_model, token_model):
        super().__init__()
        self.client_model = client_model
        self.token_model = token_model
        self.load_config(getattr(settings, "AUTHLIB_OAUTH2_PROVIDER", {}))

    def load_config(self, config):
        self.config = config
        scopes_supported = self.config.get("scopes_supported")
        self.scopes_supported = scopes_supported
        # add default token generator
        self.register_token_generator("default", self.create_bearer_token_generator())

    def query_client(self, client_id):
        """Default method for ``AuthorizationServer.query_client``. Developers MAY
        rewrite this function to meet their own needs.
        """
        try:
            return self.client_model.objects.get(client_id=client_id)
        except self.client_model.DoesNotExist:
            return None

    def save_token(self, token, request):
        """Default method for ``AuthorizationServer.save_token``. Developers MAY
        rewrite this function to meet their own needs.
        """
        client = request.client
        if request.user:
            user_id = request.user.pk
        else:
            user_id = client.user_id
        item = self.token_model(client_id=client.client_id, user_id=user_id, **token)
        item.save()
        return item

    def create_oauth2_request(self, request):
        return DjangoOAuth2Request(request)

    def create_json_request(self, request):
        return DjangoJsonRequest(request)

    def handle_response(self, status_code, payload, headers):
        if isinstance(payload, dict):
            payload = json_dumps(payload)
        resp = HttpResponse(payload, status=status_code)
        for k, v in headers:
            resp[k] = v
        return resp

    def send_signal(self, name, *args, **kwargs):
        if name == "after_authenticate_client":
            client_authenticated.send(*args, sender=self.__class__, **kwargs)
        elif name == "after_revoke_token":
            token_revoked.send(*args, sender=self.__class__, **kwargs)

    def create_bearer_token_generator(self):
        """Default method to create BearerToken generator."""
        conf = self.config.get("access_token_generator", True)
        access_token_generator = create_token_generator(conf, 42)

        conf = self.config.get("refresh_token_generator", False)
        refresh_token_generator = create_token_generator(conf, 48)

        conf = self.config.get("token_expires_in")
        expires_generator = create_token_expires_in_generator(conf)

        return BearerTokenGenerator(
            access_token_generator=access_token_generator,
            refresh_token_generator=refresh_token_generator,
            expires_generator=expires_generator,
        )


def create_token_generator(token_generator_conf, length=42):
    if callable(token_generator_conf):
        return token_generator_conf

    if isinstance(token_generator_conf, str):
        return import_string(token_generator_conf)
    elif token_generator_conf is True:

        def token_generator(*args, **kwargs):
            return _generate_token(length)

        return token_generator


def create_token_expires_in_generator(expires_in_conf=None):
    data = {}
    data.update(BearerTokenGenerator.GRANT_TYPES_EXPIRES_IN)
    if expires_in_conf:
        data.update(expires_in_conf)

    def expires_in(client, grant_type):
        return data.get(grant_type, BearerTokenGenerator.DEFAULT_EXPIRES_IN)

    return expires_in