1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
|
import logging
from authlib.jose import JoseError
from authlib.jose import jwt
from ..rfc6749 import BaseGrant
from ..rfc6749 import InvalidClientError
from ..rfc6749 import InvalidGrantError
from ..rfc6749 import InvalidRequestError
from ..rfc6749 import TokenEndpointMixin
from ..rfc6749 import UnauthorizedClientError
from .assertion import sign_jwt_bearer_assertion
log = logging.getLogger(__name__)
JWT_BEARER_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer"
class JWTBearerGrant(BaseGrant, TokenEndpointMixin):
GRANT_TYPE = JWT_BEARER_GRANT_TYPE
#: Options for verifying JWT payload claims. Developers MAY
#: overwrite this constant to create a more strict options.
CLAIMS_OPTIONS = {
"iss": {"essential": True},
"aud": {"essential": True},
"exp": {"essential": True},
}
# A small allowance of time, typically no more than a few minutes,
# to account for clock skew. The default is 60 seconds.
LEEWAY = 60
@staticmethod
def sign(
key,
issuer,
audience,
subject=None,
issued_at=None,
expires_at=None,
claims=None,
**kwargs,
):
return sign_jwt_bearer_assertion(
key, issuer, audience, subject, issued_at, expires_at, claims, **kwargs
)
def process_assertion_claims(self, assertion):
"""Extract JWT payload claims from request "assertion", per
`Section 3.1`_.
:param assertion: assertion string value in the request
:return: JWTClaims
:raise: InvalidGrantError
.. _`Section 3.1`: https://tools.ietf.org/html/rfc7523#section-3.1
"""
try:
claims = jwt.decode(
assertion, self.resolve_public_key, claims_options=self.CLAIMS_OPTIONS
)
claims.validate(leeway=self.LEEWAY)
except JoseError as e:
log.debug("Assertion Error: %r", e)
raise InvalidGrantError(description=e.description) from e
return claims
def resolve_public_key(self, headers, payload):
client = self.resolve_issuer_client(payload["iss"])
return self.resolve_client_key(client, headers, payload)
def validate_token_request(self):
"""The client makes a request to the token endpoint by sending the
following parameters using the "application/x-www-form-urlencoded"
format per `Section 2.1`_:
grant_type
REQUIRED. Value MUST be set to
"urn:ietf:params:oauth:grant-type:jwt-bearer".
assertion
REQUIRED. Value MUST contain a single JWT.
scope
OPTIONAL.
The following example demonstrates an access token request with a JWT
as an authorization grant:
.. code-block:: http
POST /token.oauth2 HTTP/1.1
Host: as.example.com
Content-Type: application/x-www-form-urlencoded
grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer
&assertion=eyJhbGciOiJFUzI1NiIsImtpZCI6IjE2In0.
eyJpc3Mi[...omitted for brevity...].
J9l-ZhwP[...omitted for brevity...]
.. _`Section 2.1`: https://tools.ietf.org/html/rfc7523#section-2.1
"""
assertion = self.request.form.get("assertion")
if not assertion:
raise InvalidRequestError("Missing 'assertion' in request")
claims = self.process_assertion_claims(assertion)
client = self.resolve_issuer_client(claims["iss"])
log.debug("Validate token request of %s", client)
if not client.check_grant_type(self.GRANT_TYPE):
raise UnauthorizedClientError(
f"The client is not authorized to use 'grant_type={self.GRANT_TYPE}'"
)
self.request.client = client
self.validate_requested_scope()
subject = claims.get("sub")
if subject:
user = self.authenticate_user(subject)
if not user:
raise InvalidGrantError(description="Invalid 'sub' value in assertion")
log.debug("Check client(%s) permission to User(%s)", client, user)
if not self.has_granted_permission(client, user):
raise InvalidClientError(
description="Client has no permission to access user data"
)
self.request.user = user
def create_token_response(self):
"""If valid and authorized, the authorization server issues an access
token.
"""
token = self.generate_token(
scope=self.request.payload.scope,
user=self.request.user,
include_refresh_token=False,
)
log.debug("Issue token %r to %r", token, self.request.client)
self.save_token(token)
return 200, token, self.TOKEN_RESPONSE_HEADER
def resolve_issuer_client(self, issuer):
"""Fetch client via "iss" in assertion claims. Developers MUST
implement this method in subclass, e.g.::
def resolve_issuer_client(self, issuer):
return Client.query_by_iss(issuer)
:param issuer: "iss" value in assertion
:return: Client instance
"""
raise NotImplementedError()
def resolve_client_key(self, client, headers, payload):
"""Resolve client key to decode assertion data. Developers MUST
implement this method in subclass. For instance, there is a
"jwks" column on client table, e.g.::
def resolve_client_key(self, client, headers, payload):
# from authlib.jose import JsonWebKey
key_set = JsonWebKey.import_key_set(client.jwks)
return key_set.find_by_kid(headers["kid"])
:param client: instance of OAuth client model
:param headers: headers part of the JWT
:param payload: payload part of the JWT
:return: ``authlib.jose.Key`` instance
"""
raise NotImplementedError()
def authenticate_user(self, subject):
"""Authenticate user with the given assertion claims. Developers MUST
implement it in subclass, e.g.::
def authenticate_user(self, subject):
return User.get_by_sub(subject)
:param subject: "sub" value in claims
:return: User instance
"""
raise NotImplementedError()
def has_granted_permission(self, client, user):
"""Check if the client has permission to access the given user's resource.
Developers MUST implement it in subclass, e.g.::
def has_granted_permission(self, client, user):
permission = ClientUserGrant.query(client=client, user=user)
return permission.granted
:param client: instance of OAuth client model
:param user: instance of User model
:return: bool
"""
raise NotImplementedError()
|