File: async_attachment_test.py

package info (click to toggle)
python-matrix-nio 0.25.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,224 kB
  • sloc: python: 23,670; makefile: 36; sh: 8
file content (129 lines) | stat: -rw-r--r-- 3,967 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from pathlib import Path

import aiofiles
import pytest
import unpaddedbase64
from Cryptodome import Random  # nosec

from nio import EncryptionError
from nio.crypto import async_encrypt_attachment, decrypt_attachment

# Path of the fixture file that the tests below hand to the encryption helper
# as a str, a Path, and as already-opened (sync and async) file objects.
# NOTE(review): the path is relative, so the tests presumably have to be run
# from the repository root; the round-trip assertion implies the file holds
# exactly b"Test bytes" -- confirm against tests/data.
FILEPATH = "tests/data/test_bytes"


@pytest.mark.asyncio
class TestClass:
    """Tests for ``async_encrypt_attachment`` paired with ``decrypt_attachment``.

    The first group of tests encrypts the same payload supplied through
    different input types and checks that decryption returns the original
    bytes.  The second group tampers with the hash, key or IV passed to
    ``decrypt_attachment`` and checks the resulting failure behaviour.
    """

    async def _get_data_cypher_keys(self, data=b"Test bytes"):
        """Encrypt *data* and return ``(data, ciphertext, keys)``.

        Everything yielded by ``async_encrypt_attachment`` is collected in
        order: all items except the last are joined into the ciphertext and
        the last item is returned as the keys mapping.
        """
        *chunks, keys = [i async for i in async_encrypt_attachment(data)]
        return (data, b"".join(chunks), keys)

    async def test_encrypt(self, data=b"Test bytes", large=False):
        """Encrypt *data*, decrypt it again and compare with the expected bytes.

        Besides being a test on its own (with the default arguments), this is
        the shared helper the input-type tests below call with their own
        *data*.  When *large* is true the expected plaintext is
        ``b"Test bytes"`` repeated 16384 times instead of once.
        """
        _, ciphertext, keys = await self._get_data_cypher_keys(data)

        plaintext = decrypt_attachment(
            ciphertext,
            keys["key"]["k"],
            keys["hashes"]["sha256"],
            keys["iv"],
        )

        assert plaintext == b"Test bytes" * (16384 if large else 1)

    async def test_encrypt_large_bytes(self):
        """Round-trip a payload made of 16384 repetitions of the test bytes."""
        # Makes sure our bytes chunking in async_generator_from_data
        # is working correctly
        await self.test_encrypt(b"Test bytes" * 16384, large=True)

    async def test_encrypt_str(self):
        """Round-trip with the input given as a ``str`` file path."""
        await self.test_encrypt(FILEPATH)

    async def test_encrypt_path_object(self):
        """Round-trip with the input given as a ``pathlib.Path``."""
        await self.test_encrypt(Path(FILEPATH))

    async def test_encrypt_iterable(self):
        """Round-trip with the input given as a list of ``bytes`` chunks."""
        await self.test_encrypt([b"Test ", b"bytes"])

    async def test_encrypt_async_iterable(self):
        """Round-trip with the input given as an async generator of chunks."""

        async def async_gen():
            yield b"Test "
            yield b"bytes"

        await self.test_encrypt(async_gen())

    async def test_encrypt_file_object(self):
        """Round-trip with the input given as an open binary file object."""
        # The test owns the handle, so close it deterministically instead of
        # leaving it to the garbage collector (avoids a ResourceWarning).
        with open(FILEPATH, "rb") as file_obj:  # noqa: ASYNC230
            await self.test_encrypt(file_obj)

    async def test_encrypt_async_file_object(self):
        """Round-trip with the input given as an open ``aiofiles`` file object."""
        # Same ownership rule as the synchronous variant: the handle opened
        # here is closed here once the round-trip has finished.
        async with aiofiles.open(FILEPATH, "rb") as async_file_obj:
            await self.test_encrypt(async_file_obj)

    async def test_encrypt_bad_argument_type(self):
        """An ``int`` input must be rejected with ``TypeError``."""
        with pytest.raises(TypeError):
            await self.test_encrypt(123)

    async def test_hash_verification(self):
        """Decrypting with a hash that is not the real one raises ``EncryptionError``."""
        _data, ciphertext, keys = await self._get_data_cypher_keys()

        with pytest.raises(EncryptionError):
            decrypt_attachment(
                ciphertext,
                keys["key"]["k"],
                "Fake hash",
                keys["iv"],
            )

    async def test_invalid_key(self):
        """Decrypting with the literal string ``"Fake key"`` raises ``EncryptionError``."""
        _data, ciphertext, keys = await self._get_data_cypher_keys()

        with pytest.raises(EncryptionError):
            decrypt_attachment(
                ciphertext,
                "Fake key",
                keys["hashes"]["sha256"],
                keys["iv"],
            )

    async def test_invalid_iv(self):
        """Decrypting with the literal string ``"Fake iv"`` raises ``EncryptionError``."""
        _data, ciphertext, keys = await self._get_data_cypher_keys()

        with pytest.raises(EncryptionError):
            decrypt_attachment(
                ciphertext,
                keys["key"]["k"],
                keys["hashes"]["sha256"],
                "Fake iv",
            )

    async def test_short_key(self):
        """A base64-encoded key of only 8 bytes raises ``EncryptionError``."""
        _data, ciphertext, keys = await self._get_data_cypher_keys()

        with pytest.raises(EncryptionError):
            decrypt_attachment(
                ciphertext,
                unpaddedbase64.encode_base64(b"Fake key", urlsafe=True),
                keys["hashes"]["sha256"],
                keys["iv"],
            )

    async def test_short_iv(self):
        """A base64-encoded 9-byte IV decrypts without error but to wrong data."""
        data, ciphertext, keys = await self._get_data_cypher_keys()

        plaintext = decrypt_attachment(
            ciphertext,
            keys["key"]["k"],
            keys["hashes"]["sha256"],
            unpaddedbase64.encode_base64(b"F" + b"\x00" * 8),
        )
        assert plaintext != data

    async def test_fake_key(self):
        """A random 32-byte key decrypts without error but to wrong data."""
        data, ciphertext, keys = await self._get_data_cypher_keys()

        fake_key = Random.new().read(32)

        plaintext = decrypt_attachment(
            ciphertext,
            unpaddedbase64.encode_base64(fake_key, urlsafe=True),
            keys["hashes"]["sha256"],
            keys["iv"],
        )
        assert plaintext != data