File: crypto.py

package info (click to toggle)
freenub 0.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,168 kB
  • sloc: python: 10,664; makefile: 7; sh: 6
file content (125 lines) | stat: -rw-r--r-- 4,377 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import hashlib
import json
import random
from base64 import decodebytes, encodebytes

from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import pad, unpad

from pubnub.crypto_core import PubNubCrypto

Initial16bytes = "0123456789012345"


class PubNubCryptodome(PubNubCrypto):
    mode = AES.MODE_CBC
    fallback_mode = None

    def __init__(self, pubnub_config):
        self.pubnub_configuration = pubnub_config
        self.mode = pubnub_config.cipher_mode
        self.fallback_mode = pubnub_config.fallback_cipher_mode

    def encrypt(self, key, msg, use_random_iv=False):
        secret = self.get_secret(key)
        initialization_vector = self.get_initialization_vector(use_random_iv)

        cipher = AES.new(
            bytes(secret[0:32], "utf-8"),
            self.mode,
            bytes(initialization_vector, "utf-8"),
        )
        encrypted_message = cipher.encrypt(self.pad(msg.encode("utf-8")))
        msg_with_iv = self.append_random_iv(
            encrypted_message, use_random_iv, bytes(initialization_vector, "utf-8")
        )

        return encodebytes(msg_with_iv).decode("utf-8").replace("\n", "")

    def decrypt(self, key, msg, use_random_iv=False):
        secret = self.get_secret(key)

        decoded_message = decodebytes(msg.encode("utf-8"))
        initialization_vector, extracted_message = self.extract_random_iv(
            decoded_message, use_random_iv
        )
        cipher = AES.new(bytes(secret[0:32], "utf-8"), self.mode, initialization_vector)
        try:
            plain = self.depad((cipher.decrypt(extracted_message)).decode("utf-8"))
        except UnicodeDecodeError as e:
            if not self.fallback_mode:
                raise e

            cipher = AES.new(
                bytes(secret[0:32], "utf-8"), self.fallback_mode, initialization_vector
            )
            plain = self.depad((cipher.decrypt(extracted_message)).decode("utf-8"))

        try:
            return json.loads(plain)
        except Exception:
            return plain

    def append_random_iv(self, message, use_random_iv, initialization_vector):
        if self.pubnub_configuration.use_random_initialization_vector or use_random_iv:
            return initialization_vector + message
        else:
            return message

    def extract_random_iv(self, message, use_random_iv):
        if self.pubnub_configuration.use_random_initialization_vector or use_random_iv:
            return message[0:16], message[16:]
        else:
            return bytes(Initial16bytes, "utf-8"), message

    def get_initialization_vector(self, use_random_iv):
        if self.pubnub_configuration.use_random_initialization_vector or use_random_iv:
            return f"{random.randint(0, 9999999999999999):016}"
        else:
            return Initial16bytes

    def pad(self, msg, block_size=16):
        padding = block_size - (len(msg) % block_size)
        return msg + (chr(padding) * padding).encode("utf-8")

    def depad(self, msg):
        return msg[0 : -ord(msg[-1])]

    def get_secret(self, key):
        return hashlib.sha256(key.encode("utf-8")).hexdigest()


class PubNubFileCrypto(PubNubCryptodome):
    def encrypt(self, key, file):
        secret = self.get_secret(key)
        initialization_vector = self.get_initialization_vector(use_random_iv=True)
        cipher = AES.new(
            bytes(secret[0:32], "utf-8"),
            self.mode,
            bytes(initialization_vector, "utf-8"),
        )
        initialization_vector = bytes(initialization_vector, "utf-8")

        return self.append_random_iv(
            cipher.encrypt(pad(file, 16)),
            use_random_iv=True,
            initialization_vector=initialization_vector,
        )

    def decrypt(self, key, file):
        secret = self.get_secret(key)
        initialization_vector, extracted_file = self.extract_random_iv(
            file, use_random_iv=True
        )
        try:
            cipher = AES.new(
                bytes(secret[0:32], "utf-8"), self.mode, initialization_vector
            )
            result = unpad(cipher.decrypt(extracted_file), 16)
        except ValueError:
            cipher = AES.new(
                bytes(secret[0:32], "utf-8"), self.fallback_mode, initialization_vector
            )
            result = unpad(cipher.decrypt(extracted_file), 16)

        return result