File: key_store.py

package info (click to toggle)
gajim-openpgp 1.7.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 160 kB
  • sloc: python: 1,487; sh: 29; xml: 11; makefile: 2
file content (270 lines) | stat: -rw-r--r-- 8,919 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# Copyright (C) 2019 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of the OpenPGP Gajim Plugin.
#
# OpenPGP Gajim Plugin is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# OpenPGP Gajim Plugin is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

import logging
from collections.abc import Iterator

from nbxmpp.protocol import JID
from nbxmpp.structs import PGPKeyMetadata

from openpgp.backend.base import BasePGPBackend
from openpgp.backend.sql import ContactRow
from openpgp.backend.sql import Storage
from openpgp.modules.util import Trust

log = logging.getLogger("gajim.p.openpgp.store")


class KeyData:
    """
    Holds all data related to a certain key
    """

    def __init__(
        self,
        contact_data: ContactData,
        fingerprint: str,
        active: bool,
        trust: Trust,
        timestamp: float,
    ):
        self._contact_data = contact_data
        self.fingerprint = fingerprint
        self.active = active
        self._trust = trust
        self.timestamp = timestamp
        self.comment = None
        self.has_pubkey = False

    @property
    def trust(self) -> Trust:
        return self._trust

    @trust.setter
    def trust(self, value: Trust) -> None:
        if value not in (Trust.NOT_TRUSTED, Trust.UNKNOWN, Trust.BLIND, Trust.VERIFIED):
            raise ValueError("Trust value not allowed: %s" % value)

        self._trust = value
        self._contact_data.set_trust(self.fingerprint, self._trust)

    def delete(self):
        self._contact_data.delete_key(self.fingerprint)


class ContactData:
    """
    Holds all data related to a contact
    """

    def __init__(self, jid: JID, storage: Storage, pgp: BasePGPBackend) -> None:
        self.jid = jid
        self._key_store: dict[str, KeyData] = {}
        self._storage = storage
        self._pgp = pgp

    @property
    def userid(self):
        if self.jid is None:
            raise ValueError("JID not set")
        return "xmpp:%s" % self.jid

    @property
    def default_trust(self) -> Trust:
        for key in self._key_store.values():
            if key.trust in (Trust.NOT_TRUSTED, Trust.BLIND):
                return Trust.UNKNOWN
        return Trust.BLIND

    def db_values(self) -> Iterator[tuple[JID, str, bool, Trust, float]]:
        for key in self._key_store.values():
            yield (
                self.jid,
                key.fingerprint,
                key.active,
                key.trust,
                key.timestamp,
            )

    def add_from_key(self, key: PGPKeyMetadata) -> KeyData:
        try:
            keydata = self._key_store[key.fingerprint]
        except KeyError:
            keydata = KeyData(
                self,
                key.fingerprint,
                True,
                self.default_trust,
                key.date,
            )
            self._key_store[key.fingerprint] = keydata
            log.info("Add from key: %s %s", self.jid, keydata.fingerprint)
        return keydata

    def add_from_db(self, row: ContactRow) -> KeyData:
        try:
            keydata = self._key_store[row.fingerprint]
        except KeyError:
            keydata = KeyData(
                self,
                row.fingerprint,
                row.active,
                row.trust,
                row.timestamp,
            )
            self._key_store[row.fingerprint] = keydata
            log.info("Add from row: %s %s", self.jid, row.fingerprint)
        return keydata

    def process_keylist(self, keylist: list[PGPKeyMetadata] | None) -> list[str]:
        log.info("Process keylist: %s %s", self.jid, keylist)

        if keylist is None:
            for keydata in self._key_store.values():
                keydata.active = False
            self._storage.save_contact(self.db_values())
            return []

        missing_pub_keys: list[str] = []
        fingerprints = {key.fingerprint for key in keylist}
        if fingerprints == self._key_store.keys():
            log.info("No updates found")
            for key in self._key_store.values():
                if not key.has_pubkey:
                    missing_pub_keys.append(key.fingerprint)
            return missing_pub_keys

        for keydata in self._key_store.values():
            keydata.active = False

        for key in keylist:
            try:
                keydata = self._key_store[key.fingerprint]
                keydata.active = True
                if not keydata.has_pubkey:
                    missing_pub_keys.append(keydata.fingerprint)
            except KeyError:
                keydata = self.add_from_key(key)
                missing_pub_keys.append(keydata.fingerprint)

        self._storage.save_contact(self.db_values())
        return missing_pub_keys

    def set_public_key(self, fingerprint: str) -> None:
        try:
            keydata = self._key_store[fingerprint]
        except KeyError:
            log.warning(
                "Set public key on unknown fingerprint: %s %s", self.jid, fingerprint
            )
        else:
            keydata.has_pubkey = True
        log.info("Set public key: %s %s", self.jid, fingerprint)

    def get_keys(self, only_trusted: bool = True) -> list[KeyData]:
        keys = list(self._key_store.values())
        if not only_trusted:
            return keys
        return [
            k for k in keys if k.active and k.trust in (Trust.VERIFIED, Trust.BLIND)
        ]

    def get_key(self, fingerprint: str) -> KeyData | None:
        return self._key_store.get(fingerprint, None)

    def set_trust(self, fingerprint: str, trust: Trust) -> None:
        self._storage.set_trust(self.jid, fingerprint, trust)

    def delete_key(self, fingerprint: str) -> None:
        self._storage.delete_key(self.jid, fingerprint)
        self._pgp.delete_key(fingerprint)
        del self._key_store[fingerprint]


class PGPContacts:
    """
    Holds all contacts available for PGP encryption
    """

    def __init__(self, pgp: BasePGPBackend, storage: Storage) -> None:
        self._contacts: dict[JID, ContactData] = {}
        self._storage = storage
        self._pgp = pgp
        self._load_from_storage()
        self._load_from_keyring()

    def _load_from_keyring(self):
        log.info("Load keys from keyring")
        keyring = self._pgp.get_keys()
        for key in keyring:
            log.info("Found: %s %s", key.jid, key.fingerprint)
            assert key.jid is not None
            self.set_public_key(key.jid, key.fingerprint)

    def _load_from_storage(self):
        log.info("Load contacts from storage")
        rows = self._storage.load_contacts()
        for row in rows:
            log.info("Found: %s %s", row.jid, row.fingerprint)
            try:
                contact_data = self._contacts[row.jid]
            except KeyError:
                contact_data = ContactData(row.jid, self._storage, self._pgp)
                contact_data.add_from_db(row)
                self._contacts[row.jid] = contact_data
            else:
                contact_data.add_from_db(row)

    def process_keylist(
        self, jid: JID, keylist: list[PGPKeyMetadata] | None
    ) -> list[str]:
        try:
            contact_data = self._contacts[jid]
        except KeyError:
            contact_data = ContactData(jid, self._storage, self._pgp)
            missing_pub_keys = contact_data.process_keylist(keylist)
            self._contacts[jid] = contact_data
        else:
            missing_pub_keys = contact_data.process_keylist(keylist)

        return missing_pub_keys

    def set_public_key(self, jid: JID, fingerprint: str) -> None:
        try:
            contact_data = self._contacts[jid]
        except KeyError:
            log.warning("ContactData not found: %s %s", jid, fingerprint)
        else:
            contact_data.set_public_key(fingerprint)

    def get_keys(self, jid: JID, only_trusted: bool = True) -> list[KeyData]:
        try:
            contact_data = self._contacts[jid]
            return contact_data.get_keys(only_trusted=only_trusted)
        except KeyError:
            return []

    def get_trust(self, jid: JID, fingerprint: str) -> Trust:
        contact_data = self._contacts.get(jid, None)
        if contact_data is None:
            return Trust.UNKNOWN

        key = contact_data.get_key(fingerprint)
        if key is None:
            return Trust.UNKNOWN
        return key.trust