File: test_encryption.py

package info (click to toggle)
gajim 2.4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 17,320 kB
  • sloc: python: 79,114; sh: 387; xml: 67; makefile: 6
file content (110 lines) | stat: -rw-r--r-- 3,724 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# This file is part of Gajim.
#
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import annotations

import unittest
from datetime import datetime
from datetime import UTC

from nbxmpp.protocol import JID
from sqlalchemy import select

from gajim.common import app
from gajim.common.helpers import get_uuid
from gajim.common.settings import Settings
from gajim.common.storage.archive.const import ChatDirection
from gajim.common.storage.archive.const import MessageState
from gajim.common.storage.archive.const import MessageType
from gajim.common.storage.archive.models import Encryption
from gajim.common.storage.archive.models import Message
from gajim.common.storage.archive.storage import MessageArchiveStorage


class EncryptionTest(unittest.TestCase):
    def setUp(self) -> None:
        self._archive = MessageArchiveStorage(in_memory=True)
        self._archive.init()

        self._account_jid = JID.from_string("user@domain.org")
        self._account = "testacc1"
        self._remote_jid = JID.from_string("remote@jid.org")
        self._init_settings()

    def tearDown(self) -> None:
        self._archive.shutdown()

    def _init_settings(self) -> None:
        app.settings = Settings(in_memory=True)
        app.settings.init()
        app.settings.add_account("testacc1")
        app.settings.set_account_setting("testacc1", "address", "user@domain.org")

    def test_encryption_join(self):
        enc_data1 = Encryption(protocol=1, key="testkey", trust=2)

        enc_data2 = Encryption(protocol=1, key="testkey", trust=2)

        message_data1 = Message(
            account_=self._account,
            remote_jid_=self._remote_jid,
            type=MessageType.CHAT,
            direction=ChatDirection.INCOMING,
            timestamp=datetime.fromtimestamp(0, UTC),
            state=MessageState.ACKNOWLEDGED,
            resource="res",
            text="Some Message",
            id="1",
            stanza_id=get_uuid(),
            encryption_=enc_data1,
        )

        message_data2 = Message(
            account_=self._account,
            remote_jid_=self._remote_jid,
            type=MessageType.CHAT,
            direction=ChatDirection.INCOMING,
            timestamp=datetime.fromtimestamp(1, UTC),
            state=MessageState.ACKNOWLEDGED,
            resource="res",
            text="Some other Message",
            id="2",
            stanza_id=get_uuid(),
            encryption_=enc_data2,
        )

        message_pk1 = self._archive.insert_object(message_data1)
        message_pk2 = self._archive.insert_object(message_data2)

        message1 = self._archive.get_message_with_pk(message_pk1)
        message2 = self._archive.get_message_with_pk(message_pk2)

        assert message1 is not None
        assert message1.encryption is not None

        assert message2 is not None
        assert message2.encryption is not None

        self.assertEqual(message1.encryption.pk, message2.encryption.pk)
        self.assertEqual(message1.encryption.protocol, 1)
        self.assertEqual(message1.encryption.key, "testkey")
        self.assertEqual(message1.encryption.trust, 2)

    def test_encryption_update(self):
        enc_data1 = Encryption(protocol=1, key="testkey1", trust=2)

        enc_data2 = Encryption(protocol=1, key="testkey1", trust=2)

        pk1 = self._archive.insert_row(enc_data1, return_pk_on_conflict=True)
        pk2 = self._archive.insert_row(enc_data2, return_pk_on_conflict=True)

        self.assertEqual(pk1, pk2)

        with self._archive.get_session() as s:
            res = s.scalar(select(Encryption).where(Encryption.pk == pk1))
        assert res is not None


if __name__ == "__main__":
    unittest.main()