1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
from authlib.common.errors import ContinueIteration
from authlib.consts import default_json_headers
from authlib.jose.errors import ExpiredTokenError
from authlib.jose.errors import InvalidClaimError
from authlib.oauth2.rfc6750.errors import InvalidTokenError
from authlib.oauth2.rfc9068.token_validator import JWTBearerTokenValidator
from ..rfc7662 import IntrospectionEndpoint
class JWTIntrospectionEndpoint(IntrospectionEndpoint):
    r"""JWTIntrospectionEndpoint inherits from :ref:`specs/rfc7662`
    :class:`~authlib.oauth2.rfc7662.IntrospectionEndpoint` and implements the machinery
    to automatically process the JWT access tokens.

    :param issuer: The issuer identifier for which tokens will be introspected.

    :param \*\*kwargs: Other parameters are inherited from
        :class:`~authlib.oauth2.rfc7662.introspection.IntrospectionEndpoint`.

    ::

        class MyJWTAccessTokenIntrospectionEndpoint(JWTIntrospectionEndpoint):
            def get_jwks(self): ...

            def get_username(self, user_id): ...


        # endpoint dedicated to JWT access token introspection
        authorization_server.register_endpoint(
            MyJWTAccessTokenIntrospectionEndpoint(
                issuer="https://authorization-server.example.org",
            )
        )

        # another endpoint dedicated to refresh token introspection
        authorization_server.register_endpoint(MyRefreshTokenIntrospectionEndpoint)
    """

    #: Endpoint name to be registered
    ENDPOINT_NAME = "introspection"

    def __init__(self, issuer, server=None, *args, **kwargs):
        # ``server`` and any extra arguments are forwarded untouched to the
        # parent endpoint; only ``issuer`` is specific to this class.
        super().__init__(*args, server=server, **kwargs)
        self.issuer = issuer

    def create_endpoint_response(self, request):
        """Build the introspection response for ``request``.

        :param request: The incoming request; it is handed to
            ``authenticate_endpoint_client`` and ``authenticate_token``.
        :return: A ``(status_code, body, headers)`` tuple: ``200``, the
            introspection payload and the default JSON headers.
        """
        # The authorization server first validates the client credentials
        client = self.authenticate_endpoint_client(request)

        # then resolves the token being introspected and checks that this
        # client is permitted to introspect it
        token = self.authenticate_token(request, client)

        # and finally describes the token state in the response body
        body = self.create_introspection_payload(token)
        return 200, body, default_json_headers

    def authenticate_token(self, request, client):
        """Decode the JWT access token carried by the introspection request.

        :param request: The incoming request; ``token`` and
            ``token_type_hint`` are read from ``request.form``.
        :param client: The client that authenticated against the endpoint.
        :return: The decoded token when it is truthy and
            ``check_permission`` accepts it, otherwise ``None``.
        :raise ContinueIteration: When ``token_type_hint`` is something other
            than ``access_token``, or when the validator rejects the token
            with ``InvalidTokenError``.
        """
        self.check_params(request, client)

        # do not attempt to decode refresh_tokens
        if request.form.get("token_type_hint") not in ("access_token", None):
            raise ContinueIteration()

        validator = JWTBearerTokenValidator(issuer=self.issuer, resource_server=None)
        # The validator must use the keys supplied by this endpoint.
        validator.get_jwks = self.get_jwks
        try:
            token = validator.authenticate_token(request.form["token"])

        # if the token is not a JWT, fall back to the regular flow
        except InvalidTokenError as exc:
            raise ContinueIteration() from exc

        if token and self.check_permission(token, client, request):
            return token
        # Falsy token or permission denied: reported later as inactive.
        return None

    def create_introspection_payload(self, token):
        """Translate a decoded token into an introspection response body.

        :param token: The value returned by :meth:`authenticate_token`; may be
            ``None`` or otherwise falsy.
        :return: ``{"active": False}`` when there is no token or it has
            expired; otherwise a dict describing the active token. ``scope``
            is included only when the token carries that claim, and
            ``username`` only when :meth:`get_username` returns a truthy value.
        :raise ContinueIteration: When validation fails on the ``iss`` claim.
        :raise InvalidTokenError: When validation fails on any other claim.
        """
        if not token:
            return {"active": False}

        try:
            token.validate()
        except ExpiredTokenError:
            return {"active": False}
        except InvalidClaimError as exc:
            # A token from another issuer is not ours to describe.
            if exc.claim_name == "iss":
                raise ContinueIteration() from exc
            raise InvalidTokenError() from exc

        payload = {
            "active": True,
            "token_type": "Bearer",
            "client_id": token["client_id"],
        }

        # ``scope`` is an optional claim for JWT access tokens (RFC 9068) and
        # an optional member of the introspection response (RFC 7662), so a
        # token without it must not make the endpoint fail with a KeyError.
        try:
            payload["scope"] = token["scope"]
        except KeyError:
            pass

        payload.update(
            {
                "sub": token["sub"],
                "aud": token["aud"],
                "iss": token["iss"],
                "exp": token["exp"],
                "iat": token["iat"],
            }
        )

        if username := self.get_username(token["sub"]):
            payload["username"] = username

        return payload

    def get_jwks(self):
        """Return the JWKs that will be used to check the JWT access token signature.
        Developers MUST re-implement this method::

            def get_jwks(self):
                return load_jwks("jwks.json")
        """
        raise NotImplementedError()

    def get_username(self, user_id: str) -> "str | None":
        """Return a username from a user ID.
        Developers MAY re-implement this method::

            def get_username(self, user_id):
                return User.get(id=user_id).username

        The default implementation returns ``None``, in which case no
        ``username`` member is added to the introspection payload.
        """
        return None
|