1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
|
import hashlib
from collections import OrderedDict
from authlib.common.encoding import json_dumps
from authlib.common.encoding import to_bytes
from authlib.common.encoding import to_unicode
from authlib.common.encoding import urlsafe_b64encode
from ..errors import InvalidUseError
class Key:
"""This is the base class for a JSON Web Key."""
kty = "_"
ALLOWED_PARAMS = ["use", "key_ops", "alg", "kid", "x5u", "x5c", "x5t", "x5t#S256"]
PRIVATE_KEY_OPS = [
"sign",
"decrypt",
"unwrapKey",
]
PUBLIC_KEY_OPS = [
"verify",
"encrypt",
"wrapKey",
]
REQUIRED_JSON_FIELDS = []
def __init__(self, options=None):
self.options = options or {}
self._dict_data = {}
@property
def tokens(self):
if not self._dict_data:
self.load_dict_key()
rv = dict(self._dict_data)
rv["kty"] = self.kty
for k in self.ALLOWED_PARAMS:
if k not in rv and k in self.options:
rv[k] = self.options[k]
return rv
@property
def kid(self):
return self.tokens.get("kid")
def keys(self):
return self.tokens.keys()
def __getitem__(self, item):
return self.tokens[item]
@property
def public_only(self):
raise NotImplementedError()
def load_raw_key(self):
raise NotImplementedError()
def load_dict_key(self):
raise NotImplementedError()
def check_key_op(self, operation):
"""Check if the given key_op is supported by this key.
:param operation: key operation value, such as "sign", "encrypt".
:raise: ValueError
"""
key_ops = self.tokens.get("key_ops")
if key_ops is not None and operation not in key_ops:
raise ValueError(f'Unsupported key_op "{operation}"')
if operation in self.PRIVATE_KEY_OPS and self.public_only:
raise ValueError(f'Invalid key_op "{operation}" for public key')
use = self.tokens.get("use")
if use:
if operation in ["sign", "verify"]:
if use != "sig":
raise InvalidUseError()
elif operation in ["decrypt", "encrypt", "wrapKey", "unwrapKey"]:
if use != "enc":
raise InvalidUseError()
def as_dict(self, is_private=False, **params):
raise NotImplementedError()
def as_json(self, is_private=False, **params):
"""Represent this key as a JSON string."""
obj = self.as_dict(is_private, **params)
return json_dumps(obj)
def thumbprint(self):
"""Implementation of RFC7638 JSON Web Key (JWK) Thumbprint."""
fields = list(self.REQUIRED_JSON_FIELDS)
fields.append("kty")
fields.sort()
data = OrderedDict()
for k in fields:
data[k] = self.tokens[k]
json_data = json_dumps(data)
digest_data = hashlib.sha256(to_bytes(json_data)).digest()
return to_unicode(urlsafe_b64encode(digest_data))
@classmethod
def check_required_fields(cls, data):
for k in cls.REQUIRED_JSON_FIELDS:
if k not in data:
raise ValueError(f'Missing required field: "{k}"')
@classmethod
def validate_raw_key(cls, key):
raise NotImplementedError()
|