1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
|
import logging
from django.core.exceptions import ImproperlyConfigured
from rest_framework.exceptions import PermissionDenied
from rest_framework.permissions import SAFE_METHODS, BasePermission, IsAuthenticated
from ...settings import oauth2_settings
from .authentication import OAuth2Authentication
log = logging.getLogger("oauth2_provider")
class TokenHasScope(BasePermission):
"""
The request is authenticated as a user and the token used has the right scope
"""
def has_permission(self, request, view):
token = request.auth
if not token:
return False
if hasattr(token, "scope"): # OAuth 2
required_scopes = self.get_scopes(request, view)
log.debug("Required scopes to access resource: {0}".format(required_scopes))
if token.is_valid(required_scopes):
return True
# Provide information about required scope?
include_required_scope = (
oauth2_settings.ERROR_RESPONSE_WITH_SCOPES
and required_scopes
and not token.is_expired()
and not token.allow_scopes(required_scopes)
)
if include_required_scope:
self.message = {
"detail": PermissionDenied.default_detail,
"required_scopes": list(required_scopes),
}
return False
assert False, (
"TokenHasScope requires the"
"`oauth2_provider.rest_framework.OAuth2Authentication` authentication "
"class to be used."
)
def get_scopes(self, request, view):
try:
return getattr(view, "required_scopes")
except AttributeError:
raise ImproperlyConfigured(
"TokenHasScope requires the view to define the required_scopes attribute"
)
class TokenHasReadWriteScope(TokenHasScope):
"""
The request is authenticated as a user and the token used has the right scope
"""
def get_scopes(self, request, view):
try:
required_scopes = super().get_scopes(request, view)
except ImproperlyConfigured:
required_scopes = []
# TODO: code duplication!! see dispatch in ReadWriteScopedResourceMixin
if request.method.upper() in SAFE_METHODS:
read_write_scope = oauth2_settings.READ_SCOPE
else:
read_write_scope = oauth2_settings.WRITE_SCOPE
return required_scopes + [read_write_scope]
class TokenHasResourceScope(TokenHasScope):
"""
The request is authenticated as a user and the token used has the right scope
"""
def get_scopes(self, request, view):
try:
view_scopes = super().get_scopes(request, view)
except ImproperlyConfigured:
view_scopes = []
if request.method.upper() in SAFE_METHODS:
scope_type = oauth2_settings.READ_SCOPE
else:
scope_type = oauth2_settings.WRITE_SCOPE
required_scopes = ["{}:{}".format(scope, scope_type) for scope in view_scopes]
return required_scopes
class IsAuthenticatedOrTokenHasScope(BasePermission):
"""
The user is authenticated using some backend or the token has the right scope
This only returns True if the user is authenticated, but not using a token
or using a token, and the token has the correct scope.
This is useful when combined with the DjangoModelPermissions to allow people browse
the browsable api's if they log in using the a non token bassed middleware,
and let them access the api's using a rest client with a token
"""
def has_permission(self, request, view):
is_authenticated = IsAuthenticated().has_permission(request, view)
oauth2authenticated = False
if is_authenticated:
oauth2authenticated = isinstance(request.successful_authenticator, OAuth2Authentication)
token_has_scope = TokenHasScope()
return (is_authenticated and not oauth2authenticated) or token_has_scope.has_permission(request, view)
class TokenMatchesOASRequirements(BasePermission):
"""
:attr:alternate_required_scopes: dict keyed by HTTP method name with value: iterable alternate scope lists
This fulfills the [Open API Specification (OAS; formerly Swagger)](https://www.openapis.org/)
list of alternative Security Requirements Objects for oauth2 or openIdConnect:
When a list of Security Requirement Objects is defined on the Open API object or Operation Object,
only one of Security Requirement Objects in the list needs to be satisfied to authorize the request.
[1](https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.0.md#securityRequirementObject)
For each method, a list of lists of allowed scopes is tried in order and the first to match succeeds.
@example
required_alternate_scopes = {
'GET': [['read']],
'POST': [['create1','scope2'], ['alt-scope3'], ['alt-scope4','alt-scope5']],
}
TODO: DRY: subclass TokenHasScope and iterate over values of required_scope?
"""
def has_permission(self, request, view):
token = request.auth
if not token:
return False
if hasattr(token, "scope"): # OAuth 2
required_alternate_scopes = self.get_required_alternate_scopes(request, view)
m = request.method.upper()
if m in required_alternate_scopes:
log.debug(
"Required scopes alternatives to access resource: {0}".format(
required_alternate_scopes[m]
)
)
for alt in required_alternate_scopes[m]:
if token.is_valid(alt):
return True
return False
else:
log.warning("no scope alternates defined for method {0}".format(m))
return False
assert False, (
"TokenMatchesOASRequirements requires the"
"`oauth2_provider.rest_framework.OAuth2Authentication` authentication "
"class to be used."
)
def get_required_alternate_scopes(self, request, view):
try:
return getattr(view, "required_alternate_scopes")
except AttributeError:
raise ImproperlyConfigured(
"TokenMatchesOASRequirements requires the view to"
" define the required_alternate_scopes attribute"
)
|