File: cache.py

package info (click to toggle)
python-authlib 1.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,016 kB
  • sloc: python: 26,998; makefile: 53; sh: 14
file content (88 lines) | stat: -rw-r--r-- 3,012 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from authlib.oauth1 import TemporaryCredential


def register_temporary_credential_hooks(
    authorization_server, cache, key_prefix="temporary_credential:", expires=86400
):
    """Register temporary credential related hooks to authorization server.

    Four hooks are registered on ``authorization_server``:
    ``create_temporary_credential``, ``get_temporary_credential``,
    ``delete_temporary_credential`` and ``create_authorization_verifier``.
    Each of them addresses the credential in ``cache`` under the key
    ``key_prefix + oauth_token``.

    :param authorization_server: AuthorizationServer instance
    :param cache: Cache instance; ``set(key, value, timeout=...)``,
        ``get(key)`` and ``delete(key)`` are called on it
    :param key_prefix: key prefix for temporary credential
    :param expires: Expire time (in seconds) for cached temporary
        credentials, defaults to one day
    """

    def create_temporary_credential(token, client_id, redirect_uri):
        """Store ``token`` in the cache and wrap it as a TemporaryCredential.

        ``token`` is updated in place with ``client_id`` and, when a
        ``redirect_uri`` is given, with ``oauth_callback``.
        """
        key = key_prefix + token["oauth_token"]
        token["client_id"] = client_id
        if redirect_uri:
            token["oauth_callback"] = redirect_uri

        cache.set(key, token, timeout=expires)
        return TemporaryCredential(token)

    def get_temporary_credential(oauth_token):
        """Return the cached credential for ``oauth_token``, or ``None``."""
        if not oauth_token:
            return None
        key = key_prefix + oauth_token
        value = cache.get(key)
        if value:
            return TemporaryCredential(value)
        # Nothing usable is cached under this key.
        return None

    def delete_temporary_credential(oauth_token):
        """Remove the cached credential; a falsy ``oauth_token`` is a no-op."""
        if oauth_token:
            key = key_prefix + oauth_token
            cache.delete(key)

    def create_authorization_verifier(credential, grant_user, verifier):
        """Attach ``verifier`` and the granting user's id to ``credential``.

        The updated credential is written back to the cache under the same
        key it was created with, and returned.
        """
        key = key_prefix + credential.get_oauth_token()
        credential["oauth_verifier"] = verifier
        credential["user_id"] = grant_user.get_user_id()
        cache.set(key, credential, timeout=expires)
        return credential

    authorization_server.register_hook(
        "create_temporary_credential", create_temporary_credential
    )
    authorization_server.register_hook(
        "get_temporary_credential", get_temporary_credential
    )
    authorization_server.register_hook(
        "delete_temporary_credential", delete_temporary_credential
    )
    authorization_server.register_hook(
        "create_authorization_verifier", create_authorization_verifier
    )


def create_exists_nonce_func(cache, key_prefix="nonce:", expires=86400):
    """Build an ``exists_nonce`` callable backed by ``cache``.

    The returned function can be used in hooks and resource protector.

    :param cache: Cache instance; ``has(key)`` and
        ``set(key, value, timeout=...)`` are called on it
    :param key_prefix: key prefix for nonce entries
    :param expires: Expire time for nonce
    """

    def exists_nonce(nonce, timestamp, client_id, oauth_token):
        """Report whether this nonce was already recorded, then record it."""
        base_key = f"{key_prefix}{nonce}-{timestamp}-{client_id}"
        # The token only becomes part of the key when one is supplied.
        cache_key = f"{base_key}-{oauth_token}" if oauth_token else base_key

        already_seen = cache.has(cache_key)
        # Written unconditionally, whether or not the key was present.
        cache.set(cache_key, 1, timeout=expires)
        return already_seen

    return exists_nonce


def register_nonce_hooks(
    authorization_server, cache, key_prefix="nonce:", expires=86400
):
    """Install a cache-backed ``exists_nonce`` hook on the authorization server.

    :param authorization_server: AuthorizationServer instance
    :param cache: Cache instance
    :param key_prefix: key prefix for nonce entries
    :param expires: Expire time for nonce
    """
    # The checker is built by create_exists_nonce_func and registered as-is.
    authorization_server.register_hook(
        "exists_nonce",
        create_exists_nonce_func(cache, key_prefix, expires),
    )