File: models.py

package info (click to toggle)
python-authlib 1.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,016 kB
  • sloc: python: 26,998; makefile: 53; sh: 14
file content (157 lines) | stat: -rw-r--r-- 4,381 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import os
from abc import ABCMeta


class JWEAlgorithmBase(metaclass=ABCMeta):  # noqa: B024
    """Common root for every JWE algorithm interface.

    This class only declares the shape of an algorithm: a handful of
    class-level attributes plus two hooks. Both hooks raise
    ``NotImplementedError`` here, so a usable algorithm has to override
    them. No method is marked abstract, which means the class itself can
    still be instantiated.
    """

    # Left unset on the base class.
    EXTRA_HEADERS = None

    # Descriptive metadata, unset on the base class.
    name = None
    description = None

    # Fixed markers shared by this family of algorithms.
    algorithm_type = "JWE"
    algorithm_location = "alg"

    def prepare_key(self, raw_data):
        """Hook for turning ``raw_data`` into a key.

        NOTE(review): the accepted input and the returned key type are
        decided by overriding classes, which are not visible here.

        :raise NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()

    def generate_preset(self, enc_alg, key):
        """Hook for producing a preset from ``enc_alg`` and ``key``.

        NOTE(review): the structure of the preset is decided by overriding
        classes, which are not visible here.

        :raise NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()


class JWEAlgorithm(JWEAlgorithmBase, metaclass=ABCMeta):
    """Interface for JWE algorithms that conform to RFC7518.

    Implementations of the JWE algorithms from the JWA specification
    (RFC7518) SHOULD be built on top of this class. Both methods below
    raise ``NotImplementedError`` until a subclass overrides them.
    """

    def wrap(self, enc_alg, headers, key, preset=None):
        """Hook for the wrapping step of the algorithm.

        NOTE(review): the meaning of the arguments and the shape of the
        result are decided by overriding classes, which are not visible
        here.

        :raise NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()

    def unwrap(self, enc_alg, ek, headers, key):
        """Hook for the unwrapping step of the algorithm.

        NOTE(review): the meaning of the arguments and the shape of the
        result are decided by overriding classes, which are not visible
        here.

        :raise NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()


class JWEAlgorithmWithTagAwareKeyAgreement(JWEAlgorithmBase, metaclass=ABCMeta):
    """Interface for JWE algorithms whose key agreement is tag-aware
    (used in key agreement with key wrapping mode).

    ECDH-1PU is one example of such an algorithm. Every method below
    raises ``NotImplementedError`` until a subclass overrides it.
    """

    def generate_keys_and_prepare_headers(self, enc_alg, key, sender_key, preset=None):
        """Hook for key generation and header preparation.

        :raise NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()

    def agree_upon_key_and_wrap_cek(
        self, enc_alg, headers, key, sender_key, epk, cek, tag
    ):
        """Hook for agreeing upon a key and wrapping ``cek``.

        :raise NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()

    def wrap(self, enc_alg, headers, key, sender_key, preset=None):
        """Hook for the wrapping step; also receives ``sender_key``.

        :raise NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()

    def unwrap(self, enc_alg, ek, headers, key, sender_key, tag=None):
        """Hook for the unwrapping step; also receives ``sender_key`` and
        an optional ``tag``.

        :raise NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()


class JWEEncAlgorithm:
    """Interface for the algorithm family located at ``"enc"``.

    Subclasses set ``IV_SIZE`` and ``CEK_SIZE`` and override ``encrypt`` and
    ``decrypt``. The random-value helpers and the IV length check are
    provided here.
    """

    name = None
    description = None
    algorithm_type = "JWE"
    algorithm_location = "enc"

    # Both sizes are bit counts: the helpers below convert between bits and
    # bytes with a factor of 8. They are unset on this base class.
    IV_SIZE = None
    CEK_SIZE = None

    def generate_cek(self):
        """Return ``CEK_SIZE // 8`` random bytes taken from ``os.urandom``."""
        byte_count = self.CEK_SIZE // 8
        return os.urandom(byte_count)

    def generate_iv(self):
        """Return ``IV_SIZE // 8`` random bytes taken from ``os.urandom``."""
        byte_count = self.IV_SIZE // 8
        return os.urandom(byte_count)

    def check_iv(self, iv):
        """Verify that ``iv`` is exactly ``IV_SIZE`` bits long.

        :param iv: initialization vector in bytes
        :raise ValueError: if the bit length of ``iv`` differs from ``IV_SIZE``
        """
        bit_length = len(iv) * 8
        if bit_length == self.IV_SIZE:
            return
        raise ValueError('Invalid "iv" size')

    def encrypt(self, msg, aad, iv, key):
        """Encrypt the given "msg".

        Must be overridden; this base version always raises
        ``NotImplementedError``.

        :param msg: text to be encrypted, in bytes
        :param aad: additional authenticated data in bytes
        :param iv: initialization vector in bytes
        :param key: key in bytes
        :return: (ciphertext, tag)
        """
        raise NotImplementedError()

    def decrypt(self, ciphertext, aad, iv, tag, key):
        """Decrypt the given ciphertext.

        Must be overridden; this base version always raises
        ``NotImplementedError``.

        :param ciphertext: ciphertext in bytes
        :param aad: additional authenticated data in bytes
        :param iv: initialization vector in bytes
        :param tag: authentication tag in bytes
        :param key: key in bytes
        :return: message
        """
        raise NotImplementedError()


class JWEZipAlgorithm:
    """Interface for the algorithm family located at ``"zip"``.

    Subclasses override ``compress`` and ``decompress``; the versions here
    always raise ``NotImplementedError``.
    """

    name = None
    description = None
    algorithm_type = "JWE"
    algorithm_location = "zip"

    def compress(self, s):
        """Hook for compressing ``s``.

        :raise NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()

    def decompress(self, s):
        """Hook for decompressing ``s``.

        :raise NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()


class JWESharedHeader(dict):
    """Shared header object for JWE.

    The mapping itself is the union of the protected header and the shared
    unprotected header; on a key clash the unprotected value wins because
    it is merged last. The two source headers remain reachable through the
    ``protected`` and ``unprotected`` attributes.
    """

    def __init__(self, protected, unprotected):
        """Build the merged view from the two header parts.

        A non-empty part is stored by reference, not copied. A falsy part
        (``None`` or an empty mapping) is replaced with a fresh empty dict.

        :param protected: protected header mapping, or a falsy value
        :param unprotected: shared unprotected header mapping, or a falsy value
        """
        merged = {}
        for part in (protected, unprotected):
            if part:
                merged.update(part)
        super().__init__(merged)
        self.protected = protected or {}
        self.unprotected = unprotected or {}

    def update_protected(self, addition):
        """Merge ``addition`` into this mapping and into ``self.protected``.

        Because a non-empty protected header is held by reference, the
        mapping originally passed to the constructor is modified as well.
        """
        # Merged view first, then the protected part.
        self.update(addition)
        self.protected.update(addition)

    @classmethod
    def from_dict(cls, obj):
        """Return ``obj`` as an instance of this class.

        An existing instance is handed back unchanged. Anything else is read
        through ``obj.get`` for its ``"protected"`` and ``"unprotected"``
        entries.
        """
        if not isinstance(obj, cls):
            obj = cls(obj.get("protected"), obj.get("unprotected"))
        return obj


class JWEHeader(dict):
    """Header object for JWE.

    The mapping itself is the union of three parts, merged in this order:
    protected header, shared unprotected header, and the specific
    recipient's unprotected header. On a key clash the part merged later
    wins. Each part remains reachable through the ``protected``,
    ``unprotected`` and ``header`` attributes.
    """

    def __init__(self, protected, unprotected, header):
        """Build the merged view from the three header parts.

        A non-empty part is stored by reference, not copied. A falsy part
        (``None`` or an empty mapping) is replaced with a fresh empty dict.

        :param protected: protected header mapping, or a falsy value
        :param unprotected: shared unprotected header mapping, or a falsy value
        :param header: recipient's unprotected header mapping, or a falsy value
        """
        merged = {}
        for part in (protected, unprotected, header):
            if part:
                merged.update(part)
        super().__init__(merged)
        self.protected = protected or {}
        self.unprotected = unprotected or {}
        self.header = header or {}