1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
import json
from django.http import JsonResponse
from authlib.integrations.django_oauth2 import BearerTokenValidator
from authlib.integrations.django_oauth2 import ResourceProtector
from .models import Client
from .models import OAuth2Token
from .models import User
from .oauth2_server import TestCase
# Module-wide protector used to decorate the views defined inside the tests
# below; it is given a bearer-token validator built from the OAuth2Token model.
require_oauth = ResourceProtector()
_bearer_validator = BearerTokenValidator(OAuth2Token)
require_oauth.register_token_validator(_bearer_validator)
class ResourceProtectorTest(TestCase):
    """Exercise views wrapped by ``require_oauth`` with factory-built requests.

    Each test decorates a small local view, calls it directly with a request
    from ``self.factory`` (provided outside this class) and inspects the
    status code and JSON body of the response.
    """

    def _check_oauth_error(self, resp, status_code, error):
        """Assert ``resp`` has ``status_code`` and a JSON ``error`` equal to ``error``."""
        assert resp.status_code == status_code
        payload = json.loads(resp.content)
        assert payload["error"] == error

    def _check_username(self, resp, username):
        """Assert ``resp`` is a 200 whose JSON ``username`` equals ``username``."""
        assert resp.status_code == 200
        payload = json.loads(resp.content)
        assert payload["username"] == username

    def prepare_data(self, expires_in=3600, scope="profile"):
        """Save a user, a client and a bearer token with access token ``a1``.

        :param expires_in: value stored in the token's ``expires_in`` field.
        :param scope: value stored in the token's ``scope`` field; the client
            itself is always saved with scope ``"profile"``.
        """
        owner = User(username="foo")
        owner.save()

        app = Client(
            user_id=owner.pk,
            client_id="client",
            client_secret="secret",
            scope="profile",
        )
        app.save()

        access = OAuth2Token(
            user_id=owner.pk,
            client_id=app.client_id,
            token_type="bearer",
            access_token="a1",
            scope=scope,
            expires_in=expires_in,
        )
        access.save()

    def test_invalid_token(self):
        """A missing, non-bearer or unknown credential each yields a 401."""

        @require_oauth("profile")
        def get_user_profile(request):
            account = request.oauth_token.user
            return JsonResponse({"sub": account.pk, "username": account.username})

        self.prepare_data()

        # No Authorization header at all.
        no_header = self.factory.get("/user")
        self._check_oauth_error(
            get_user_profile(no_header), 401, "missing_authorization"
        )

        # Authorization header whose scheme is not "bearer".
        wrong_scheme = self.factory.get("/user", HTTP_AUTHORIZATION="invalid token")
        self._check_oauth_error(
            get_user_profile(wrong_scheme), 401, "unsupported_token_type"
        )

        # Bearer scheme, but "token" is not the access token saved by prepare_data.
        unknown_token = self.factory.get("/user", HTTP_AUTHORIZATION="bearer token")
        self._check_oauth_error(get_user_profile(unknown_token), 401, "invalid_token")

    def test_expired_token(self):
        """A token saved with a negative ``expires_in`` is rejected as invalid."""
        self.prepare_data(-10)

        @require_oauth("profile")
        def get_user_profile(request):
            account = request.oauth_token.user
            return JsonResponse({"sub": account.pk, "username": account.username})

        req = self.factory.get("/user", HTTP_AUTHORIZATION="bearer a1")
        self._check_oauth_error(get_user_profile(req), 401, "invalid_token")

    def test_insufficient_token(self):
        """A "profile" token cannot reach a view that requires "email"."""
        self.prepare_data()

        @require_oauth("email")
        def get_user_email(request):
            account = request.oauth_token.user
            return JsonResponse({"email": account.email})

        req = self.factory.get("/user/email", HTTP_AUTHORIZATION="bearer a1")
        self._check_oauth_error(get_user_email(req), 403, "insufficient_scope")

    def test_access_resource(self):
        """An ``optional=True`` view serves anonymous and authenticated callers."""
        self.prepare_data()

        @require_oauth("profile", optional=True)
        def get_user_profile(request):
            token = request.oauth_token
            if not token:
                return JsonResponse({"sub": 0, "username": "anonymous"})
            account = token.user
            return JsonResponse({"sub": account.pk, "username": account.username})

        # Without credentials the view falls back to the anonymous payload.
        anonymous_req = self.factory.get("/user")
        self._check_username(get_user_profile(anonymous_req), "anonymous")

        # With the saved token the view reports the token's user.
        authed_req = self.factory.get("/user", HTTP_AUTHORIZATION="bearer a1")
        self._check_username(get_user_profile(authed_req), "foo")

    def test_scope_operator(self):
        """Check the two list forms of the scope argument against a "profile" token.

        ``["profile email"]`` is rejected with ``insufficient_scope``, while
        ``["profile", "email"]`` is accepted.
        """
        self.prepare_data()

        @require_oauth(["profile email"])
        def operator_and(request):
            account = request.oauth_token.user
            return JsonResponse({"sub": account.pk, "username": account.username})

        @require_oauth(["profile", "email"])
        def operator_or(request):
            account = request.oauth_token.user
            return JsonResponse({"sub": account.pk, "username": account.username})

        # The same request object is passed to both views, as in the original flow.
        req = self.factory.get("/user", HTTP_AUTHORIZATION="bearer a1")
        self._check_oauth_error(operator_and(req), 403, "insufficient_scope")
        self._check_username(operator_or(req), "foo")
|