1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
from pathlib import Path
import aiofiles
import pytest
import unpaddedbase64
from Cryptodome import Random # nosec
from nio import EncryptionError
from nio.crypto import async_encrypt_attachment, decrypt_attachment
FILEPATH = "tests/data/test_bytes"
@pytest.mark.asyncio
class TestClass:
async def _get_data_cypher_keys(self, data=b"Test bytes"):
*chunks, keys = [i async for i in async_encrypt_attachment(data)]
return (data, b"".join(chunks), keys)
async def test_encrypt(self, data=b"Test bytes", large=False):
_, ciphertext, keys = await self._get_data_cypher_keys(data)
plaintext = decrypt_attachment(
ciphertext,
keys["key"]["k"],
keys["hashes"]["sha256"],
keys["iv"],
)
assert plaintext == b"Test bytes" * (16384 if large else 1)
async def test_encrypt_large_bytes(self):
# Makes sure our bytes chunking in async_generator_from_data
# is working correctly
await self.test_encrypt(b"Test bytes" * 16384, large=True)
async def test_encrypt_str(self):
await self.test_encrypt(FILEPATH)
async def test_encrypt_path_object(self):
await self.test_encrypt(Path(FILEPATH))
async def test_encrypt_iterable(self):
await self.test_encrypt([b"Test ", b"bytes"])
async def test_encrypt_async_iterable(self):
async def async_gen():
yield b"Test "
yield b"bytes"
await self.test_encrypt(async_gen())
async def test_encrypt_file_object(self):
await self.test_encrypt(open(FILEPATH, "rb")) # noqa: ASYNC230
async def test_encrypt_async_file_object(self):
await self.test_encrypt(await aiofiles.open(FILEPATH, "rb"))
async def test_encrypt_bad_argument_type(self):
with pytest.raises(TypeError):
await self.test_encrypt(123)
async def test_hash_verification(self):
_data, ciphertext, keys = await self._get_data_cypher_keys()
with pytest.raises(EncryptionError):
decrypt_attachment(
ciphertext,
keys["key"]["k"],
"Fake hash",
keys["iv"],
)
async def test_invalid_key(self):
_data, ciphertext, keys = await self._get_data_cypher_keys()
with pytest.raises(EncryptionError):
decrypt_attachment(
ciphertext,
"Fake key",
keys["hashes"]["sha256"],
keys["iv"],
)
async def test_invalid_iv(self):
_data, ciphertext, keys = await self._get_data_cypher_keys()
with pytest.raises(EncryptionError):
decrypt_attachment(
ciphertext,
keys["key"]["k"],
keys["hashes"]["sha256"],
"Fake iv",
)
async def test_short_key(self):
_data, ciphertext, keys = await self._get_data_cypher_keys()
with pytest.raises(EncryptionError):
decrypt_attachment(
ciphertext,
unpaddedbase64.encode_base64(b"Fake key", urlsafe=True),
keys["hashes"]["sha256"],
keys["iv"],
)
async def test_short_iv(self):
data, ciphertext, keys = await self._get_data_cypher_keys()
plaintext = decrypt_attachment(
ciphertext,
keys["key"]["k"],
keys["hashes"]["sha256"],
unpaddedbase64.encode_base64(b"F" + b"\x00" * 8),
)
assert plaintext != data
async def test_fake_key(self):
data, ciphertext, keys = await self._get_data_cypher_keys()
fake_key = Random.new().read(32)
plaintext = decrypt_attachment(
ciphertext,
unpaddedbase64.encode_base64(fake_key, urlsafe=True),
keys["hashes"]["sha256"],
keys["iv"],
)
assert plaintext != data
|