File: custom_puppet.py

package info (click to toggle)
mautrix-python 0.20.7-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 1,812 kB
  • sloc: python: 19,103; makefile: 16
file content (347 lines) | stat: -rw-r--r-- 12,998 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# Copyright (c) 2022 Tulir Asokan
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import annotations

from abc import ABC, abstractmethod
import asyncio
import hashlib
import hmac
import logging

from yarl import URL

from mautrix.appservice import AppService, IntentAPI
from mautrix.client import ClientAPI
from mautrix.errors import (
    IntentError,
    MatrixError,
    MatrixInvalidToken,
    MatrixRequestError,
    WellKnownError,
)
from mautrix.types import LoginType, MatrixUserIdentifier, RoomID, UserID

from .. import bridge as br


class CustomPuppetError(MatrixError):
    """Base class for double puppeting setup errors."""


class InvalidAccessToken(CustomPuppetError):
    def __init__(self):
        super().__init__("The given access token was invalid.")


class OnlyLoginSelf(CustomPuppetError):
    def __init__(self):
        super().__init__("You may only enable double puppeting with your own Matrix account.")


class EncryptionKeysFound(CustomPuppetError):
    def __init__(self):
        super().__init__(
            "The given access token is for a device that has encryption keys set up. "
            "Please provide a fresh token, don't reuse one from another client."
        )


class HomeserverURLNotFound(CustomPuppetError):
    def __init__(self, domain: str):
        super().__init__(
            f"Could not discover a valid homeserver URL for {domain}."
            " Please ensure a client .well-known file is set up, or ask the bridge administrator "
            "to add the homeserver URL to the bridge config."
        )


class OnlyLoginTrustedDomain(CustomPuppetError):
    def __init__(self):
        super().__init__(
            "This bridge doesn't allow double-puppeting with accounts on untrusted servers."
        )


class AutologinError(CustomPuppetError):
    pass


class CustomPuppetMixin(ABC):
    """
    Mixin for the Puppet class to enable Matrix puppeting.

    Attributes:
        sync_with_custom_puppets: Whether or not custom puppets should /sync
        allow_discover_url: Allow logging into other homeservers using .well-known discovery.
        homeserver_url_map: Static map from server name to URL that are always allowed to log in.
        only_handle_own_synced_events: Whether or not typing notifications and read receipts by
                                       other users should be filtered away before passing them to
                                       the Matrix event handler.

        az: The AppService object.
        loop: The asyncio event loop.
        log: The logger to use.
        mx: The Matrix event handler to send /sync events to.

        by_custom_mxid: A mapping from custom mxid to puppet object.

        default_mxid: The default user ID of the puppet.
        default_mxid_intent: The IntentAPI for the default user ID.
        custom_mxid: The user ID of the custom puppet.
        access_token: The access token for the custom puppet.

        intent: The primary IntentAPI.
    """

    allow_discover_url: bool = False
    homeserver_url_map: dict[str, URL] = {}
    only_handle_own_synced_events: bool = True
    login_shared_secret_map: dict[str, bytes] = {}
    login_device_name: str | None = None

    az: AppService
    loop: asyncio.AbstractEventLoop
    log: logging.Logger
    mx: br.BaseMatrixHandler

    by_custom_mxid: dict[UserID, CustomPuppetMixin] = {}

    default_mxid: UserID
    default_mxid_intent: IntentAPI
    custom_mxid: UserID | None
    access_token: str | None
    base_url: URL | None

    intent: IntentAPI

    @abstractmethod
    async def save(self) -> None:
        """Save the information of this puppet. Called from :meth:`switch_mxid`"""

    @property
    def mxid(self) -> UserID:
        """The main Matrix user ID of this puppet."""
        return self.custom_mxid or self.default_mxid

    @property
    def is_real_user(self) -> bool:
        """Whether this puppet uses a real Matrix user instead of an appservice-owned ID."""
        return bool(self.custom_mxid and self.access_token)

    def _fresh_intent(self) -> IntentAPI:
        if self.custom_mxid:
            _, server = self.az.intent.parse_user_id(self.custom_mxid)
            try:
                self.base_url = self.homeserver_url_map[server]
            except KeyError:
                if server == self.az.domain:
                    self.base_url = self.az.intent.api.base_url
        if self.access_token == "appservice-config" and self.custom_mxid:
            try:
                secret = self.login_shared_secret_map[server]
            except KeyError:
                raise AutologinError(f"No shared secret configured for {server}")
            self.log.debug(f"Using as_token for double puppeting {self.custom_mxid}")
            return self.az.intent.user(
                self.custom_mxid,
                secret.decode("utf-8").removeprefix("as_token:"),
                self.base_url,
                as_token=True,
            )
        return (
            self.az.intent.user(self.custom_mxid, self.access_token, self.base_url)
            if self.is_real_user
            else self.default_mxid_intent
        )

    @classmethod
    def can_auto_login(cls, mxid: UserID) -> bool:
        _, server = cls.az.intent.parse_user_id(mxid)
        return server in cls.login_shared_secret_map and (
            server in cls.homeserver_url_map or server == cls.az.domain
        )

    @classmethod
    async def _login_with_shared_secret(cls, mxid: UserID) -> str:
        _, server = cls.az.intent.parse_user_id(mxid)
        try:
            secret = cls.login_shared_secret_map[server]
        except KeyError:
            raise AutologinError(f"No shared secret configured for {server}")
        if secret.startswith(b"as_token:"):
            return "appservice-config"
        try:
            base_url = cls.homeserver_url_map[server]
        except KeyError:
            if server == cls.az.domain:
                base_url = cls.az.intent.api.base_url
            else:
                raise AutologinError(f"No homeserver URL configured for {server}")
        client = ClientAPI(base_url=base_url)
        login_args = {}
        if secret == b"appservice":
            login_type = LoginType.APPSERVICE
            client.api.token = cls.az.as_token
        else:
            flows = await client.get_login_flows()
            flow = flows.get_first_of_type(LoginType.DEVTURE_SHARED_SECRET, LoginType.PASSWORD)
            if not flow:
                raise AutologinError("No supported shared secret auth login flows")
            login_type = flow.type
            token = hmac.new(secret, mxid.encode("utf-8"), hashlib.sha512).hexdigest()
            if login_type == LoginType.DEVTURE_SHARED_SECRET:
                login_args["token"] = token
            elif login_type == LoginType.PASSWORD:
                login_args["password"] = token
        resp = await client.login(
            identifier=MatrixUserIdentifier(user=mxid),
            device_id=cls.login_device_name,
            initial_device_display_name=cls.login_device_name,
            login_type=login_type,
            **login_args,
            store_access_token=False,
            update_hs_url=False,
        )
        return resp.access_token

    async def switch_mxid(
        self, access_token: str | None, mxid: UserID | None, start_sync_task: bool = True
    ) -> None:
        """
        Switch to a real Matrix user or away from one.

        Args:
            access_token: The access token for the custom account, or ``None`` to switch back to
                          the appservice-owned ID.
            mxid: The expected Matrix user ID of the custom account, or ``None`` when
                  ``access_token`` is None.
        """
        if access_token == "auto":
            access_token = await self._login_with_shared_secret(mxid)
            if access_token != "appservice-config":
                self.log.debug(f"Logged in for {mxid} using shared secret")

        if mxid is not None:
            _, mxid_domain = self.az.intent.parse_user_id(mxid)
            if mxid_domain in self.homeserver_url_map:
                base_url = self.homeserver_url_map[mxid_domain]
            elif mxid_domain == self.az.domain:
                base_url = None
            else:
                if not self.allow_discover_url:
                    raise OnlyLoginTrustedDomain()
                try:
                    base_url = await IntentAPI.discover(mxid_domain, self.az.http_session)
                except WellKnownError as e:
                    raise HomeserverURLNotFound(mxid_domain) from e
                if base_url is None:
                    raise HomeserverURLNotFound(mxid_domain)
        else:
            base_url = None

        prev_mxid = self.custom_mxid
        self.custom_mxid = mxid
        self.access_token = access_token
        self.base_url = base_url
        self.intent = self._fresh_intent()

        await self.start(check_e2ee_keys=True)

        try:
            del self.by_custom_mxid[prev_mxid]
        except KeyError:
            pass
        if self.mxid != self.default_mxid:
            self.by_custom_mxid[self.mxid] = self
            try:
                await self._leave_rooms_with_default_user()
            except Exception:
                self.log.warning("Error when leaving rooms with default user", exc_info=True)
        await self.save()

    async def try_start(self, retry_auto_login: bool = True) -> None:
        try:
            await self.start(retry_auto_login=retry_auto_login)
        except Exception:
            self.log.exception("Failed to initialize custom mxid")

    async def _invalidate_double_puppet(self) -> None:
        if self.custom_mxid and self.by_custom_mxid.get(self.custom_mxid) == self:
            del self.by_custom_mxid[self.custom_mxid]
        self.custom_mxid = None
        self.access_token = None
        await self.save()
        self.intent = self._fresh_intent()

    async def start(
        self,
        retry_auto_login: bool = False,
        start_sync_task: bool = True,
        check_e2ee_keys: bool = False,
    ) -> None:
        """Initialize the custom account this puppet uses. Should be called at startup to start
        the /sync task. Is called by :meth:`switch_mxid` automatically."""
        if not self.is_real_user:
            return

        try:
            whoami = await self.intent.whoami()
        except MatrixInvalidToken as e:
            if retry_auto_login and self.custom_mxid and self.can_auto_login(self.custom_mxid):
                self.log.debug(f"Got {e.errcode} while trying to initialize custom mxid")
                await self.switch_mxid("auto", self.custom_mxid)
                return
            self.log.warning(f"Got {e.errcode} while trying to initialize custom mxid")
            whoami = None
        if not whoami or whoami.user_id != self.custom_mxid:
            prev_custom_mxid = self.custom_mxid
            await self._invalidate_double_puppet()
            if whoami and whoami.user_id != prev_custom_mxid:
                raise OnlyLoginSelf()
            raise InvalidAccessToken()
        if check_e2ee_keys:
            try:
                devices = await self.intent.query_keys({whoami.user_id: [whoami.device_id]})
                device_keys = devices.device_keys.get(whoami.user_id, {}).get(whoami.device_id)
            except Exception:
                self.log.warning(
                    "Failed to query keys to check if double puppeting token was reused",
                    exc_info=True,
                )
            else:
                if device_keys and len(device_keys.keys) > 0:
                    await self._invalidate_double_puppet()
                    raise EncryptionKeysFound()
        self.log.info(f"Initialized custom mxid: {whoami.user_id}")

    def stop(self) -> None:
        """
        No-op

        .. deprecated:: 0.20.1
        """

    async def default_puppet_should_leave_room(self, room_id: RoomID) -> bool:
        """
        Whether or not the default puppet user should leave the given room when this puppet is
        switched to using a custom user account.

        Args:
            room_id: The room to check.

        Returns:
            Whether or not the default user account should leave.
        """
        return True

    async def _leave_rooms_with_default_user(self) -> None:
        for room_id in await self.default_mxid_intent.get_joined_rooms():
            try:
                if await self.default_puppet_should_leave_room(room_id):
                    await self.default_mxid_intent.leave_room(room_id)
                    await self.intent.ensure_joined(room_id)
            except (IntentError, MatrixRequestError):
                pass