File: test_resource_protector.py

package info (click to toggle)
python-authlib 1.6.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,024 kB
  • sloc: python: 27,412; makefile: 53; sh: 14
file content (124 lines) | stat: -rw-r--r-- 3,902 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import json

import pytest
from django.http import JsonResponse

from authlib.integrations.django_oauth2 import BearerTokenValidator
from authlib.integrations.django_oauth2 import ResourceProtector

from .models import Client
from .models import OAuth2Token

require_oauth = ResourceProtector()
require_oauth.register_token_validator(BearerTokenValidator(OAuth2Token))


@pytest.fixture(autouse=True)
def client(user):
    client = Client(
        user_id=user.pk,
        client_id="client-id",
        client_secret="client-secret",
        scope="profile",
    )
    client.save()
    yield client
    client.delete()


def test_invalid_token(factory):
    @require_oauth("profile")
    def get_user_profile(request):
        user = request.oauth_token.user
        return JsonResponse(dict(sub=user.pk, username=user.username))

    request = factory.get("/user")
    resp = get_user_profile(request)
    assert resp.status_code == 401
    data = json.loads(resp.content)
    assert data["error"] == "missing_authorization"

    request = factory.get("/user", HTTP_AUTHORIZATION="invalid token")
    resp = get_user_profile(request)
    assert resp.status_code == 401
    data = json.loads(resp.content)
    assert data["error"] == "unsupported_token_type"

    request = factory.get("/user", HTTP_AUTHORIZATION="bearer token")
    resp = get_user_profile(request)
    assert resp.status_code == 401
    data = json.loads(resp.content)
    assert data["error"] == "invalid_token"


def test_expired_token(factory, token):
    token.expires_in = -10
    token.save()

    @require_oauth("profile")
    def get_user_profile(request):
        user = request.oauth_token.user
        return JsonResponse(dict(sub=user.pk, username=user.username))

    request = factory.get("/user", HTTP_AUTHORIZATION="bearer a1")
    resp = get_user_profile(request)
    assert resp.status_code == 401
    data = json.loads(resp.content)
    assert data["error"] == "invalid_token"


def test_insufficient_token(factory, token):
    @require_oauth("email")
    def get_user_email(request):
        user = request.oauth_token.user
        return JsonResponse(dict(email=user.email))

    request = factory.get("/user/email", HTTP_AUTHORIZATION="bearer a1")
    resp = get_user_email(request)
    assert resp.status_code == 403
    data = json.loads(resp.content)
    assert data["error"] == "insufficient_scope"


def test_access_resource(factory, token):
    @require_oauth("profile", optional=True)
    def get_user_profile(request):
        if request.oauth_token:
            user = request.oauth_token.user
            return JsonResponse(dict(sub=user.pk, username=user.username))
        return JsonResponse(dict(sub=0, username="anonymous"))

    request = factory.get("/user")
    resp = get_user_profile(request)
    assert resp.status_code == 200
    data = json.loads(resp.content)
    assert data["username"] == "anonymous"

    request = factory.get("/user", HTTP_AUTHORIZATION="bearer a1")
    resp = get_user_profile(request)
    assert resp.status_code == 200
    data = json.loads(resp.content)
    assert data["username"] == "foo"


def test_scope_operator(factory, token):
    @require_oauth(["profile email"])
    def operator_and(request):
        user = request.oauth_token.user
        return JsonResponse(dict(sub=user.pk, username=user.username))

    @require_oauth(["profile", "email"])
    def operator_or(request):
        user = request.oauth_token.user
        return JsonResponse(dict(sub=user.pk, username=user.username))

    request = factory.get("/user", HTTP_AUTHORIZATION="bearer a1")
    resp = operator_and(request)
    assert resp.status_code == 403
    data = json.loads(resp.content)
    assert data["error"] == "insufficient_scope"

    resp = operator_or(request)
    assert resp.status_code == 200
    data = json.loads(resp.content)
    assert data["username"] == "foo"