1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
import json
from os import path
from pathlib import Path
import pytest
from hypothesis import given
from hypothesis.strategies import binary
from nio import EncryptionError
from nio.crypto import Olm
from nio.crypto.key_export import decrypt, decrypt_and_read, encrypt, encrypt_and_save
from nio.store import DefaultStore
TEST_ROOM = "!test:example.org"
class TestClass:
@given(binary())
def test_encrypt(self, data):
passphrase = "A secret"
ciphertext = encrypt(data, passphrase, count=10)
plaintext = decrypt(ciphertext, passphrase)
assert data == plaintext
def test_encrypt_rounds(self, benchmark):
data = b"data"
passphrase = "A secret"
benchmark(encrypt, data, passphrase, count=10000)
def test_decrypt_failure(self):
data = b"data"
passphrase = "A secret"
ciphertext = encrypt(data, passphrase, count=10)
with pytest.raises(
ValueError, match="HMAC check failed for encrypted payload."
):
decrypt(ciphertext, "Fake key")
def test_encrypt_file(self, tempdir):
data = b"data"
passphrase = "A secret"
file = path.join(tempdir, "keys_file")
encrypt_and_save(data, file, passphrase, count=10)
plaintext = decrypt_and_read(file, passphrase)
assert plaintext == data
def test_export(self, tempdir):
user_id = "ephemeral"
device_id = "DEVICEID"
file = path.join(tempdir, "keys_file")
store = DefaultStore(user_id, device_id, tempdir, "")
olm = Olm(user_id, device_id, store)
olm.create_outbound_group_session(TEST_ROOM)
out_session = olm.outbound_group_sessions[TEST_ROOM]
assert olm.inbound_group_store.get(
TEST_ROOM, olm.account.identity_keys["curve25519"], out_session.id
)
olm.export_keys(file, "pass")
alice_store = DefaultStore("alice", device_id, tempdir, "")
alice = Olm("alice", device_id, alice_store)
assert not alice.inbound_group_store.get(
TEST_ROOM, olm.account.identity_keys["curve25519"], out_session.id
)
alice.import_keys(file, "pass")
assert alice.inbound_group_store.get(
TEST_ROOM, olm.account.identity_keys["curve25519"], out_session.id
)
def test_unencrypted_import(self, tempdir):
device_id = "DEVICEID"
file = Path(tempdir, "keys_file")
file.write_text("{}")
alice_store = DefaultStore("alice", device_id, tempdir, "")
alice = Olm("alice", device_id, alice_store)
with pytest.raises(EncryptionError):
alice.import_keys(str(file), "pass")
def test_invalid_json(self, tempdir):
device_id = "DEVICEID"
file = path.join(tempdir, "keys_file")
encrypt_and_save(b"{sessions: [{}]}", file, "pass", count=10)
alice_store = DefaultStore("alice", device_id, tempdir, "")
alice = Olm("alice", device_id, alice_store)
with pytest.raises(EncryptionError):
alice.import_keys(file, "pass")
def test_invalid_json_schema(self, tempdir):
file = path.join(tempdir, "keys_file")
payload = {"sessions": [{"algorithm": "test"}]}
encrypt_and_save(json.dumps(payload).encode(), file, "pass", count=10)
imported = Olm.import_keys_static(file, "pass")
assert len(imported) == 0
|