File: token.py

package info (click to toggle)
python-authlib 1.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,016 kB
  • sloc: python: 26,998; makefile: 53; sh: 14
file content (218 lines) | stat: -rw-r--r-- 8,613 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import time
from typing import Optional
from typing import Union

from authlib.common.security import generate_token
from authlib.jose import jwt
from authlib.oauth2.rfc6750.token import BearerTokenGenerator


class JWTBearerTokenGenerator(BearerTokenGenerator):
    """A JWT formatted access token generator.

    :param issuer: The issuer identifier. Will appear in the JWT ``iss`` claim.
    :param alg: The signing algorithm name placed in the JWT ``alg`` header.
        Defaults to ``RS256``.
    :param refresh_token_generator: Forwarded unchanged to
        :class:`~authlib.oauth2.rfc6750.token.BearerTokenGenerator`.
    :param expires_generator: Forwarded unchanged to
        :class:`~authlib.oauth2.rfc6750.token.BearerTokenGenerator`.

    This token generator can be registered into the authorization server::

        class MyJWTBearerTokenGenerator(JWTBearerTokenGenerator):
            def get_jwks(self): ...

            def get_extra_claims(self, client, grant_type, user, scope): ...


        authorization_server.register_token_generator(
            "default",
            MyJWTBearerTokenGenerator(
                issuer="https://authorization-server.example.org"
            ),
        )
    """

    def __init__(
        self,
        issuer,
        alg="RS256",
        refresh_token_generator=None,
        expires_generator=None,
    ):
        # The access token generator slot of the parent class is always filled
        # with this class's own JWT builder; only the two other generators are
        # configurable by the caller.
        super().__init__(
            self.access_token_generator, refresh_token_generator, expires_generator
        )
        self.issuer = issuer
        self.alg = alg

    def get_jwks(self):
        """Return the JWKs that will be used to sign the JWT access token.
        Developers MUST re-implement this method::

            def get_jwks(self):
                return load_jwks("jwks.json")

        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()

    def get_extra_claims(self, client, grant_type, user, scope):
        """Return extra claims to add in the JWT access token. Developers MAY
        re-implement this method to add identity claims like the ones in
        :ref:`specs/oidc` ID Token, or any other arbitrary claims::

            def get_extra_claims(self, client, grant_type, user, scope):
                return generate_user_info(user, scope)

        The returned mapping is merged into the token payload last, so it can
        override the claims computed by :meth:`access_token_generator`.
        Returning ``None`` or an empty mapping adds nothing.
        """
        return {}

    def get_audiences(self, client, user, scope) -> Union[str, list[str]]:
        """Return the audience for the token. By default this simply returns
        the client ID. Developers MAY re-implement this method to add extra
        audiences::

            def get_audiences(self, client, user, scope):
                return [
                    client.get_client_id(),
                    resource_server.get_id(),
                ]
        """
        return client.get_client_id()

    def get_acr(self, user) -> Optional[str]:
        """Authentication Context Class Reference.
        Returns a user-defined case-sensitive string indicating the class of
        authentication the user performed. Token audience may refuse to give access to
        some resources if some ACR criteria are not met.
        :ref:`specs/oidc` defines one special value: ``0`` means that the user
        authentication did not respect `ISO29115`_ level 1, and will be refused monetary
        operations. Developers MAY re-implement this method::

            def get_acr(self, user):
                if user.insecure_session():
                    return "0"
                return "urn:mace:incommon:iap:silver"

        The default implementation returns ``None``, in which case no ``acr``
        claim is added to the token.

        .. _ISO29115: https://www.iso.org/standard/45138.html
        """
        return None

    def get_auth_time(self, user) -> Optional[int]:
        """User authentication time.
        Time when the End-User authentication occurred. Its value is a JSON number
        representing the number of seconds from 1970-01-01T0:0:0Z as measured in UTC
        until the date/time. Developers MAY re-implement this method::

            def get_auth_time(self, user):
                return datetime.timestamp(user.get_auth_time())

        The default implementation returns ``None``, in which case no
        ``auth_time`` claim is added to the token.
        """
        return None

    def get_amr(self, user) -> Optional[list[str]]:
        """Authentication Methods References.
        Defined by :ref:`specs/oidc` as an optional list of user-defined case-sensitive
        strings indicating which authentication methods have been used to authenticate
        the user. Developers MAY re-implement this method::

            def get_amr(self, user):
                return ["2FA"] if user.has_2fa_enabled() else []

        The default implementation returns ``None``; ``None`` and an empty list
        both result in no ``amr`` claim being added to the token.
        """
        return None

    def get_jti(self, client, grant_type, user, scope) -> str:
        """JWT ID.
        Create a unique identifier for the token. Developers MAY re-implement
        this method::

            def get_jti(self, client, grant_type, user, scope):
                return generate_random_string(16)
        """
        return generate_token(16)

    def access_token_generator(self, client, grant_type, user, scope):
        """Build the JWT access token payload and return it encoded.

        The payload always carries ``iss``, ``exp``, ``client_id``, ``iat``,
        ``jti``, ``scope``, ``sub`` and ``aud``. ``auth_time``, ``acr`` and
        ``amr`` are added only when the matching ``get_*`` hook returns a
        truthy value. Claims from :meth:`get_extra_claims` are merged last.

        :param client: Client the token is issued to; must provide
            ``get_client_id()``.
        :param grant_type: Grant type, forwarded to the expiry lookup and to
            the ``get_jti`` / ``get_extra_claims`` hooks.
        :param user: Resource owner providing ``get_user_id()``, or a falsy
            value when no resource owner is involved.
        :param scope: Value stored as-is in the ``scope`` claim.
        :return: The encoded JWT as a ``str``.
        :raises NotImplementedError: if :meth:`get_jwks` is not overridden.
        """
        now = int(time.time())
        # Absolute expiry timestamp: issue time plus the configured lifetime.
        expires_at = now + self._get_expires_in(client, grant_type)

        token_data = {
            "iss": self.issuer,
            "exp": expires_at,
            "client_id": client.get_client_id(),
            "iat": now,
            "jti": self.get_jti(client, grant_type, user, scope),
            "scope": scope,
        }

        # In cases of access tokens obtained through grants where a resource owner is
        # involved, such as the authorization code grant, the value of 'sub' SHOULD
        # correspond to the subject identifier of the resource owner.

        if user:
            token_data["sub"] = user.get_user_id()

        # In cases of access tokens obtained through grants where no resource owner is
        # involved, such as the client credentials grant, the value of 'sub' SHOULD
        # correspond to an identifier the authorization server uses to indicate the
        # client application.

        else:
            token_data["sub"] = client.get_client_id()

        # If the request includes a 'resource' parameter (as defined in [RFC8707]), the
        # resulting JWT access token 'aud' claim SHOULD have the same value as the
        # 'resource' parameter in the request.
        # TODO: Implement this with RFC8707
        #
        # If the request does not include a 'resource' parameter, the authorization
        # server MUST use a default resource indicator in the 'aud' claim. If a 'scope'
        # parameter is present in the request, the authorization server SHOULD use it to
        # infer the value of the default resource indicator to be used in the 'aud'
        # claim. The mechanism through which scopes are associated with default resource
        # indicator values is outside the scope of this specification.
        #
        # Until 'resource' is supported, the default audience is always used.

        token_data["aud"] = self.get_audiences(client, user, scope)

        # If the values in the 'scope' parameter refer to different default resource
        # indicator values, the authorization server SHOULD reject the request with
        # 'invalid_scope' as described in Section 4.1.2.1 of [RFC6749].
        # TODO: Implement this with RFC8707

        if auth_time := self.get_auth_time(user):
            token_data["auth_time"] = auth_time

        # The meaning and processing of acr Claim Values is out of scope for this
        # specification.

        if acr := self.get_acr(user):
            token_data["acr"] = acr

        # The definition of particular values to be used in the amr Claim is beyond the
        # scope of this specification.

        if amr := self.get_amr(user):
            token_data["amr"] = amr

        # Authorization servers MAY return arbitrary attributes not defined in any
        # existing specification, as long as the corresponding claim names are collision
        # resistant or the access tokens are meant to be used only within a private
        # subsystem. Please refer to Sections 4.2 and 4.3 of [RFC7519] for details.

        # An override that returns None (e.g. a stub with an empty body) must not
        # break token issuance, so only merge when there is something to merge.
        extra_claims = self.get_extra_claims(client, grant_type, user, scope)
        if extra_claims:
            token_data.update(extra_claims)

        # This specification registers the 'application/at+jwt' media type, which can
        # be used to indicate that the content is a JWT access token. JWT access tokens
        # MUST include this media type in the 'typ' header parameter to explicitly
        # declare that the JWT represents an access token complying with this profile.
        # Per the definition of 'typ' in Section 4.1.9 of [RFC7515], it is RECOMMENDED
        # that the 'application/' prefix be omitted. Therefore, the 'typ' value used
        # SHOULD be 'at+jwt'.

        header = {"alg": self.alg, "typ": "at+jwt"}

        # NOTE(review): check=False presumably turns off a payload check inside
        # authlib's jwt.encode -- confirm against the authlib.jose documentation.
        access_token = jwt.encode(
            header,
            token_data,
            key=self.get_jwks(),
            check=False,
        )
        # jwt.encode's result is decoded so callers receive a text token.
        return access_token.decode()