File: test_resource_protector.py

package info (click to toggle)
python-authlib 1.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,016 kB
  • sloc: python: 26,998; makefile: 53; sh: 14
file content (137 lines) | stat: -rw-r--r-- 4,688 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import json

from django.http import JsonResponse

from authlib.integrations.django_oauth2 import BearerTokenValidator
from authlib.integrations.django_oauth2 import ResourceProtector

from .models import Client
from .models import OAuth2Token
from .models import User
from .oauth2_server import TestCase

require_oauth = ResourceProtector()
require_oauth.register_token_validator(BearerTokenValidator(OAuth2Token))


class ResourceProtectorTest(TestCase):
    def prepare_data(self, expires_in=3600, scope="profile"):
        user = User(username="foo")
        user.save()
        client = Client(
            user_id=user.pk,
            client_id="client",
            client_secret="secret",
            scope="profile",
        )
        client.save()

        token = OAuth2Token(
            user_id=user.pk,
            client_id=client.client_id,
            token_type="bearer",
            access_token="a1",
            scope=scope,
            expires_in=expires_in,
        )
        token.save()

    def test_invalid_token(self):
        @require_oauth("profile")
        def get_user_profile(request):
            user = request.oauth_token.user
            return JsonResponse(dict(sub=user.pk, username=user.username))

        self.prepare_data()

        request = self.factory.get("/user")
        resp = get_user_profile(request)
        assert resp.status_code == 401
        data = json.loads(resp.content)
        assert data["error"] == "missing_authorization"

        request = self.factory.get("/user", HTTP_AUTHORIZATION="invalid token")
        resp = get_user_profile(request)
        assert resp.status_code == 401
        data = json.loads(resp.content)
        assert data["error"] == "unsupported_token_type"

        request = self.factory.get("/user", HTTP_AUTHORIZATION="bearer token")
        resp = get_user_profile(request)
        assert resp.status_code == 401
        data = json.loads(resp.content)
        assert data["error"] == "invalid_token"

    def test_expired_token(self):
        self.prepare_data(-10)

        @require_oauth("profile")
        def get_user_profile(request):
            user = request.oauth_token.user
            return JsonResponse(dict(sub=user.pk, username=user.username))

        request = self.factory.get("/user", HTTP_AUTHORIZATION="bearer a1")
        resp = get_user_profile(request)
        assert resp.status_code == 401
        data = json.loads(resp.content)
        assert data["error"] == "invalid_token"

    def test_insufficient_token(self):
        self.prepare_data()

        @require_oauth("email")
        def get_user_email(request):
            user = request.oauth_token.user
            return JsonResponse(dict(email=user.email))

        request = self.factory.get("/user/email", HTTP_AUTHORIZATION="bearer a1")
        resp = get_user_email(request)
        assert resp.status_code == 403
        data = json.loads(resp.content)
        assert data["error"] == "insufficient_scope"

    def test_access_resource(self):
        self.prepare_data()

        @require_oauth("profile", optional=True)
        def get_user_profile(request):
            if request.oauth_token:
                user = request.oauth_token.user
                return JsonResponse(dict(sub=user.pk, username=user.username))
            return JsonResponse(dict(sub=0, username="anonymous"))

        request = self.factory.get("/user")
        resp = get_user_profile(request)
        assert resp.status_code == 200
        data = json.loads(resp.content)
        assert data["username"] == "anonymous"

        request = self.factory.get("/user", HTTP_AUTHORIZATION="bearer a1")
        resp = get_user_profile(request)
        assert resp.status_code == 200
        data = json.loads(resp.content)
        assert data["username"] == "foo"

    def test_scope_operator(self):
        self.prepare_data()

        @require_oauth(["profile email"])
        def operator_and(request):
            user = request.oauth_token.user
            return JsonResponse(dict(sub=user.pk, username=user.username))

        @require_oauth(["profile", "email"])
        def operator_or(request):
            user = request.oauth_token.user
            return JsonResponse(dict(sub=user.pk, username=user.username))

        request = self.factory.get("/user", HTTP_AUTHORIZATION="bearer a1")
        resp = operator_and(request)
        assert resp.status_code == 403
        data = json.loads(resp.content)
        assert data["error"] == "insufficient_scope"

        resp = operator_or(request)
        assert resp.status_code == 200
        data = json.loads(resp.content)
        assert data["username"] == "foo"