File: introspection.py

package info (click to toggle)
python-authlib 1.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,016 kB
  • sloc: python: 26,998; makefile: 53; sh: 14
file content (127 lines) | stat: -rw-r--r-- 4,385 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from authlib.common.errors import ContinueIteration
from authlib.consts import default_json_headers
from authlib.jose.errors import ExpiredTokenError
from authlib.jose.errors import InvalidClaimError
from authlib.oauth2.rfc6750.errors import InvalidTokenError
from authlib.oauth2.rfc9068.token_validator import JWTBearerTokenValidator

from ..rfc7662 import IntrospectionEndpoint


class JWTIntrospectionEndpoint(IntrospectionEndpoint):
    r"""JWTIntrospectionEndpoint inherits from :ref:`specs/rfc7662`
    :class:`~authlib.oauth2.rfc7662.IntrospectionEndpoint` and implements the machinery
    to automatically process the JWT access tokens.

    :param issuer: The issuer identifier for which tokens will be introspected.

    :param \*\*kwargs: Other parameters are inherited from
        :class:`~authlib.oauth2.rfc7662.introspection.IntrospectionEndpoint`.

    ::

        class MyJWTAccessTokenIntrospectionEndpoint(JWTIntrospectionEndpoint):
            def get_jwks(self): ...

            def get_username(self, user_id): ...


        # endpoint dedicated to JWT access token introspection
        authorization_server.register_endpoint(
            MyJWTAccessTokenIntrospectionEndpoint(
                issuer="https://authorization-server.example.org",
            )
        )

        # another endpoint dedicated to refresh token introspection
        authorization_server.register_endpoint(MyRefreshTokenIntrospectionEndpoint)

    """

    #: Endpoint name to be registered
    ENDPOINT_NAME = "introspection"

    def __init__(self, issuer, server=None, *args, **kwargs):
        """Store the issuer and forward everything else to the parent class.

        :param issuer: The issuer identifier handed to the JWT validator.
        :param server: Forwarded to the parent constructor as ``server``.
        """
        super().__init__(*args, server=server, **kwargs)
        self.issuer = issuer

    def create_endpoint_response(self, request):
        """Validate an introspection request and build its response.

        :param request: The introspection request being handled.
        :return: A ``(status_code, body, headers)`` tuple; the status is
            always ``200`` and the headers are ``default_json_headers``.
        """
        # The authorization server first validates the client credentials
        client = self.authenticate_endpoint_client(request)

        # then decodes the token submitted in the introspection request and
        # checks that this client is permitted to introspect it
        token = self.authenticate_token(request, client)

        # finally the authorization server describes the state of the token
        body = self.create_introspection_payload(token)
        return 200, body, default_json_headers

    def authenticate_token(self, request, client):
        """Decode the submitted token as a JWT access token.

        :param request: The introspection request; ``request.form`` carries
            the ``token`` and the optional ``token_type_hint`` parameters.
        :param client: The authenticated client making the request.
        :return: The decoded token when ``check_permission`` accepts it,
            otherwise ``None``.
        :raises ContinueIteration: when ``token_type_hint`` is set to
            something other than ``access_token``, or when the validator
            rejects the token with ``InvalidTokenError`` (e.g. it is not a
            JWT), so that the regular flow can take over.
        """
        self.check_params(request, client)

        # do not attempt to decode refresh_tokens
        if request.form.get("token_type_hint") not in ("access_token", None):
            raise ContinueIteration()

        validator = JWTBearerTokenValidator(issuer=self.issuer, resource_server=None)
        # the validator verifies signatures with the keys this endpoint exposes
        validator.get_jwks = self.get_jwks
        try:
            token = validator.authenticate_token(request.form["token"])

        # if the token is not a JWT, fall back to the regular flow
        except InvalidTokenError as exc:
            raise ContinueIteration() from exc

        if not token or not self.check_permission(token, client, request):
            return None
        return token

    def create_introspection_payload(self, token):
        """Build the introspection response body for ``token``.

        :param token: The decoded token claims, or a falsy value when no
            token could be authenticated.
        :return: ``{"active": False}`` when there is no token or it has
            expired; otherwise a dict describing the active token. The
            ``scope`` member is only present when the token carries a
            ``scope`` claim, and ``username`` only when ``get_username``
            returns a truthy value.
        :raises ContinueIteration: when the ``iss`` claim is invalid.
            NOTE(review): presumably so another registered endpoint can
            handle a token from a different issuer -- confirm.
        :raises InvalidTokenError: when any other claim is invalid.
        """
        if not token:
            return {"active": False}

        try:
            token.validate()
        except ExpiredTokenError:
            return {"active": False}
        except InvalidClaimError as exc:
            if exc.claim_name == "iss":
                raise ContinueIteration() from exc
            raise InvalidTokenError() from exc

        payload = {
            "active": True,
            "token_type": "Bearer",
            "client_id": token["client_id"],
            "scope": token.get("scope"),
            "sub": token["sub"],
            "aud": token["aud"],
            "iss": token["iss"],
            "exp": token["exp"],
            "iat": token["iat"],
        }

        # "scope" is not a required claim of an RFC 9068 access token, and it
        # is optional in an RFC 7662 response: leave it out rather than fail
        # with a KeyError when the token was issued without one.
        if "scope" not in token:
            del payload["scope"]

        if username := self.get_username(token["sub"]):
            payload["username"] = username

        return payload

    def get_jwks(self):
        """Return the JWKs that will be used to check the JWT access token signature.
        Developers MUST re-implement this method::

            def get_jwks(self):
                return load_jwks("jwks.json")

        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()

    def get_username(self, user_id: str) -> "str | None":
        """Return a username from a user ID.
        Developers MAY re-implement this method::

            def get_username(self, user_id):
                return User.get(id=user_id).username

        :param user_id: The value of the token ``sub`` claim.
        :return: The username, or ``None`` (the default) when no username
            should be added to the introspection response.
        """
        return None