1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
|
from authlib.common.errors import ContinueIteration
from authlib.oauth2.rfc6750.errors import InvalidTokenError
from authlib.oauth2.rfc9068.token_validator import JWTBearerTokenValidator
from ..rfc6749 import UnsupportedTokenTypeError
from ..rfc7009 import RevocationEndpoint
class JWTRevocationEndpoint(RevocationEndpoint):
r"""JWTRevocationEndpoint inherits from `RFC7009`_
:class:`~authlib.oauth2.rfc7009.RevocationEndpoint`.
The JWT access tokens cannot be revoked.
If the submitted token is a JWT access token, then revocation returns
a `invalid_token_error`.
:param issuer: The issuer identifier.
:param \\*\\*kwargs: Other parameters are inherited from
:class:`~authlib.oauth2.rfc7009.RevocationEndpoint`.
Plain text access tokens and other kind of tokens such as refresh_tokens
will be ignored by this endpoint and passed to the next revocation endpoint::
class MyJWTAccessTokenRevocationEndpoint(JWTRevocationEndpoint):
def get_jwks(self): ...
# endpoint dedicated to JWT access token revokation
authorization_server.register_endpoint(
MyJWTAccessTokenRevocationEndpoint(
issuer="https://authorization-server.example.org",
)
)
# another endpoint dedicated to refresh token revokation
authorization_server.register_endpoint(MyRefreshTokenRevocationEndpoint)
.. _RFC7009: https://tools.ietf.org/html/rfc7009
"""
def __init__(self, issuer, server=None, *args, **kwargs):
super().__init__(*args, server=server, **kwargs)
self.issuer = issuer
def authenticate_token(self, request, client):
""""""
self.check_params(request, client)
# do not attempt to revoke refresh_tokens
if request.form.get("token_type_hint") not in ("access_token", None):
raise ContinueIteration()
validator = JWTBearerTokenValidator(issuer=self.issuer, resource_server=None)
validator.get_jwks = self.get_jwks
try:
validator.authenticate_token(request.form["token"])
# if the token is not a JWT, fall back to the regular flow
except InvalidTokenError as exc:
raise ContinueIteration() from exc
# JWT access token cannot be revoked
raise UnsupportedTokenTypeError()
def get_jwks(self):
"""Return the JWKs that will be used to check the JWT access token signature.
Developers MUST re-implement this method::
def get_jwks(self):
return load_jwks("jwks.json")
"""
raise NotImplementedError()
|