1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
|
import os
from abc import ABCMeta
class JWEAlgorithmBase(metaclass=ABCMeta):  # noqa: B024
    """Root interface shared by every JWE algorithm class.

    The class only declares descriptive attributes and two placeholder
    methods; each placeholder raises ``NotImplementedError`` until a
    subclass overrides it.
    """

    # Fixed markers: the algorithm family and the header field it is named in.
    algorithm_type = "JWE"
    algorithm_location = "alg"

    # Descriptive attributes, left unset (``None``) in this base class.
    name = None
    description = None
    EXTRA_HEADERS = None

    def prepare_key(self, raw_data):
        """Placeholder for key preparation from ``raw_data``.

        :param raw_data: key input handed over by the caller
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def generate_preset(self, enc_alg, key):
        """Placeholder for building a preset for ``enc_alg`` and ``key``.

        :param enc_alg: content-encryption algorithm object
        :param key: key handed over by the caller
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()
class JWEAlgorithm(JWEAlgorithmBase, metaclass=ABCMeta):
    """Interface for a JWE algorithm that conforms to RFC7518.

    Algorithms for JWE defined by the JWA specification (RFC7518) SHOULD be
    implemented on top of this base class.
    """

    def wrap(self, enc_alg, headers, key, preset=None):
        """Placeholder for the wrap operation.

        :param enc_alg: content-encryption algorithm object
        :param headers: header values for the operation
        :param key: key handed over by the caller
        :param preset: optional preset; defaults to ``None``
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def unwrap(self, enc_alg, ek, headers, key):
        """Placeholder for the unwrap operation.

        :param enc_alg: content-encryption algorithm object
        :param ek: value to unwrap (presumably the encrypted key)
        :param headers: header values for the operation
        :param key: key handed over by the caller
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()
class JWEAlgorithmWithTagAwareKeyAgreement(JWEAlgorithmBase, metaclass=ABCMeta):
    """Interface for a JWE algorithm using tag-aware key agreement.

    This applies to the "key agreement with key wrapping" mode; ECDH-1PU is
    one example of such an algorithm. Every method below is a placeholder
    that raises ``NotImplementedError`` until a subclass overrides it.
    """

    def generate_keys_and_prepare_headers(
        self, enc_alg, key, sender_key, preset=None
    ):
        """Placeholder for key generation and header preparation.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def agree_upon_key_and_wrap_cek(
        self, enc_alg, headers, key, sender_key, epk, cek, tag
    ):
        """Placeholder for key agreement followed by wrapping of ``cek``.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def wrap(self, enc_alg, headers, key, sender_key, preset=None):
        """Placeholder for the wrap operation (takes a ``sender_key``).

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def unwrap(self, enc_alg, ek, headers, key, sender_key, tag=None):
        """Placeholder for the unwrap operation; ``tag`` is optional.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()
class JWEEncAlgorithm:
    """Base class for JWE "enc" (content encryption) algorithms.

    ``IV_SIZE`` and ``CEK_SIZE`` are expressed in bits and are ``None`` here;
    the helper methods divide them by 8 to obtain byte lengths, so they only
    work once a concrete size has been assigned.
    """

    algorithm_type = "JWE"
    algorithm_location = "enc"

    name = None
    description = None

    # Sizes in bits (see generate_cek / generate_iv / check_iv).
    CEK_SIZE = None
    IV_SIZE = None

    def generate_cek(self):
        """Return ``CEK_SIZE // 8`` random bytes from ``os.urandom``."""
        cek_length = self.CEK_SIZE // 8
        return os.urandom(cek_length)

    def generate_iv(self):
        """Return ``IV_SIZE // 8`` random bytes from ``os.urandom``."""
        iv_length = self.IV_SIZE // 8
        return os.urandom(iv_length)

    def check_iv(self, iv):
        """Validate that ``iv`` is exactly ``IV_SIZE`` bits long.

        :param iv: initialization vector in bytes
        :raises ValueError: if the bit length of ``iv`` differs from ``IV_SIZE``
        """
        iv_bits = len(iv) * 8
        if iv_bits == self.IV_SIZE:
            return
        raise ValueError('Invalid "iv" size')

    def encrypt(self, msg, aad, iv, key):
        """Encrypt the given "msg" text.

        Placeholder only: raises until a subclass overrides it.

        :param msg: text to be encrypted, in bytes
        :param aad: additional authenticated data in bytes
        :param iv: initialization vector in bytes
        :param key: encrypted key in bytes
        :return: (ciphertext, tag)
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def decrypt(self, ciphertext, aad, iv, tag, key):
        """Decrypt the given cipher text.

        Placeholder only: raises until a subclass overrides it.

        :param ciphertext: ciphertext in bytes
        :param aad: additional authenticated data in bytes
        :param iv: initialization vector in bytes
        :param tag: authentication tag in bytes
        :param key: encrypted key in bytes
        :return: message
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()
class JWEZipAlgorithm:
    """Base class for JWE "zip" algorithms.

    Both operations are placeholders that raise ``NotImplementedError``
    until a subclass overrides them.
    """

    algorithm_type = "JWE"
    algorithm_location = "zip"

    name = None
    description = None

    def compress(self, s):
        """Placeholder for compressing ``s``.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def decompress(self, s):
        """Placeholder for decompressing ``s``.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()
class JWESharedHeader(dict):
    """Shared header object for JWE.

    The dict content is the protected header merged with the shared
    unprotected header; the two parts also stay reachable through the
    ``protected`` and ``unprotected`` attributes.
    """

    def __init__(self, protected, unprotected):
        """Merge both header parts into this dict.

        :param protected: protected header mapping, or a falsy value for none
        :param unprotected: shared unprotected header mapping, or a falsy
            value for none
        """
        merged = {}
        # Later parts win on duplicate keys: unprotected overrides protected.
        for part in (protected, unprotected):
            if part:
                merged.update(part)
        super().__init__(merged)
        # Truthy inputs are stored by reference (not copied); falsy inputs
        # are replaced with a fresh empty dict.
        self.protected = protected or {}
        self.unprotected = unprotected or {}

    def update_protected(self, addition):
        """Add ``addition`` to both the merged view and ``self.protected``.

        :param addition: header values to merge in
        """
        self.protected.update(addition)
        self.update(addition)

    @classmethod
    def from_dict(cls, obj):
        """Return ``obj`` itself if it is already an instance of ``cls``;
        otherwise build one from its "protected" and "unprotected" entries.

        :param obj: an instance of ``cls`` or a mapping supporting ``.get``
        """
        if not isinstance(obj, cls):
            obj = cls(obj.get("protected"), obj.get("unprotected"))
        return obj
class JWEHeader(dict):
    """Header object for JWE.

    The dict content is the protected header, the shared unprotected header
    and the recipient-specific unprotected header merged together; each part
    also stays reachable through the ``protected``, ``unprotected`` and
    ``header`` attributes.
    """

    def __init__(self, protected, unprotected, header):
        """Merge the three header parts into this dict.

        :param protected: protected header mapping, or a falsy value for none
        :param unprotected: shared unprotected header mapping, or a falsy
            value for none
        :param header: per-recipient unprotected header mapping, or a falsy
            value for none
        """
        merged = {}
        # Applied in order, so on duplicate keys the later part wins:
        # header over unprotected over protected.
        for part in (protected, unprotected, header):
            if part:
                merged.update(part)
        super().__init__(merged)
        # Truthy inputs are stored by reference; falsy ones become fresh dicts.
        self.protected = protected or {}
        self.unprotected = unprotected or {}
        self.header = header or {}
|