1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
|
import hashlib
import json
import random
from base64 import decodebytes, encodebytes
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import pad, unpad
from pubnub.crypto_core import PubNubCrypto
Initial16bytes = "0123456789012345"
class PubNubCryptodome(PubNubCrypto):
mode = AES.MODE_CBC
fallback_mode = None
def __init__(self, pubnub_config):
self.pubnub_configuration = pubnub_config
self.mode = pubnub_config.cipher_mode
self.fallback_mode = pubnub_config.fallback_cipher_mode
def encrypt(self, key, msg, use_random_iv=False):
secret = self.get_secret(key)
initialization_vector = self.get_initialization_vector(use_random_iv)
cipher = AES.new(
bytes(secret[0:32], "utf-8"),
self.mode,
bytes(initialization_vector, "utf-8"),
)
encrypted_message = cipher.encrypt(self.pad(msg.encode("utf-8")))
msg_with_iv = self.append_random_iv(
encrypted_message, use_random_iv, bytes(initialization_vector, "utf-8")
)
return encodebytes(msg_with_iv).decode("utf-8").replace("\n", "")
def decrypt(self, key, msg, use_random_iv=False):
secret = self.get_secret(key)
decoded_message = decodebytes(msg.encode("utf-8"))
initialization_vector, extracted_message = self.extract_random_iv(
decoded_message, use_random_iv
)
cipher = AES.new(bytes(secret[0:32], "utf-8"), self.mode, initialization_vector)
try:
plain = self.depad((cipher.decrypt(extracted_message)).decode("utf-8"))
except UnicodeDecodeError as e:
if not self.fallback_mode:
raise e
cipher = AES.new(
bytes(secret[0:32], "utf-8"), self.fallback_mode, initialization_vector
)
plain = self.depad((cipher.decrypt(extracted_message)).decode("utf-8"))
try:
return json.loads(plain)
except Exception:
return plain
def append_random_iv(self, message, use_random_iv, initialization_vector):
if self.pubnub_configuration.use_random_initialization_vector or use_random_iv:
return initialization_vector + message
else:
return message
def extract_random_iv(self, message, use_random_iv):
if self.pubnub_configuration.use_random_initialization_vector or use_random_iv:
return message[0:16], message[16:]
else:
return bytes(Initial16bytes, "utf-8"), message
def get_initialization_vector(self, use_random_iv):
if self.pubnub_configuration.use_random_initialization_vector or use_random_iv:
return f"{random.randint(0, 9999999999999999):016}"
else:
return Initial16bytes
def pad(self, msg, block_size=16):
padding = block_size - (len(msg) % block_size)
return msg + (chr(padding) * padding).encode("utf-8")
def depad(self, msg):
return msg[0 : -ord(msg[-1])]
def get_secret(self, key):
return hashlib.sha256(key.encode("utf-8")).hexdigest()
class PubNubFileCrypto(PubNubCryptodome):
def encrypt(self, key, file):
secret = self.get_secret(key)
initialization_vector = self.get_initialization_vector(use_random_iv=True)
cipher = AES.new(
bytes(secret[0:32], "utf-8"),
self.mode,
bytes(initialization_vector, "utf-8"),
)
initialization_vector = bytes(initialization_vector, "utf-8")
return self.append_random_iv(
cipher.encrypt(pad(file, 16)),
use_random_iv=True,
initialization_vector=initialization_vector,
)
def decrypt(self, key, file):
secret = self.get_secret(key)
initialization_vector, extracted_file = self.extract_random_iv(
file, use_random_iv=True
)
try:
cipher = AES.new(
bytes(secret[0:32], "utf-8"), self.mode, initialization_vector
)
result = unpad(cipher.decrypt(extracted_file), 16)
except ValueError:
cipher = AES.new(
bytes(secret[0:32], "utf-8"), self.fallback_mode, initialization_vector
)
result = unpad(cipher.decrypt(extracted_file), 16)
return result
|