1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
|
# Copyright (C) 2019 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of the OpenPGP Gajim Plugin.
#
# OpenPGP Gajim Plugin is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# OpenPGP Gajim Plugin is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
from typing import Any
import logging
from collections.abc import Sequence
from pathlib import Path
import gnupg
from nbxmpp.protocol import JID
from openpgp.backend.base import BaseKeyringItem
from openpgp.backend.base import BasePGPBackend
from openpgp.backend.util import parse_uid
from openpgp.modules.key_store import KeyData
from openpgp.modules.util import DecryptionFailed
log = logging.getLogger("gajim.p.openpgp.pygnupg")
if log.getEffectiveLevel() == logging.DEBUG:
log = logging.getLogger("gnupg")
log.addHandler(logging.StreamHandler())
log.setLevel(logging.DEBUG)
class KeyringItem(BaseKeyringItem):
def __init__(self, key: dict[Any, Any]) -> None:
self._key = key
BaseKeyringItem.__init__(self)
@property
def keyid(self) -> str:
return self._key["keyid"]
def _get_uid(self) -> str | None:
for uid in self._key["uids"]:
try:
return parse_uid(uid)
except Exception:
pass
@property
def fingerprint(self) -> str:
return self._key["fingerprint"]
class PythonGnuPG(BasePGPBackend):
def __init__(self, jid: str, gnupghome: Path) -> None:
self._gnupg = gnupg.GPG(gpgbinary="gpg", gnupghome=str(gnupghome))
self._jid = jid
self._own_fingerprint = None
@staticmethod
def _get_key_params(jid: str) -> str:
"""
Generate --gen-key input
"""
params = {
"Key-Type": "RSA",
"Key-Length": 2048,
"Name-Real": "xmpp:%s" % jid,
}
out = "Key-Type: %s\n" % params.pop("Key-Type")
for key, val in list(params.items()):
out += "%s: %s\n" % (key, val)
out += "%no-protection\n"
out += "%commit\n"
return out
def generate_key(self) -> None:
self._gnupg.gen_key(self._get_key_params(self._jid))
def encrypt(
self, payload: bytes, keys: list[KeyData]
) -> tuple[bytes | None, str | None]:
recipients = [key.fingerprint for key in keys]
log.info("encrypt to:")
for fingerprint in recipients:
log.info(fingerprint)
result = self._gnupg.encrypt(
payload,
recipients,
armor=False,
sign=self._own_fingerprint,
always_trust=True,
)
if result.ok:
error = ""
else:
error = result.status
return result.data, error
def decrypt(self, payload: bytes) -> tuple[str, str]:
result = self._gnupg.decrypt(payload, always_trust=True)
if not result.ok:
raise DecryptionFailed(result.status)
assert result.fingerprint is not None
return result.data.decode("utf8"), result.fingerprint
def _get_key(self, fingerprint: str) -> gnupg.ListKeys:
return self._gnupg.list_keys(keys=[fingerprint])
def get_keys(self) -> Sequence[KeyringItem]:
result = self._gnupg.list_keys(secret=False)
keys: list[KeyringItem] = []
for key in result:
item = KeyringItem(key)
if not item.is_xmpp_key:
log.warning("Invalid key found, deleting key")
log.warning(key)
self.delete_key(item.fingerprint)
continue
keys.append(item)
return keys
def import_key(self, data: bytes, jid: JID) -> KeyringItem | None:
log.info("Import key from %s", jid)
result = self._gnupg.import_keys(data)
if not result:
log.error("Could not import key")
log.error(result)
return
fpr = result.results[0]["fingerprint"]
assert fpr is not None
key = self._get_key(fpr)
item = KeyringItem(key[0])
if not item.is_valid(jid):
log.warning("Invalid key found, deleting key")
log.warning(key)
self.delete_key(item.fingerprint)
return
return item
def get_own_key_details(self) -> tuple[str | None, int | None]:
result = self._gnupg.list_keys(secret=True)
if not result:
return None, None
if len(result) > 1:
log.error("More than one secret key found")
return None, None
self._own_fingerprint = result[0]["fingerprint"]
return self._own_fingerprint, int(result[0]["date"])
def export_key(self, fingerprint: str) -> bytes | None:
key = self._gnupg.export_keys(
fingerprint, secret=False, armor=False, minimal=True
)
assert isinstance(key, bytes | None)
return key
def delete_key(self, fingerprint: str) -> None:
log.info("Delete Key: %s", fingerprint)
self._gnupg.delete_keys(fingerprint)
|