1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
|
import time
from typing import Optional
from typing import Union
from authlib.common.security import generate_token
from authlib.jose import jwt
from authlib.oauth2.rfc6750.token import BearerTokenGenerator
class JWTBearerTokenGenerator(BearerTokenGenerator):
r"""A JWT formatted access token generator.
:param issuer: The issuer identifier. Will appear in the JWT ``iss`` claim.
:param \\*\\*kwargs: Other parameters are inherited from
:class:`~authlib.oauth2.rfc6750.token.BearerTokenGenerator`.
This token generator can be registered into the authorization server::
class MyJWTBearerTokenGenerator(JWTBearerTokenGenerator):
def get_jwks(self): ...
def get_extra_claims(self, client, grant_type, user, scope): ...
authorization_server.register_token_generator(
"default",
MyJWTBearerTokenGenerator(
issuer="https://authorization-server.example.org"
),
)
"""
def __init__(
self,
issuer,
alg="RS256",
refresh_token_generator=None,
expires_generator=None,
):
super().__init__(
self.access_token_generator, refresh_token_generator, expires_generator
)
self.issuer = issuer
self.alg = alg
def get_jwks(self):
"""Return the JWKs that will be used to sign the JWT access token.
Developers MUST re-implement this method::
def get_jwks(self):
return load_jwks("jwks.json")
"""
raise NotImplementedError()
def get_extra_claims(self, client, grant_type, user, scope):
"""Return extra claims to add in the JWT access token. Developers MAY
re-implement this method to add identity claims like the ones in
:ref:`specs/oidc` ID Token, or any other arbitrary claims::
def get_extra_claims(self, client, grant_type, user, scope):
return generate_user_info(user, scope)
"""
return {}
def get_audiences(self, client, user, scope) -> Union[str, list[str]]:
"""Return the audience for the token. By default this simply returns
the client ID. Developers MAY re-implement this method to add extra
audiences::
def get_audiences(self, client, user, scope):
return [
client.get_client_id(),
resource_server.get_id(),
]
"""
return client.get_client_id()
def get_acr(self, user) -> Optional[str]:
"""Authentication Context Class Reference.
Returns a user-defined case sensitive string indicating the class of
authentication the used performed. Token audience may refuse to give access to
some resources if some ACR criteria are not met.
:ref:`specs/oidc` defines one special value: ``0`` means that the user
authentication did not respect `ISO29115`_ level 1, and will be refused monetary
operations. Developers MAY re-implement this method::
def get_acr(self, user):
if user.insecure_session():
return "0"
return "urn:mace:incommon:iap:silver"
.. _ISO29115: https://www.iso.org/standard/45138.html
"""
return None
def get_auth_time(self, user) -> Optional[int]:
"""User authentication time.
Time when the End-User authentication occurred. Its value is a JSON number
representing the number of seconds from 1970-01-01T0:0:0Z as measured in UTC
until the date/time. Developers MAY re-implement this method::
def get_auth_time(self, user):
return datetime.timestamp(user.get_auth_time())
"""
return None
def get_amr(self, user) -> Optional[list[str]]:
"""Authentication Methods References.
Defined by :ref:`specs/oidc` as an option list of user-defined case-sensitive
strings indication which authentication methods have been used to authenticate
the user. Developers MAY re-implement this method::
def get_amr(self, user):
return ["2FA"] if user.has_2fa_enabled() else []
"""
return None
def get_jti(self, client, grant_type, user, scope) -> str:
"""JWT ID.
Create an unique identifier for the token. Developers MAY re-implement
this method::
def get_jti(self, client, grant_type, user scope):
return generate_random_string(16)
"""
return generate_token(16)
def access_token_generator(self, client, grant_type, user, scope):
now = int(time.time())
expires_in = now + self._get_expires_in(client, grant_type)
token_data = {
"iss": self.issuer,
"exp": expires_in,
"client_id": client.get_client_id(),
"iat": now,
"jti": self.get_jti(client, grant_type, user, scope),
"scope": scope,
}
# In cases of access tokens obtained through grants where a resource owner is
# involved, such as the authorization code grant, the value of 'sub' SHOULD
# correspond to the subject identifier of the resource owner.
if user:
token_data["sub"] = user.get_user_id()
# In cases of access tokens obtained through grants where no resource owner is
# involved, such as the client credentials grant, the value of 'sub' SHOULD
# correspond to an identifier the authorization server uses to indicate the
# client application.
else:
token_data["sub"] = client.get_client_id()
# If the request includes a 'resource' parameter (as defined in [RFC8707]), the
# resulting JWT access token 'aud' claim SHOULD have the same value as the
# 'resource' parameter in the request.
# TODO: Implement this with RFC8707
if False: # pragma: no cover
...
# If the request does not include a 'resource' parameter, the authorization
# server MUST use a default resource indicator in the 'aud' claim. If a 'scope'
# parameter is present in the request, the authorization server SHOULD use it to
# infer the value of the default resource indicator to be used in the 'aud'
# claim. The mechanism through which scopes are associated with default resource
# indicator values is outside the scope of this specification.
else:
token_data["aud"] = self.get_audiences(client, user, scope)
# If the values in the 'scope' parameter refer to different default resource
# indicator values, the authorization server SHOULD reject the request with
# 'invalid_scope' as described in Section 4.1.2.1 of [RFC6749].
# TODO: Implement this with RFC8707
if auth_time := self.get_auth_time(user):
token_data["auth_time"] = auth_time
# The meaning and processing of acr Claim Values is out of scope for this
# specification.
if acr := self.get_acr(user):
token_data["acr"] = acr
# The definition of particular values to be used in the amr Claim is beyond the
# scope of this specification.
if amr := self.get_amr(user):
token_data["amr"] = amr
# Authorization servers MAY return arbitrary attributes not defined in any
# existing specification, as long as the corresponding claim names are collision
# resistant or the access tokens are meant to be used only within a private
# subsystem. Please refer to Sections 4.2 and 4.3 of [RFC7519] for details.
token_data.update(self.get_extra_claims(client, grant_type, user, scope))
# This specification registers the 'application/at+jwt' media type, which can
# be used to indicate that the content is a JWT access token. JWT access tokens
# MUST include this media type in the 'typ' header parameter to explicitly
# declare that the JWT represents an access token complying with this profile.
# Per the definition of 'typ' in Section 4.1.9 of [RFC7515], it is RECOMMENDED
# that the 'application/' prefix be omitted. Therefore, the 'typ' value used
# SHOULD be 'at+jwt'.
header = {"alg": self.alg, "typ": "at+jwt"}
access_token = jwt.encode(
header,
token_data,
key=self.get_jwks(),
check=False,
)
return access_token.decode()
|