File: base_key.py

package info (click to toggle)
python-authlib 1.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,016 kB
  • sloc: python: 26,998; makefile: 53; sh: 14
file content (120 lines) | stat: -rw-r--r-- 3,358 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import hashlib
from collections import OrderedDict

from authlib.common.encoding import json_dumps
from authlib.common.encoding import to_bytes
from authlib.common.encoding import to_unicode
from authlib.common.encoding import urlsafe_b64encode

from ..errors import InvalidUseError


class Key:
    """This is the base class for a JSON Web Key."""

    kty = "_"

    ALLOWED_PARAMS = ["use", "key_ops", "alg", "kid", "x5u", "x5c", "x5t", "x5t#S256"]

    PRIVATE_KEY_OPS = [
        "sign",
        "decrypt",
        "unwrapKey",
    ]
    PUBLIC_KEY_OPS = [
        "verify",
        "encrypt",
        "wrapKey",
    ]

    REQUIRED_JSON_FIELDS = []

    def __init__(self, options=None):
        self.options = options or {}
        self._dict_data = {}

    @property
    def tokens(self):
        if not self._dict_data:
            self.load_dict_key()

        rv = dict(self._dict_data)
        rv["kty"] = self.kty
        for k in self.ALLOWED_PARAMS:
            if k not in rv and k in self.options:
                rv[k] = self.options[k]
        return rv

    @property
    def kid(self):
        return self.tokens.get("kid")

    def keys(self):
        return self.tokens.keys()

    def __getitem__(self, item):
        return self.tokens[item]

    @property
    def public_only(self):
        raise NotImplementedError()

    def load_raw_key(self):
        raise NotImplementedError()

    def load_dict_key(self):
        raise NotImplementedError()

    def check_key_op(self, operation):
        """Check if the given key_op is supported by this key.

        :param operation: key operation value, such as "sign", "encrypt".
        :raise: ValueError
        """
        key_ops = self.tokens.get("key_ops")
        if key_ops is not None and operation not in key_ops:
            raise ValueError(f'Unsupported key_op "{operation}"')

        if operation in self.PRIVATE_KEY_OPS and self.public_only:
            raise ValueError(f'Invalid key_op "{operation}" for public key')

        use = self.tokens.get("use")
        if use:
            if operation in ["sign", "verify"]:
                if use != "sig":
                    raise InvalidUseError()
            elif operation in ["decrypt", "encrypt", "wrapKey", "unwrapKey"]:
                if use != "enc":
                    raise InvalidUseError()

    def as_dict(self, is_private=False, **params):
        raise NotImplementedError()

    def as_json(self, is_private=False, **params):
        """Represent this key as a JSON string."""
        obj = self.as_dict(is_private, **params)
        return json_dumps(obj)

    def thumbprint(self):
        """Implementation of RFC7638 JSON Web Key (JWK) Thumbprint."""
        fields = list(self.REQUIRED_JSON_FIELDS)
        fields.append("kty")
        fields.sort()
        data = OrderedDict()

        for k in fields:
            data[k] = self.tokens[k]

        json_data = json_dumps(data)
        digest_data = hashlib.sha256(to_bytes(json_data)).digest()
        return to_unicode(urlsafe_b64encode(digest_data))

    @classmethod
    def check_required_fields(cls, data):
        for k in cls.REQUIRED_JSON_FIELDS:
            if k not in data:
                raise ValueError(f'Missing required field: "{k}"')

    @classmethod
    def validate_raw_key(cls, key):
        raise NotImplementedError()