File: resource_protector.py

package info (click to toggle)
python-authlib 1.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,016 kB
  • sloc: python: 26,998; makefile: 53; sh: 14
file content (148 lines) | stat: -rw-r--r-- 5,404 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""authlib.oauth2.rfc6749.resource_protector.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Implementation of Accessing Protected Resources per `Section 7`_.

.. _`Section 7`: https://tools.ietf.org/html/rfc6749#section-7
"""

from .errors import MissingAuthorizationError
from .errors import UnsupportedTokenTypeError
from .util import scope_to_list


class TokenValidator:
    """Base class for token validators.

    Subclass it, fill in the hooks below, and register an instance on a
    ResourceProtector. ``TOKEN_TYPE`` names the Authorization scheme the
    validator is responsible for.
    """

    TOKEN_TYPE = "bearer"

    def __init__(self, realm=None, **extra_attributes):
        # Any additional keyword arguments are kept verbatim for later use.
        self.extra_attributes = extra_attributes
        self.realm = realm

    @staticmethod
    def scope_insufficient(token_scopes, required_scopes):
        """Tell whether the token's scopes fail to satisfy the requirement.

        ``required_scopes`` is iterated as a collection of alternatives: the
        token is sufficient as soon as it holds every scope of at least one
        alternative.

        :param token_scopes: scopes granted to the token.
        :param required_scopes: iterable of acceptable scope alternatives.
        :return: ``True`` when no alternative is satisfied, ``False`` otherwise
            (including when nothing is required).
        """
        # Nothing required: any token is good enough.
        if not required_scopes:
            return False

        granted = scope_to_list(token_scopes)
        # Something is required, but the token carries no scopes at all.
        if not granted:
            return True

        granted = set(granted)
        satisfied = any(
            set(scope_to_list(alternative)) <= granted
            for alternative in required_scopes
        )
        return not satisfied

    def authenticate_token(self, token_string):
        """Look up the token object that belongs to ``token_string``.

        Subclasses MUST override this hook, e.g.::

            def authenticate_token(self, token_string):
                return get_token_from_database(token_string)

        :param token_string: the access token as presented by the client.
        :return: token
        """
        raise NotImplementedError()

    def validate_request(self, request):
        """Hook for checks on the HTTP request itself.

        The default implementation accepts every request. Override it only
        when the server has extra requirements, for instance a mandatory
        "X-Device-Version" header::

            def validate_request(self, request):
                if "X-Device-Version" not in request.headers:
                    raise InvalidRequestError()

        :param request: instance of HttpRequest
        :raise: InvalidRequestError
        """

    def validate_token(self, token, scopes, request):
        """Check that the authenticated token may be used for this request.

        Subclasses MUST override this hook. Typical checks are expiry,
        revocation and whether the token covers the given scopes::

            def validate_token(self, token, scopes, request):
                if not token:
                    raise InvalidTokenError()
                if token.is_expired() or token.is_revoked():
                    raise InvalidTokenError()
                if not match_token_scopes(token, scopes):
                    raise InsufficientScopeError()
        """
        raise NotImplementedError()


class ResourceProtector:
    """Registry of token validators used to guard protected resources.

    Validators are stored by their lower-cased ``TOKEN_TYPE``. The first
    validator registered also provides the default auth type and realm that
    are handed to the errors raised when a request cannot be authorized.
    """

    def __init__(self):
        self._token_validators = {}
        self._default_realm = None
        self._default_auth_type = None

    def register_token_validator(self, validator: "TokenValidator"):
        """Register a token validator for a given Authorization type.
        Authlib has a built-in BearerTokenValidator per rfc6750.

        The first validator registered becomes the default used in error
        responses. If a validator for the same token type (compared
        case-insensitively) is already registered, the existing one is kept.

        :param validator: validator exposing ``TOKEN_TYPE`` and ``realm``.
        """
        if not self._default_auth_type:
            self._default_realm = validator.realm
            self._default_auth_type = validator.TOKEN_TYPE

        # get_token_validator() lower-cases the scheme before looking it up,
        # so the registry key has to be lower-cased too; otherwise a validator
        # declaring e.g. ``TOKEN_TYPE = "Bearer"`` could never be found.
        self._token_validators.setdefault(validator.TOKEN_TYPE.lower(), validator)

    def get_token_validator(self, token_type):
        """Get token validator from registry for the given token type.

        The lookup is case-insensitive.

        :param token_type: scheme taken from the Authorization header.
        :return: the registered validator.
        :raise: UnsupportedTokenTypeError when no validator is registered
            for ``token_type``.
        """
        validator = self._token_validators.get(token_type.lower())
        if not validator:
            raise UnsupportedTokenTypeError(
                self._default_auth_type, self._default_realm
            )
        return validator

    def parse_request_authorization(self, request):
        """Parse the token and token validator from request Authorization header.
        Here is an example of Authorization header::

            Authorization: Bearer a-token-string

        This method will parse this header, if it can find the validator for
        ``Bearer``, it will return the validator and ``a-token-string``.

        :param request: object whose ``headers`` mapping holds the header.
        :return: validator, token_string
        :raise: MissingAuthorizationError
        :raise: UnsupportedTokenTypeError
        """
        auth = request.headers.get("Authorization")
        if not auth:
            raise MissingAuthorizationError(
                self._default_auth_type, self._default_realm
            )

        # https://tools.ietf.org/html/rfc6749#section-7.1
        # Split on the first run of whitespace only: everything after the
        # scheme is the token string.
        token_parts = auth.split(None, 1)
        if len(token_parts) != 2:
            raise UnsupportedTokenTypeError(
                self._default_auth_type, self._default_realm
            )

        token_type, token_string = token_parts
        validator = self.get_token_validator(token_type)
        return validator, token_string

    def validate_request(self, scopes, request, **kwargs):
        """Validate the request and return a token.

        Picks the validator from the Authorization header, then runs its
        ``validate_request``, ``authenticate_token`` and ``validate_token``
        hooks in that order. Extra keyword arguments are forwarded to
        ``validate_token``.

        :param scopes: scopes passed on to the validator's ``validate_token``.
        :param request: the incoming HTTP request.
        :return: the token returned by the validator's ``authenticate_token``.
        """
        validator, token_string = self.parse_request_authorization(request)
        validator.validate_request(request)
        token = validator.authenticate_token(token_string)
        validator.validate_token(token, scopes, request, **kwargs)
        return token