"""Encrypt or decrypt multiplexer transport data."""
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from ..exceptions import MultiplexerTransportDecrypt
class CryptoTransport:
    """Symmetric encrypt/decrypt helper for the multiplexer transport.

    One AES cipher in CBC mode is built from the supplied key and IV, and a
    single encryption context plus a single decryption context are created
    up front. Every call to :meth:`encrypt` / :meth:`decrypt` feeds data
    into those same long-lived contexts; ``finalize()`` is never called by
    this class.
    """

    __slots__ = ["_cipher", "_decryptor", "_encryptor"]

    def __init__(self, key: bytes, iv: bytes) -> None:
        """Create the cipher and its two streaming contexts.

        Args:
            key: Raw AES key bytes, handed to ``algorithms.AES``.
            iv: Initialization vector, handed to ``modes.CBC``.
        """
        aes_algorithm = algorithms.AES(key)
        cbc_mode = modes.CBC(iv)
        cipher = Cipher(aes_algorithm, cbc_mode, backend=default_backend())

        self._cipher = cipher
        # Both contexts are derived from the same cipher object and are
        # reused for the lifetime of this instance.
        self._encryptor = cipher.encryptor()
        self._decryptor = cipher.decryptor()

    def encrypt(self, data: bytes) -> bytes:
        """Push *data* through the encryption context and return its output."""
        encrypted = self._encryptor.update(data)
        return encrypted

    def decrypt(self, data: bytes) -> bytes:
        """Push *data* through the decryption context and return its output.

        Raises:
            MultiplexerTransportDecrypt: If the underlying context raises
                ``InvalidTag``; the original exception context is
                suppressed.
        """
        try:
            decrypted = self._decryptor.update(data)
        except InvalidTag:
            raise MultiplexerTransportDecrypt from None
        return decrypted
|