1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
|
"""
Provides various authentication policies.
"""
import base64
import binascii
from django.contrib.auth import authenticate, get_user_model
from django.middleware.csrf import CsrfViewMiddleware
from django.utils.translation import gettext_lazy as _
from rest_framework import HTTP_HEADER_ENCODING, exceptions
def get_authorization_header(request):
"""
Return request's 'Authorization:' header, as a bytestring.
Hide some test client ickyness where the header can be unicode.
"""
auth = request.META.get('HTTP_AUTHORIZATION', b'')
if isinstance(auth, str):
# Work around django test client oddness
auth = auth.encode(HTTP_HEADER_ENCODING)
return auth
class CSRFCheck(CsrfViewMiddleware):
def _reject(self, request, reason):
# Return the failure reason instead of an HttpResponse
return reason
class BaseAuthentication:
"""
All authentication classes should extend BaseAuthentication.
"""
def authenticate(self, request):
"""
Authenticate the request and return a two-tuple of (user, token).
"""
raise NotImplementedError(".authenticate() must be overridden.")
def authenticate_header(self, request):
"""
Return a string to be used as the value of the `WWW-Authenticate`
header in a `401 Unauthenticated` response, or `None` if the
authentication scheme should return `403 Permission Denied` responses.
"""
pass
class BasicAuthentication(BaseAuthentication):
"""
HTTP Basic authentication against username/password.
"""
www_authenticate_realm = 'api'
def authenticate(self, request):
"""
Returns a `User` if a correct username and password have been supplied
using HTTP Basic authentication. Otherwise returns `None`.
"""
auth = get_authorization_header(request).split()
if not auth or auth[0].lower() != b'basic':
return None
if len(auth) == 1:
msg = _('Invalid basic header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid basic header. Credentials string should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)
try:
try:
auth_decoded = base64.b64decode(auth[1]).decode('utf-8')
except UnicodeDecodeError:
auth_decoded = base64.b64decode(auth[1]).decode('latin-1')
userid, password = auth_decoded.split(':', 1)
except (TypeError, ValueError, UnicodeDecodeError, binascii.Error):
msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
raise exceptions.AuthenticationFailed(msg)
return self.authenticate_credentials(userid, password, request)
def authenticate_credentials(self, userid, password, request=None):
"""
Authenticate the userid and password against username and password
with optional request for context.
"""
credentials = {
get_user_model().USERNAME_FIELD: userid,
'password': password
}
user = authenticate(request=request, **credentials)
if user is None:
raise exceptions.AuthenticationFailed(_('Invalid username/password.'))
if not user.is_active:
raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
return (user, None)
def authenticate_header(self, request):
return 'Basic realm="%s"' % self.www_authenticate_realm
class SessionAuthentication(BaseAuthentication):
"""
Use Django's session framework for authentication.
"""
def authenticate(self, request):
"""
Returns a `User` if the request session currently has a logged in user.
Otherwise returns `None`.
"""
# Get the session-based user from the underlying HttpRequest object
user = getattr(request._request, 'user', None)
# Unauthenticated, CSRF validation not required
if not user or not user.is_active:
return None
self.enforce_csrf(request)
# CSRF passed with authenticated user
return (user, None)
def enforce_csrf(self, request):
"""
Enforce CSRF validation for session based authentication.
"""
def dummy_get_response(request): # pragma: no cover
return None
check = CSRFCheck(dummy_get_response)
# populates request.META['CSRF_COOKIE'], which is used in process_view()
check.process_request(request)
reason = check.process_view(request, None, (), {})
if reason:
# CSRF failed, bail with explicit error message
raise exceptions.PermissionDenied('CSRF Failed: %s' % reason)
class TokenAuthentication(BaseAuthentication):
"""
Simple token based authentication.
Clients should authenticate by passing the token key in the "Authorization"
HTTP header, prepended with the string "Token ". For example:
Authorization: Token 401f7ac837da42b97f613d789819ff93537bee6a
"""
keyword = 'Token'
model = None
def get_model(self):
if self.model is not None:
return self.model
from rest_framework.authtoken.models import Token
return Token
"""
A custom token model may be used, but must have the following properties.
* key -- The string identifying the token
* user -- The user to which the token belongs
"""
def authenticate(self, request):
auth = get_authorization_header(request).split()
if not auth or auth[0].lower() != self.keyword.lower().encode():
return None
if len(auth) == 1:
msg = _('Invalid token header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid token header. Token string should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)
try:
token = auth[1].decode()
except UnicodeError:
msg = _('Invalid token header. Token string should not contain invalid characters.')
raise exceptions.AuthenticationFailed(msg)
return self.authenticate_credentials(token)
def authenticate_credentials(self, key):
model = self.get_model()
try:
token = model.objects.select_related('user').get(key=key)
except model.DoesNotExist:
raise exceptions.AuthenticationFailed(_('Invalid token.'))
if not token.user.is_active:
raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
return (token.user, token)
def authenticate_header(self, request):
return self.keyword
class RemoteUserAuthentication(BaseAuthentication):
"""
REMOTE_USER authentication.
To use this, set up your web server to perform authentication, which will
set the REMOTE_USER environment variable. You will need to have
'django.contrib.auth.backends.RemoteUserBackend in your
AUTHENTICATION_BACKENDS setting
"""
# Name of request header to grab username from. This will be the key as
# used in the request.META dictionary, i.e. the normalization of headers to
# all uppercase and the addition of "HTTP_" prefix apply.
header = "REMOTE_USER"
def authenticate(self, request):
user = authenticate(request=request, remote_user=request.META.get(self.header))
if user and user.is_active:
return (user, None)
|