File: test_user_mixin.py

package info (click to toggle)
python-authlib 1.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,016 kB
  • sloc: python: 26,998; makefile: 53; sh: 14
file content (147 lines) | stat: -rw-r--r-- 3,956 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import pytest
from httpx import ASGITransport
from starlette.requests import Request

from authlib.integrations.starlette_client import OAuth
from authlib.jose import JsonWebKey
from authlib.jose.errors import InvalidClaimError
from authlib.oidc.core.grants.util import generate_id_token

from ..asgi_helper import AsyncPathMapDispatch
from ..util import get_bearer_token
from ..util import read_key_file

# Shared symmetric ("oct") JWK with key id "f". The tests below use it to sign
# HS256 ID tokens and to populate the client's ``jwks`` setting.
secret_key = JsonWebKey.import_key("secret", {"kty": "oct", "kid": "f"})


async def run_fetch_userinfo(payload):
    """Register a client with a stubbed ``/userinfo`` route and fetch the user.

    ``payload`` is installed as the ``body`` of the ``/userinfo`` entry in the
    path map given to ``AsyncPathMapDispatch``. The helper asserts that the
    returned user's ``sub`` equals ``"123"``, so ``payload`` is expected to
    carry that subject.
    """

    async def fetch_token(request):
        # Passed to the client as its ``fetch_token`` callback.
        return get_bearer_token()

    # Path map handed to the dispatcher, wrapped in an ASGI transport that the
    # client receives through ``client_kwargs``.
    path_map = {"/userinfo": {"body": payload}}
    asgi_transport = ASGITransport(AsyncPathMapDispatch(path_map))

    registry = OAuth()
    client = registry.register(
        "dev",
        client_id="dev",
        client_secret="dev",
        fetch_token=fetch_token,
        userinfo_endpoint="https://i.b/userinfo",
        client_kwargs={"transport": asgi_transport},
    )

    http_request = Request({"type": "http", "session": {}})
    user = await client.userinfo(request=http_request)
    assert user.sub == "123"


@pytest.mark.asyncio
async def test_fetch_userinfo():
    """Run the userinfo helper with a body whose ``sub`` is ``"123"``."""
    userinfo_body = {"sub": "123"}
    await run_fetch_userinfo(userinfo_body)


@pytest.mark.asyncio
async def test_parse_id_token():
    """Parse an HS256 ID token with and without explicit ``iss`` claim options.

    The test expects parsing to succeed with no claim options and with an
    ``iss`` option equal to the token's issuer, and expects
    ``InvalidClaimError`` when the ``iss`` option names a different issuer.
    """
    token = get_bearer_token()
    token["id_token"] = generate_id_token(
        token,
        {"sub": "123"},
        secret_key,
        alg="HS256",
        iss="https://i.b",
        aud="dev",
        exp=3600,
        nonce="n",
    )

    client = OAuth().register(
        "dev",
        client_id="dev",
        client_secret="dev",
        fetch_token=get_bearer_token,
        jwks={"keys": [secret_key.as_dict()]},
        issuer="https://i.b",
        id_token_signing_alg_values_supported=["HS256", "RS256"],
    )

    # No explicit claim options.
    user = await client.parse_id_token(token, nonce="n")
    assert user.sub == "123"

    # ``iss`` option equal to the issuer the token was generated with.
    matching_iss = {"iss": {"value": "https://i.b"}}
    user = await client.parse_id_token(token, nonce="n", claims_options=matching_iss)
    assert user.sub == "123"

    # ``iss`` option that differs from the issuer the token was generated with.
    other_iss = {"iss": {"value": "https://i.c"}}
    with pytest.raises(InvalidClaimError):
        await client.parse_id_token(token, nonce="n", claims_options=other_iss)


@pytest.mark.asyncio
async def test_runtime_error_fetch_jwks_uri():
    """Expect ``RuntimeError`` from a client that has no ``jwks`` or ``jwks_uri``.

    The client is registered with an issuer and a supported-algorithm list but
    without ``jwks`` or ``jwks_uri``; the test expects ``parse_id_token`` to
    raise ``RuntimeError`` in that configuration.
    """
    token = get_bearer_token()
    token["id_token"] = generate_id_token(
        token,
        {"sub": "123"},
        secret_key,
        alg="HS256",
        iss="https://i.b",
        aud="dev",
        exp=3600,
        nonce="n",
    )

    oauth = OAuth()
    client = oauth.register(
        "dev",
        client_id="dev",
        client_secret="dev",
        fetch_token=get_bearer_token,
        issuer="https://i.b",
        id_token_signing_alg_values_supported=["HS256"],
    )

    with pytest.raises(RuntimeError):
        # Use the same call shape as the other tests in this module (token
        # first, nonce by keyword). The previous ``(req, token)`` call passed
        # a Request as the token and the token dict as the nonce.
        await client.parse_id_token(token, nonce="n")


@pytest.mark.asyncio
async def test_force_fetch_jwks_uri():
    """Parse an RS256 ID token with a client configured with ``jwks_uri`` only.

    The token is signed with the contents of ``jwks_private.json``; the stubbed
    transport's ``/jwks`` route returns the contents of ``jwks_public.json``.
    """
    signing_keys = read_key_file("jwks_private.json")
    token = get_bearer_token()
    token["id_token"] = generate_id_token(
        token,
        {"sub": "123"},
        signing_keys,
        alg="RS256",
        iss="https://i.b",
        aud="dev",
        exp=3600,
        nonce="n",
    )

    # Path map whose ``/jwks`` entry carries the public key file as its body.
    path_map = {"/jwks": {"body": read_key_file("jwks_public.json")}}
    asgi_transport = ASGITransport(AsyncPathMapDispatch(path_map))

    client = OAuth().register(
        "dev",
        client_id="dev",
        client_secret="dev",
        fetch_token=get_bearer_token,
        jwks_uri="https://i.b/jwks",
        issuer="https://i.b",
        client_kwargs={"transport": asgi_transport},
    )

    user = await client.parse_id_token(token, nonce="n")
    assert user.sub == "123"