File: fle2_crypto.py

package info (click to toggle)
libmongocrypt 1.17.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 12,572 kB
  • sloc: ansic: 70,067; python: 4,547; cpp: 615; sh: 460; makefile: 44; awk: 8
file content (117 lines) | stat: -rw-r--r-- 3,300 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

import struct

# All sizes below are byte counts: they are compared against len() of the
# key, IV and ciphertext buffers handled in this module.
ENCRYPTION_KEY_LENGTH = 32  # required length of the AES key Ke
MAC_KEY_LENGTH = 32  # required length of the HMAC key Km
HMAC_SHA256_TAG_LENGTH = 32  # length of the tag T taken from the end of an authenticated ciphertext
IV_LENGTH = 16  # required length of the IV that prefixes every ciphertext
DEK_LENGTH = 96  # required length of the byte string passed to DEK()
BLOCK_LENGTH = 16  # block size used when computing PKCS#7 padding for CBC mode

def _hmacsha256 (Km, input):
    """
    Compute HMAC-SHA-256 of `input` keyed with `Km`.

    `Km` must be exactly MAC_KEY_LENGTH bytes long (checked with an assert).
    Returns the raw tag bytes produced by finalize().
    """
    assert MAC_KEY_LENGTH == len(Km)
    mac = hmac.HMAC(Km, hashes.SHA256())
    mac.update(input)
    tag = mac.finalize()
    return tag

class DEK ():
    """
    Data Encryption Key (DEK): a 96-byte blob split into three 32-byte parts.

    Attributes:
        Ke:       bytes 0..31 of the input
        Km:       bytes 32..63 of the input
        TokenKey: bytes 64..95 of the input

    NOTE(review): Ke / Km presumably are the encryption and MAC keys expected
    by the encrypt/decrypt helpers in this file -- confirm against callers.
    """
    def __init__ (self, bytesIn):
        assert DEK_LENGTH == len(bytesIn)
        # Carve the input into three consecutive 32-byte slices.
        self.Ke, self.Km, self.TokenKey = (
            bytesIn[start:start + 32] for start in (0, 32, 64)
        )

def _fle2_encrypt (IV, Ke, M, mode, Km = None, AD = None):
    """
    Shared implementation behind the fle2*_encrypt functions.

        S = AES-{mode}.Enc(Ke, IV, M)
        T = HMAC-SHA-256(Km, AD || IV || S)   (empty when Km is None)
        C = IV || S || T

    Parameters:
        IV:   initialization vector, IV_LENGTH bytes
        Ke:   AES key, ENCRYPTION_KEY_LENGTH bytes
        M:    plaintext bytes
        mode: 'CTR' or 'CBC'; in CBC mode M is PKCS#7-padded before encryption
        Km:   optional MAC key, MAC_KEY_LENGTH bytes
        AD:   associated data; required when Km is given, must be None otherwise

    Returns the ciphertext C. Precondition violations raise AssertionError.
    """
    assert ENCRYPTION_KEY_LENGTH == len(Ke)
    assert IV_LENGTH == len(IV)
    assert mode in ('CTR', 'CBC')

    if mode == 'CTR':
        aes_mode = modes.CTR(IV)
    else:
        aes_mode = modes.CBC(IV)

    # S = AES-{mode}.Enc(Ke, IV, M)
    enc = Cipher(algorithms.AES(Ke), aes_mode).encryptor()
    if mode == 'CBC':
        # PKCS#7: append n bytes of value n, where n in 1..BLOCK_LENGTH brings
        # the message up to a whole number of blocks.
        pad = BLOCK_LENGTH - (len(M) % BLOCK_LENGTH)
        M = M + bytes([pad]) * pad
    S = enc.update(M)
    S = S + enc.finalize()

    if Km is None:
        # Unauthenticated variant: associated data makes no sense without a MAC.
        assert AD is None
        T = b''
    else:
        assert AD is not None
        assert MAC_KEY_LENGTH == len(Km)
        # T = HMAC-SHA256(Km, AD || IV || S)
        T = _hmacsha256 (Km, AD + IV + S)

    # C = IV || S || T
    return IV + S + T


def fle2_encrypt (M, Ke, IV):
    """
    AES-256-CTR/NONE

    Delegates to _fle2_encrypt in CTR mode with no MAC key and no
    associated data.
    """
    return _fle2_encrypt (IV=IV, Ke=Ke, M=M, mode='CTR')

def fle2aead_encrypt(M, Ke, IV, Km, AD):
    """
    AES-256-CTR/SHA-256

    Delegates to _fle2_encrypt in CTR mode, passing Km and AD through for
    the HMAC-SHA-256 tag.
    """
    return _fle2_encrypt (IV=IV, Ke=Ke, M=M, mode='CTR', Km=Km, AD=AD)

def fle2v2_aead_encrypt(M, Ke, IV, Km, AD):
    """
    AES-256-CBC/SHA-256

    Delegates to _fle2_encrypt in CBC mode, passing Km and AD through for
    the HMAC-SHA-256 tag.
    """
    return _fle2_encrypt (IV=IV, Ke=Ke, M=M, mode='CBC', Km=Km, AD=AD)

def _fle2_decrypt (C, Ke, mode, Km = None, AD = None):
    """
    Generalized decrypt; inverse of _fle2_encrypt.

        C = IV || S || T          (T is empty when Km is None)
        if Km is not None:
            verify T == HMAC-SHA-256(Km, AD || IV || S)
        M = AES-{mode}.Dec(Ke, IV, S)
        if mode == 'CBC':
            strip PKCS#7 padding from M

    Parameters:
        C:    ciphertext bytes; must be longer than IV_LENGTH plus the tag length
        Ke:   AES key, ENCRYPTION_KEY_LENGTH bytes
        mode: 'CTR' or 'CBC'
        Km:   optional MAC key; when given, the trailing
              HMAC_SHA256_TAG_LENGTH bytes of C are verified as the tag
        AD:   associated data covered by the tag; required when Km is given

    Returns:
        The plaintext M.

    Raises:
        AssertionError: a precondition is violated or the tag does not match.
        ValueError: in CBC mode, the decrypted data does not end in valid
            PKCS#7 padding (or S is not a whole number of blocks, as reported
            by the cipher's finalize()).
    """
    # Stdlib constant-time comparison. The module-level name `hmac` refers to
    # cryptography's hmac module, so import the function directly.
    from hmac import compare_digest

    assert len(Ke) == ENCRYPTION_KEY_LENGTH

    Tlen = 0 if Km is None else HMAC_SHA256_TAG_LENGTH
    assert len(C) > (IV_LENGTH + Tlen)

    # C = IV || S || T
    IV = C[:IV_LENGTH]
    # Use an explicit end index: C[IV_LENGTH:-Tlen] is empty when Tlen == 0,
    # which would discard the whole ciphertext in the unauthenticated case.
    tag_start = len(C) - Tlen
    S = C[IV_LENGTH:tag_start]

    if Km is not None:
        assert AD is not None
        T = C[tag_start:]
        expected = _hmacsha256 (Km, AD + IV + S)
        # Raised explicitly rather than via `assert` so the authentication
        # check is not stripped under `python -O`; the exception type is the
        # one a failing assert would have produced.
        if not compare_digest(T, expected):
            raise AssertionError("HMAC-SHA256 tag mismatch")

    assert (mode == 'CTR') or (mode == 'CBC')
    modeObj = modes.CTR(IV) if mode == 'CTR' else modes.CBC(IV)

    # M = AES-{mode}.Dec(Ke, IV, S)
    cipher = Cipher(algorithms.AES(Ke), modeObj)
    decryptor = cipher.decryptor()
    M = decryptor.update(S) + decryptor.finalize()

    if mode == 'CBC':
        # PKCS#7: the padding lives in the *plaintext*, so it can only be
        # inspected and removed after decryption. The final byte n (1..16)
        # says how many trailing bytes of value n to drop.
        padding_len = M[-1] if M else 0
        if not (1 <= padding_len <= BLOCK_LENGTH):
            raise ValueError("invalid PKCS#7 padding")
        if M[-padding_len:] != bytes([padding_len]) * padding_len:
            raise ValueError("invalid PKCS#7 padding")
        M = M[:-padding_len]

    return M

def fle2_decrypt (C, Ke):
    """
    AES-256-CTR/NONE

    Delegates to _fle2_decrypt in CTR mode with no MAC key and no
    associated data.
    """
    return _fle2_decrypt (C=C, Ke=Ke, mode='CTR')

def fle2aead_decrypt(C, Km, AD, Ke):
    """
    AES-256-CTR/SHA-256

    Delegates to _fle2_decrypt in CTR mode with MAC key Km and associated
    data AD. Note the parameter order here (C, Km, AD, Ke) differs from the
    helper's, so arguments are forwarded by keyword.
    """
    return _fle2_decrypt (C=C, Ke=Ke, mode='CTR', Km=Km, AD=AD)

def fle2v2_aead_decrypt(C, Km, AD, Ke):
    """
    AES-256-CBC/SHA-256

    Delegates to _fle2_decrypt in CBC mode with MAC key Km and associated
    data AD. Note the parameter order here (C, Km, AD, Ke) differs from the
    helper's, so arguments are forwarded by keyword.
    """
    return _fle2_decrypt (C=C, Ke=Ke, mode='CBC', Km=Km, AD=AD)

def ServerDataEncryptionLevel1Token (rootKey):
    """
    Return HMAC-SHA-256(rootKey, 3), where the integer 3 is encoded as an
    8-byte little-endian unsigned value.
    """
    # Same bytes as struct.pack("<Q", 3).
    message = (3).to_bytes(8, 'little')
    return _hmacsha256 (rootKey, message)