File: account.py

package info (click to toggle)
mautrix-python 0.20.7-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 1,812 kB
  • sloc: python: 19,103; makefile: 16
file content (108 lines) | stat: -rw-r--r-- 3,932 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# Copyright (c) 2022 Tulir Asokan
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Any, Dict, Optional, cast
from datetime import datetime

import olm

from mautrix.types import (
    DeviceID,
    EncryptionAlgorithm,
    EncryptionKeyAlgorithm,
    IdentityKey,
    SigningKey,
    UserID,
)

from . import base
from .sessions import Session


class OlmAccount(olm.Account):
    """
    An ``olm.Account`` that additionally carries a ``shared`` flag, lazily cached identity
    keys, and helpers for creating sessions and building signed key payloads.
    """

    # Set to False for freshly created accounts and to the caller-supplied value
    # when the account is restored through from_pickle().
    shared: bool
    # Caches for the "ed25519" and "curve25519" entries of identity_keys; None until
    # the corresponding property is read for the first time.
    _signing_key: Optional[SigningKey]
    _identity_key: Optional[IdentityKey]

    def __init__(self) -> None:
        """Create a new account with ``shared`` set to False and empty key caches."""
        super().__init__()
        self._identity_key = None
        self._signing_key = None
        self.shared = False

    @property
    def signing_key(self) -> SigningKey:
        """The ``ed25519`` entry of ``identity_keys``, looked up once and then cached."""
        cached = self._signing_key
        if cached is None:
            cached = self._signing_key = SigningKey(self.identity_keys["ed25519"])
        return cached

    @property
    def identity_key(self) -> IdentityKey:
        """The ``curve25519`` entry of ``identity_keys``, looked up once and then cached."""
        cached = self._identity_key
        if cached is None:
            cached = self._identity_key = IdentityKey(self.identity_keys["curve25519"])
        return cached

    @property
    def fingerprint(self) -> str:
        """
        The base64-encoded signing key of this account, split into groups of 4 characters
        separated by single spaces. This is the form used for manual device verification.
        """
        signing_key = self.signing_key
        groups = []
        for offset in range(0, len(signing_key), 4):
            groups.append(signing_key[offset : offset + 4])
        return " ".join(groups)

    @classmethod
    def from_pickle(cls, pickle: bytes, passphrase: str, shared: bool) -> "OlmAccount":
        """
        Restore an account from its pickled form.

        Args:
            pickle: The pickled account, passed on to the parent class's ``from_pickle``.
            passphrase: The passphrase for the pickle, passed on to the parent class.
            shared: The value to store in the restored account's ``shared`` attribute.

        Returns:
            The restored account with empty key caches.
        """
        restored = cast(OlmAccount, super().from_pickle(pickle, passphrase))
        # The parent constructor path is not ours, so initialise the extra attributes here.
        restored._identity_key = None
        restored._signing_key = None
        restored.shared = shared
        return restored

    def new_inbound_session(self, sender_key: IdentityKey, ciphertext: str) -> Session:
        """
        Create an inbound session from a received pre-key message.

        The one-time keys used by the new session are removed from this account.

        Args:
            sender_key: The identity key of the sender.
            ciphertext: The pre-key message ciphertext to create the session from.

        Returns:
            The new session, with the current local time as its creation time.
        """
        prekey_message = olm.OlmPreKeyMessage(ciphertext)
        inbound = olm.InboundSession(self, prekey_message, sender_key)
        self.remove_one_time_keys(inbound)
        # Pickle and immediately unpickle to turn the olm session into our Session class.
        pickled = inbound.pickle("roundtrip")
        return Session.from_pickle(pickled, passphrase="roundtrip", creation_time=datetime.now())

    def new_outbound_session(self, target_key: IdentityKey, one_time_key: IdentityKey) -> Session:
        """
        Create an outbound session towards another device.

        Args:
            target_key: The identity key of the target device.
            one_time_key: The one-time key of the target device to use for the session.

        Returns:
            The new session, with the current local time as its creation time.
        """
        outbound = olm.OutboundSession(self, target_key, one_time_key)
        # Pickle and immediately unpickle to turn the olm session into our Session class.
        pickled = outbound.pickle("roundtrip")
        return Session.from_pickle(pickled, passphrase="roundtrip", creation_time=datetime.now())

    def get_device_keys(self, user_id: UserID, device_id: DeviceID) -> Dict[str, Any]:
        """
        Build the signed device keys object for this account.

        Args:
            user_id: The user ID to put in the object and to file the signature under.
            device_id: The device ID to put in the object and to use in the key IDs.

        Returns:
            A dict with the user ID, device ID, supported algorithms, identity keys and a
            signature made by this account.
        """
        identity_keys = {}
        for algorithm, key in self.identity_keys.items():
            identity_keys[f"{algorithm}:{device_id}"] = key
        device_keys: Dict[str, Any] = {
            "user_id": user_id,
            "device_id": device_id,
            "algorithms": [EncryptionAlgorithm.OLM_V1.value, EncryptionAlgorithm.MEGOLM_V1.value],
            "keys": identity_keys,
        }
        # The signature covers the object as it is before the signatures field is added.
        signature = self.sign(base.canonical_json(device_keys))
        signing_key_id = f"{EncryptionKeyAlgorithm.ED25519}:{device_id}"
        device_keys["signatures"] = {user_id: {signing_key_id: signature}}
        return device_keys

    def get_one_time_keys(
        self, user_id: UserID, device_id: DeviceID, current_otk_count: int
    ) -> Dict[str, Any]:
        """
        Generate and sign one-time keys, then mark them as published.

        New keys are generated only when ``current_otk_count`` is below half of
        ``max_one_time_keys``; the number generated is the difference between the two.

        Args:
            user_id: The user ID to file the signatures under.
            device_id: The device ID to use in the signature key IDs.
            current_otk_count: The number of one-time keys that already exist.

        Returns:
            A dict mapping ``<algorithm>:<key id>`` to the signed key object of every
            ``curve25519`` key in ``one_time_keys``.
        """
        missing = self.max_one_time_keys // 2 - current_otk_count
        if missing > 0:
            self.generate_one_time_keys(missing)
        signed_keys = {
            f"{EncryptionKeyAlgorithm.SIGNED_CURVE25519}:{key_id}": {
                "key": key,
                "signatures": {
                    user_id: {
                        f"{EncryptionKeyAlgorithm.ED25519}:{device_id}": self.sign(
                            base.canonical_json({"key": key})
                        )
                    }
                },
            }
            for key_id, key in self.one_time_keys.get("curve25519", {}).items()
        }
        self.mark_keys_as_published()
        return signed_keys