# Tests for the Authlib Django OAuth2 resource protector (bearer tokens).
import json
import pytest
from django.http import JsonResponse
from authlib.integrations.django_oauth2 import BearerTokenValidator
from authlib.integrations.django_oauth2 import ResourceProtector
from .models import Client
from .models import OAuth2Token
# Module-wide protector used as the ``@require_oauth(...)`` decorator by the
# tests in this file; it is given a bearer-token validator built from the
# OAuth2Token model.
require_oauth = ResourceProtector()
require_oauth.register_token_validator(BearerTokenValidator(OAuth2Token))
@pytest.fixture(autouse=True)
def client(user):
    """Save an OAuth client owned by ``user`` and delete it after the test.

    The fixture is autouse, so it runs for every test in this module even
    when the test does not request it by name.
    """
    attrs = {
        "user_id": user.pk,
        "client_id": "client-id",
        "client_secret": "client-secret",
        "scope": "profile",
    }
    oauth_client = Client(**attrs)
    oauth_client.save()

    yield oauth_client

    # Teardown: remove the client saved above.
    oauth_client.delete()
def test_invalid_token(factory):
    """Requests without a usable bearer token are answered with HTTP 401.

    Three variants are exercised: no Authorization header, a header with a
    non-bearer scheme, and a bearer header carrying the value ``token``.
    """

    @require_oauth("profile")
    def get_user_profile(request):
        owner = request.oauth_token.user
        return JsonResponse({"sub": owner.pk, "username": owner.username})

    # Each case pairs the extra request headers with the expected error code.
    cases = (
        ({}, "missing_authorization"),
        ({"HTTP_AUTHORIZATION": "invalid token"}, "unsupported_token_type"),
        ({"HTTP_AUTHORIZATION": "bearer token"}, "invalid_token"),
    )
    for headers, expected_error in cases:
        request = factory.get("/user", **headers)
        response = get_user_profile(request)
        assert response.status_code == 401
        payload = json.loads(response.content)
        assert payload["error"] == expected_error
def test_expired_token(factory, token):
    """A token saved with a negative ``expires_in`` yields 401 invalid_token."""
    # Push the token's lifetime into the past before it is presented.
    token.expires_in = -10
    token.save()

    @require_oauth("profile")
    def get_user_profile(request):
        owner = request.oauth_token.user
        return JsonResponse({"sub": owner.pk, "username": owner.username})

    response = get_user_profile(
        factory.get("/user", HTTP_AUTHORIZATION="bearer a1")
    )
    assert response.status_code == 401
    payload = json.loads(response.content)
    assert payload["error"] == "invalid_token"
def test_insufficient_token(factory, token):
    """A view requiring the ``email`` scope answers 403 insufficient_scope."""

    @require_oauth("email")
    def get_user_email(request):
        owner = request.oauth_token.user
        return JsonResponse({"email": owner.email})

    response = get_user_email(
        factory.get("/user/email", HTTP_AUTHORIZATION="bearer a1")
    )
    assert response.status_code == 403
    payload = json.loads(response.content)
    assert payload["error"] == "insufficient_scope"
def test_access_resource(factory, token):
    """An ``optional=True`` view serves requests with and without a token.

    Without an Authorization header the view sees a falsy
    ``request.oauth_token`` and answers as "anonymous"; with the bearer
    header the response is expected to carry the username "foo".
    """

    @require_oauth("profile", optional=True)
    def get_user_profile(request):
        if not request.oauth_token:
            return JsonResponse({"sub": 0, "username": "anonymous"})
        owner = request.oauth_token.user
        return JsonResponse({"sub": owner.pk, "username": owner.username})

    # Each case pairs the extra request headers with the expected username.
    cases = (
        ({}, "anonymous"),
        ({"HTTP_AUTHORIZATION": "bearer a1"}, "foo"),
    )
    for headers, expected_username in cases:
        request = factory.get("/user", **headers)
        response = get_user_profile(request)
        assert response.status_code == 200
        payload = json.loads(response.content)
        assert payload["username"] == expected_username
def test_scope_operator(factory, token):
    """Check how a list of required scopes is combined.

    The test expects scopes written in one space-separated string
    (``"profile email"``) to be rejected with 403 insufficient_scope, and
    scopes given as separate list entries (``"profile"``, ``"email"``) to
    be accepted with 200.
    """

    def profile_view(request):
        owner = request.oauth_token.user
        return JsonResponse({"sub": owner.pk, "username": owner.username})

    # The same view body is protected twice with different scope lists.
    operator_and = require_oauth(["profile email"])(profile_view)
    operator_or = require_oauth(["profile", "email"])(profile_view)

    # One request object is reused for both protected views.
    request = factory.get("/user", HTTP_AUTHORIZATION="bearer a1")

    # Each case: protected view, expected status, payload key, expected value.
    cases = (
        (operator_and, 403, "error", "insufficient_scope"),
        (operator_or, 200, "username", "foo"),
    )
    for view, expected_status, key, expected_value in cases:
        response = view(request)
        assert response.status_code == expected_status
        payload = json.loads(response.content)
        assert payload[key] == expected_value
|