1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
|
from typing import Optional, TypeVar
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AbstractBaseUser
from django.utils.translation import gettext_lazy as _
from rest_framework import HTTP_HEADER_ENCODING, authentication
from rest_framework.request import Request
from .exceptions import AuthenticationFailed, InvalidToken, TokenError
from .models import TokenUser
from .settings import api_settings
from .tokens import Token
from .utils import get_md5_hash_password
# The AUTH_HEADER_TYPES setting may be a list/tuple or a single value; a
# single value is wrapped in a one-element tuple so the name below can
# always be indexed and iterated.
AUTH_HEADER_TYPES = (
    api_settings.AUTH_HEADER_TYPES
    if isinstance(api_settings.AUTH_HEADER_TYPES, (list, tuple))
    else (api_settings.AUTH_HEADER_TYPES,)
)

# The same header types encoded with HTTP_HEADER_ENCODING, kept as a set for
# membership tests against the first part of a raw (bytes) header value.
AUTH_HEADER_TYPE_BYTES: set[bytes] = {
    header_type.encode(HTTP_HEADER_ENCODING) for header_type in AUTH_HEADER_TYPES
}

# Type variable constrained to the two user types used in this module's
# annotations: AbstractBaseUser and TokenUser.
AuthUser = TypeVar("AuthUser", AbstractBaseUser, TokenUser)
class JWTAuthentication(authentication.BaseAuthentication):
    """
    An authentication plugin that authenticates requests through a JSON web
    token provided in a request header.
    """

    www_authenticate_realm = "api"
    media_type = "application/json"

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Look the user model up once per instance; get_user() queries it.
        self.user_model = get_user_model()

    def authenticate(self, request: Request) -> Optional[tuple[AuthUser, Token]]:
        """
        Authenticates the given request using the token found in its header.

        Returns ``None`` when the request carries no usable token (the header
        is missing, empty, or does not start with a configured header type).
        Otherwise returns a ``(user, validated_token)`` pair. Exceptions
        raised by the helper methods below are not caught here.
        """
        header = self.get_header(request)
        if header is None:
            return None

        raw_token = self.get_raw_token(header)
        if raw_token is None:
            return None

        validated_token = self.get_validated_token(raw_token)

        return self.get_user(validated_token), validated_token

    def authenticate_header(self, request: Request) -> str:
        """
        Builds a ``<type> realm="<realm>"`` string from the first configured
        header type and ``www_authenticate_realm``.
        """
        return f'{AUTH_HEADER_TYPES[0]} realm="{self.www_authenticate_realm}"'

    def get_header(self, request: Request) -> Optional[bytes]:
        """
        Extracts the header containing the JSON web token from the given
        request.

        Returns ``None`` when ``request.META`` has no entry under
        ``api_settings.AUTH_HEADER_NAME``; a ``str`` value is encoded to
        bytes before being returned.
        """
        header = request.META.get(api_settings.AUTH_HEADER_NAME)

        if isinstance(header, str):
            # Work around django test client oddness
            header = header.encode(HTTP_HEADER_ENCODING)

        return header

    def get_raw_token(self, header: bytes) -> Optional[bytes]:
        """
        Extracts an unvalidated JSON web token from the given "Authorization"
        header value.

        Returns ``None`` for an empty header or one whose first part is not a
        configured header type. Raises ``AuthenticationFailed`` when the
        header starts with a configured type but does not consist of exactly
        two whitespace-delimited parts.
        """
        parts = header.split()

        if not parts:
            # Empty AUTHORIZATION header sent
            return None

        if parts[0] not in AUTH_HEADER_TYPE_BYTES:
            # Assume the header does not contain a JSON web token
            return None

        if len(parts) != 2:
            raise AuthenticationFailed(
                _("Authorization header must contain two space-delimited values"),
                code="bad_authorization_header",
            )

        return parts[1]

    def get_validated_token(self, raw_token: bytes) -> Token:
        """
        Validates an encoded JSON web token and returns a validated token
        wrapper object.

        Each class in ``api_settings.AUTH_TOKEN_CLASSES`` is tried in order
        and the first one that accepts the token wins. If every class raises
        ``TokenError``, ``InvalidToken`` is raised with one entry per failed
        class.
        """
        messages = []
        for AuthToken in api_settings.AUTH_TOKEN_CLASSES:
            try:
                return AuthToken(raw_token)
            except TokenError as e:
                messages.append(
                    {
                        "token_class": AuthToken.__name__,
                        "token_type": AuthToken.token_type,
                        # A TokenError raised without arguments has empty
                        # ``args``; fall back to ``str(e)`` so the failure is
                        # still reported as InvalidToken, not an IndexError.
                        "message": e.args[0] if e.args else str(e),
                    }
                )

        raise InvalidToken(
            {
                "detail": _("Given token not valid for any token type"),
                "messages": messages,
            }
        )

    def get_user(self, validated_token: Token) -> AuthUser:
        """
        Attempts to find and return a user using the given validated token.

        Raises ``InvalidToken`` when the token lacks the
        ``api_settings.USER_ID_CLAIM`` claim. Raises ``AuthenticationFailed``
        when no matching user exists, when the user is inactive (only if
        ``api_settings.CHECK_USER_IS_ACTIVE`` is set), or when the token's
        revoke claim does not match the value derived from the user's current
        password (only if ``api_settings.CHECK_REVOKE_TOKEN`` is set).
        """
        try:
            user_id = validated_token[api_settings.USER_ID_CLAIM]
        except KeyError as e:
            raise InvalidToken(
                _("Token contained no recognizable user identification")
            ) from e

        try:
            user = self.user_model.objects.get(**{api_settings.USER_ID_FIELD: user_id})
        except self.user_model.DoesNotExist as e:
            raise AuthenticationFailed(
                _("User not found"), code="user_not_found"
            ) from e

        if api_settings.CHECK_USER_IS_ACTIVE and not user.is_active:
            raise AuthenticationFailed(_("User is inactive"), code="user_inactive")

        # The claim is only read, and the hash only computed, when revocation
        # checking is enabled.
        if api_settings.CHECK_REVOKE_TOKEN and validated_token.get(
            api_settings.REVOKE_TOKEN_CLAIM
        ) != get_md5_hash_password(user.password):
            raise AuthenticationFailed(
                _("The user's password has been changed."), code="password_changed"
            )

        return user
class JWTStatelessUserAuthentication(JWTAuthentication):
    """
    Variant of the JWT authentication plugin that builds the user object
    straight from the validated token instead of querying the database.
    """

    def get_user(self, validated_token: Token) -> AuthUser:
        """
        Wraps the validated token in ``api_settings.TOKEN_USER_CLASS`` and
        returns the result.

        Raises ``InvalidToken`` when the token does not contain the
        ``api_settings.USER_ID_CLAIM`` claim.
        """
        if api_settings.USER_ID_CLAIM in validated_token:
            return api_settings.TOKEN_USER_CLASS(validated_token)

        # Reject the token: the token user class relies on a recognizable
        # user identifier claim being present.
        raise InvalidToken(_("Token contained no recognizable user identification"))
# Second module-level name bound to the same class object as
# JWTStatelessUserAuthentication.
# NOTE(review): presumably kept so imports of the older name keep working; confirm.
JWTTokenUserAuthentication = JWTStatelessUserAuthentication
def default_user_authentication_rule(user: AuthUser) -> bool:
    """
    Decide whether the given user is allowed to authenticate.

    A missing user (``None``) is always rejected. When
    ``api_settings.CHECK_USER_IS_ACTIVE`` is disabled every existing user is
    accepted; otherwise the result is the user's ``is_active`` value.
    """
    # Background: before Django 1.10 the default `ModelBackend` let inactive
    # users authenticate; from 1.10 on it refuses them unless the project
    # opts into `AllowAllUsersModelBackend`. Inactive users are rejected
    # explicitly here so the policy is the same across Django versions.
    if user is None:
        return False
    if not api_settings.CHECK_USER_IS_ACTIVE:
        return True
    return user.is_active
|