1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
from authlib.consts import default_json_headers
from ..errors import InvalidRequestError
from ..hooks import Hookable
from ..hooks import hooked
from ..requests import OAuth2Request
class BaseGrant(Hookable):
#: Allowed client auth methods for token endpoint
TOKEN_ENDPOINT_AUTH_METHODS = ["client_secret_basic"]
#: Designed for which "grant_type"
GRANT_TYPE = None
# NOTE: there is no charset for application/json, since
# application/json should always in UTF-8.
# The example on RFC is incorrect.
# https://tools.ietf.org/html/rfc4627
TOKEN_RESPONSE_HEADER = default_json_headers
def __init__(self, request: OAuth2Request, server):
super().__init__()
self.prompt = None
self.redirect_uri = None
self.request = request
self.server = server
@property
def client(self):
return self.request.client
def generate_token(
self,
user=None,
scope=None,
grant_type=None,
expires_in=None,
include_refresh_token=True,
):
if grant_type is None:
grant_type = self.GRANT_TYPE
return self.server.generate_token(
client=self.request.client,
grant_type=grant_type,
user=user,
scope=scope,
expires_in=expires_in,
include_refresh_token=include_refresh_token,
)
def authenticate_token_endpoint_client(self):
"""Authenticate client with the given methods for token endpoint.
For example, the client makes the following HTTP request using TLS:
.. code-block:: http
POST /token HTTP/1.1
Host: server.example.com
Authorization: Basic czZCaGRSa3F0MzpnWDFmQmF0M2JW
Content-Type: application/x-www-form-urlencoded
grant_type=authorization_code&code=SplxlOBeZQQYbYS6WxSbIA
&redirect_uri=https%3A%2F%2Fclient%2Eexample%2Ecom%2Fcb
Default available methods are: "none", "client_secret_basic" and
"client_secret_post".
:return: client
"""
client = self.server.authenticate_client(
self.request, self.TOKEN_ENDPOINT_AUTH_METHODS
)
self.server.send_signal("after_authenticate_client", client=client, grant=self)
return client
def save_token(self, token):
"""A method to save token into database."""
return self.server.save_token(token, self.request)
def validate_requested_scope(self):
"""Validate if requested scope is supported by Authorization Server."""
scope = self.request.payload.scope
return self.server.validate_requested_scope(scope)
class TokenEndpointMixin:
#: Allowed HTTP methods of this token endpoint
TOKEN_ENDPOINT_HTTP_METHODS = ["POST"]
#: Designed for which "grant_type"
GRANT_TYPE = None
@classmethod
def check_token_endpoint(cls, request: OAuth2Request):
return (
request.payload.grant_type == cls.GRANT_TYPE
and request.method in cls.TOKEN_ENDPOINT_HTTP_METHODS
)
def validate_token_request(self):
raise NotImplementedError()
def create_token_response(self):
raise NotImplementedError()
class AuthorizationEndpointMixin:
RESPONSE_TYPES = set()
ERROR_RESPONSE_FRAGMENT = False
@classmethod
def check_authorization_endpoint(cls, request: OAuth2Request):
return request.payload.response_type in cls.RESPONSE_TYPES
@staticmethod
def validate_authorization_redirect_uri(request: OAuth2Request, client):
if request.payload.redirect_uri:
if not client.check_redirect_uri(request.payload.redirect_uri):
raise InvalidRequestError(
f"Redirect URI {request.payload.redirect_uri} is not supported by client.",
)
return request.payload.redirect_uri
else:
redirect_uri = client.get_default_redirect_uri()
if not redirect_uri:
raise InvalidRequestError(
"Missing 'redirect_uri' in request.", state=request.payload.state
)
return redirect_uri
@staticmethod
def validate_no_multiple_request_parameter(request: OAuth2Request):
"""For the Authorization Endpoint, request and response parameters MUST NOT be included
more than once. Per `Section 3.1`_.
.. _`Section 3.1`: https://tools.ietf.org/html/rfc6749#section-3.1
"""
datalist = request.payload.datalist
parameters = ["response_type", "client_id", "redirect_uri", "scope", "state"]
for param in parameters:
if len(datalist.get(param, [])) > 1:
raise InvalidRequestError(
f"Multiple '{param}' in request.", state=request.payload.state
)
@hooked
def validate_consent_request(self):
redirect_uri = self.validate_authorization_request()
self.redirect_uri = redirect_uri
return redirect_uri
def validate_authorization_request(self):
raise NotImplementedError()
def create_authorization_response(self, redirect_uri: str, grant_user):
raise NotImplementedError()
|