File: implicit.py

package info (click to toggle)
python-authlib 1.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,016 kB
  • sloc: python: 26,998; makefile: 53; sh: 14
file content (154 lines) | stat: -rw-r--r-- 5,580 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import logging

from authlib.oauth2.rfc6749 import AccessDeniedError
from authlib.oauth2.rfc6749 import ImplicitGrant
from authlib.oauth2.rfc6749 import InvalidScopeError
from authlib.oauth2.rfc6749 import OAuth2Error
from authlib.oauth2.rfc6749.hooks import hooked

from .util import create_response_mode_response
from .util import generate_id_token
from .util import is_openid_scope
from .util import validate_nonce
from .util import validate_request_prompt

log = logging.getLogger(__name__)


class OpenIDImplicitGrant(ImplicitGrant):
    RESPONSE_TYPES = {"id_token token", "id_token"}
    DEFAULT_RESPONSE_MODE = "fragment"

    def exists_nonce(self, nonce, request):
        """Check if the given nonce is existing in your database. Developers
        should implement this method in subclass, e.g.::

            def exists_nonce(self, nonce, request):
                exists = AuthorizationCode.query.filter_by(
                    client_id=request.payload.client_id, nonce=nonce
                ).first()
                return bool(exists)

        :param nonce: A string of "nonce" parameter in request
        :param request: OAuth2Request instance
        :return: Boolean
        """
        raise NotImplementedError()

    def get_jwt_config(self):
        """Get the JWT configuration for OpenIDImplicitGrant. The JWT
        configuration will be used to generate ``id_token``. Developers
        MUST implement this method in subclass, e.g.::

            def get_jwt_config(self):
                return {
                    "key": read_private_key_file(key_path),
                    "alg": "RS256",
                    "iss": "issuer-identity",
                    "exp": 3600,
                }

        :return: dict
        """
        raise NotImplementedError()

    def generate_user_info(self, user, scope):
        """Provide user information for the given scope. Developers
        MUST implement this method in subclass, e.g.::

            from authlib.oidc.core import UserInfo


            def generate_user_info(self, user, scope):
                user_info = UserInfo(sub=user.id, name=user.name)
                if "email" in scope:
                    user_info["email"] = user.email
                return user_info

        :param user: user instance
        :param scope: scope of the token
        :return: ``authlib.oidc.core.UserInfo`` instance
        """
        raise NotImplementedError()

    def get_audiences(self, request):
        """Parse `aud` value for id_token, default value is client id. Developers
        MAY rewrite this method to provide a customized audience value.
        """
        client = request.client
        return [client.get_client_id()]

    def validate_authorization_request(self):
        if not is_openid_scope(self.request.payload.scope):
            raise InvalidScopeError(
                "Missing 'openid' scope",
                redirect_uri=self.request.payload.redirect_uri,
                redirect_fragment=True,
            )
        redirect_uri = super().validate_authorization_request()
        try:
            validate_nonce(self.request, self.exists_nonce, required=True)
        except OAuth2Error as error:
            error.redirect_uri = redirect_uri
            error.redirect_fragment = True
            raise error
        return redirect_uri

    @hooked
    def validate_consent_request(self):
        redirect_uri = self.validate_authorization_request()
        validate_request_prompt(self, redirect_uri, redirect_fragment=True)
        return redirect_uri

    def create_authorization_response(self, redirect_uri, grant_user):
        state = self.request.payload.state
        if grant_user:
            params = self.create_granted_params(grant_user)
            if state:
                params.append(("state", state))
        else:
            error = AccessDeniedError()
            params = error.get_body()

        # http://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes
        response_mode = self.request.payload.data.get(
            "response_mode", self.DEFAULT_RESPONSE_MODE
        )
        return create_response_mode_response(
            redirect_uri=redirect_uri,
            params=params,
            response_mode=response_mode,
        )

    def create_granted_params(self, grant_user):
        self.request.user = grant_user
        client = self.request.client
        token = self.generate_token(
            user=grant_user,
            scope=self.request.payload.scope,
            include_refresh_token=False,
        )
        if self.request.payload.response_type == "id_token":
            token = {
                "expires_in": token["expires_in"],
                "scope": token["scope"],
            }
            token = self.process_implicit_token(token)
        else:
            log.debug("Grant token %r to %r", token, client)
            self.server.save_token(token, self.request)
            token = self.process_implicit_token(token)
        params = [(k, token[k]) for k in token]
        return params

    def process_implicit_token(self, token, code=None):
        config = self.get_jwt_config()
        config["aud"] = self.get_audiences(self.request)
        config["nonce"] = self.request.payload.data.get("nonce")
        if code is not None:
            config["code"] = code

        user_info = self.generate_user_info(self.request.user, token["scope"])
        id_token = generate_id_token(token, user_info, **config)
        token["id_token"] = id_token
        return token