File: appservice.py

package info (click to toggle)
mautrix-python 0.20.7-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 1,812 kB
  • sloc: python: 19,103; makefile: 16
file content (291 lines) | stat: -rw-r--r-- 10,664 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# Copyright (c) 2022 Tulir Asokan
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import annotations

from typing import Any, Awaitable
from datetime import datetime, timezone
import asyncio

from aiohttp import ClientSession
from yarl import URL

from mautrix.api import HTTPAPI, Method, PathBuilder
from mautrix.types import UserID
from mautrix.util.logging import TraceLogger

from .. import api as as_api, state_store as ss


class AppServiceAPI(HTTPAPI):
    """
    AppServiceAPI is an extension to HTTPAPI that provides appservice-specific features,
    such as child instances and easy access to IntentAPIs.
    """

    base_log: TraceLogger

    identity: UserID | None
    bot_mxid: UserID

    state_store: ss.ASStateStore
    txn_id: int
    children: dict[str, ChildAppServiceAPI]
    real_users: dict[str, AppServiceAPI]

    is_real_user: bool
    bridge_name: str | None

    _bot_intent: as_api.IntentAPI | None

    def __init__(
        self,
        base_url: URL | str,
        bot_mxid: UserID = None,
        token: str = None,
        identity: UserID | None = None,
        log: TraceLogger = None,
        state_store: ss.ASStateStore = None,
        client_session: ClientSession = None,
        child: bool = False,
        real_user: bool = False,
        real_user_as_token: bool = False,
        bridge_name: str | None = None,
        default_retry_count: int = None,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """
        Args:
            base_url: The base URL of the homeserver client-server API to use.
            bot_mxid: The Matrix user ID of the appservice bot.
            token: The access token to use.
            identity: The ID of the Matrix user to act as.
            log: The logging.Logger instance to log requests with.
            state_store: The StateStore instance to use.
            client_session: The aiohttp ClientSession to use.
            child: Whether or not this is instance is a child of another AppServiceAPI.
            real_user: Whether or not this is a real (non-appservice-managed) user.
            real_user_as_token: Whether this real user is actually using another ``as_token``.
            bridge_name: The name of the bridge to put in the ``fi.mau.double_puppet_source`` field
                in outgoing message events sent through real users.
        """
        self.base_log = log
        api_log = self.base_log.getChild("api").getChild(identity or "bot")
        super().__init__(
            base_url=base_url,
            token=token,
            loop=loop,
            log=api_log,
            client_session=client_session,
            txn_id=0 if not child else None,
            default_retry_count=default_retry_count,
        )
        self.identity = identity
        self.bot_mxid = bot_mxid
        self._bot_intent = None
        self.state_store = state_store
        self.is_real_user = real_user
        self.is_real_user_as_token = real_user_as_token
        self.bridge_name = bridge_name

        if not child:
            self.txn_id = 0
            if not real_user:
                self.children = {}
                self.real_users = {}

    def user(self, user: UserID) -> ChildAppServiceAPI:
        """
        Get the AppServiceAPI for an appservice-managed user.

        Args:
            user: The Matrix user ID of the user whose AppServiceAPI to get.

        Returns:
            The ChildAppServiceAPI object for the user.
        """
        if self.is_real_user:
            raise ValueError("Can't get child of real user")

        try:
            return self.children[user]
        except KeyError:
            child = ChildAppServiceAPI(user, self)
            self.children[user] = child
            return child

    def real_user(
        self, mxid: UserID, token: str, base_url: URL | None = None, as_token: bool = False
    ) -> AppServiceAPI:
        """
        Get the AppServiceAPI for a real (non-appservice-managed) Matrix user.

        Args:
            mxid: The Matrix user ID of the user whose AppServiceAPI to get.
            token: The access token for the user.
            base_url: The base URL of the homeserver client-server API to use. Defaults to the
                appservice homeserver URL.
            as_token: Whether the token is actually an as_token
                (meaning the ``user_id`` query parameter needs to be used).

        Returns:
            The AppServiceAPI object for the user.

        Raises:
            ValueError: When this AppServiceAPI instance is a real user.
        """
        if self.is_real_user:
            raise ValueError("Can't get child of real user")

        try:
            child = self.real_users[mxid]
            child.base_url = base_url or child.base_url
            child.token = token or child.token
            child.is_real_user_as_token = as_token
        except KeyError:
            child = type(self)(
                base_url=base_url or self.base_url,
                token=token,
                identity=mxid,
                log=self.base_log,
                state_store=self.state_store,
                client_session=self.session,
                real_user=True,
                real_user_as_token=as_token,
                bridge_name=self.bridge_name,
                default_retry_count=self.default_retry_count,
            )
            self.real_users[mxid] = child
        return child

    def bot_intent(self) -> as_api.IntentAPI:
        """
        Get the intent API for the appservice bot.

        Returns:
            The IntentAPI object for the appservice bot
        """
        if not self._bot_intent:
            self._bot_intent = as_api.IntentAPI(self.bot_mxid, self, state_store=self.state_store)
        return self._bot_intent

    def intent(
        self,
        user: UserID = None,
        token: str | None = None,
        base_url: str | None = None,
        real_user_as_token: bool = False,
    ) -> as_api.IntentAPI:
        """
        Get the intent API of a child user.

        Args:
            user: The Matrix user ID whose intent API to get.
            token: The access token to use. Only applicable for non-appservice-managed users.
            base_url: The base URL of the homeserver client-server API to use. Only applicable for
                non-appservice users. Defaults to the appservice homeserver URL.
            real_user_as_token: When providing a token, whether it's actually another as_token
                (meaning the ``user_id`` query parameter needs to be used).

        Returns:
            The IntentAPI object for the given user.

        Raises:
            ValueError: When this AppServiceAPI instance is a real user.
        """
        if self.is_real_user:
            raise ValueError("Can't get child intent of real user")
        if token:
            return as_api.IntentAPI(
                user,
                self.real_user(user, token, base_url, as_token=real_user_as_token),
                self.bot_intent(),
                self.state_store,
            )
        return as_api.IntentAPI(user, self.user(user), self.bot_intent(), self.state_store)

    def request(
        self,
        method: Method,
        path: PathBuilder,
        content: dict | bytes | str | None = None,
        timestamp: int | None = None,
        headers: dict[str, str] | None = None,
        query_params: dict[str, Any] | None = None,
        retry_count: int | None = None,
        metrics_method: str | None = "",
        min_iter_size: int = 25 * 1024 * 1024,
    ) -> Awaitable[dict]:
        """
        Make a raw Matrix API request, acting as the appservice user assigned to this AppServiceAPI
        instance and optionally including timestamp massaging.

        Args:
            method: The HTTP method to use.
            path: The full API endpoint to call (including the _matrix/... prefix)
            content: The content to post as a dict/list (will be serialized as JSON)
                     or bytes/str (will be sent as-is).
            timestamp: The timestamp query param used for timestamp massaging.
            headers: A dict of HTTP headers to send. If the headers don't contain ``Content-Type``,
                     it'll be set to ``application/json``. The ``Authorization`` header is always
                     overridden if :attr:`token` is set.
            query_params: A dict of query parameters to send.
            retry_count: Number of times to retry if the homeserver isn't reachable.
                         Defaults to :attr:`default_retry_count`.
            metrics_method: Name of the method to include in Prometheus timing metrics.
            min_iter_size: If the request body is larger than this value, it will be passed to
                           aiohttp as an async iterable to stop it from copying the whole thing
                           in memory.

        Returns:
            The parsed response JSON.
        """
        query_params = query_params or {}
        if timestamp is not None:
            if isinstance(timestamp, datetime):
                timestamp = int(timestamp.replace(tzinfo=timezone.utc).timestamp() * 1000)
            query_params["ts"] = timestamp
        if not self.is_real_user or self.is_real_user_as_token:
            query_params["user_id"] = self.identity or self.bot_mxid

        return super().request(
            method, path, content, headers, query_params, retry_count, metrics_method
        )


class ChildAppServiceAPI(AppServiceAPI):
    """
    ChildAppServiceAPI is a simple way to copy AppServiceAPIs while maintaining a shared txn_id.
    """

    parent: AppServiceAPI

    def __init__(self, user: UserID, parent: AppServiceAPI) -> None:
        """
        Args:
            user: The Matrix user ID of the child user.
            parent: The parent AppServiceAPI instance.
        """
        super().__init__(
            parent.base_url,
            parent.bot_mxid,
            parent.token,
            user,
            parent.base_log,
            parent.state_store,
            parent.session,
            child=True,
            bridge_name=parent.bridge_name,
            default_retry_count=parent.default_retry_count,
        )
        self.parent = parent

    @property
    def txn_id(self) -> int:
        return self.parent.txn_id

    @txn_id.setter
    def txn_id(self, value: int) -> None:
        self.parent.txn_id = value