1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
|
# Copyright (c) 2022 Tulir Asokan
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import annotations
from abc import ABC, abstractmethod
import asyncio
import hashlib
import hmac
import logging
from yarl import URL
from mautrix.appservice import AppService, IntentAPI
from mautrix.client import ClientAPI
from mautrix.errors import (
IntentError,
MatrixError,
MatrixInvalidToken,
MatrixRequestError,
WellKnownError,
)
from mautrix.types import LoginType, MatrixUserIdentifier, RoomID, UserID
from .. import bridge as br
class CustomPuppetError(MatrixError):
"""Base class for double puppeting setup errors."""
class InvalidAccessToken(CustomPuppetError):
def __init__(self):
super().__init__("The given access token was invalid.")
class OnlyLoginSelf(CustomPuppetError):
def __init__(self):
super().__init__("You may only enable double puppeting with your own Matrix account.")
class EncryptionKeysFound(CustomPuppetError):
def __init__(self):
super().__init__(
"The given access token is for a device that has encryption keys set up. "
"Please provide a fresh token, don't reuse one from another client."
)
class HomeserverURLNotFound(CustomPuppetError):
def __init__(self, domain: str):
super().__init__(
f"Could not discover a valid homeserver URL for {domain}."
" Please ensure a client .well-known file is set up, or ask the bridge administrator "
"to add the homeserver URL to the bridge config."
)
class OnlyLoginTrustedDomain(CustomPuppetError):
def __init__(self):
super().__init__(
"This bridge doesn't allow double-puppeting with accounts on untrusted servers."
)
class AutologinError(CustomPuppetError):
pass
class CustomPuppetMixin(ABC):
"""
Mixin for the Puppet class to enable Matrix puppeting.
Attributes:
sync_with_custom_puppets: Whether or not custom puppets should /sync
allow_discover_url: Allow logging into other homeservers using .well-known discovery.
homeserver_url_map: Static map from server name to URL that are always allowed to log in.
only_handle_own_synced_events: Whether or not typing notifications and read receipts by
other users should be filtered away before passing them to
the Matrix event handler.
az: The AppService object.
loop: The asyncio event loop.
log: The logger to use.
mx: The Matrix event handler to send /sync events to.
by_custom_mxid: A mapping from custom mxid to puppet object.
default_mxid: The default user ID of the puppet.
default_mxid_intent: The IntentAPI for the default user ID.
custom_mxid: The user ID of the custom puppet.
access_token: The access token for the custom puppet.
intent: The primary IntentAPI.
"""
allow_discover_url: bool = False
homeserver_url_map: dict[str, URL] = {}
only_handle_own_synced_events: bool = True
login_shared_secret_map: dict[str, bytes] = {}
login_device_name: str | None = None
az: AppService
loop: asyncio.AbstractEventLoop
log: logging.Logger
mx: br.BaseMatrixHandler
by_custom_mxid: dict[UserID, CustomPuppetMixin] = {}
default_mxid: UserID
default_mxid_intent: IntentAPI
custom_mxid: UserID | None
access_token: str | None
base_url: URL | None
intent: IntentAPI
@abstractmethod
async def save(self) -> None:
"""Save the information of this puppet. Called from :meth:`switch_mxid`"""
@property
def mxid(self) -> UserID:
"""The main Matrix user ID of this puppet."""
return self.custom_mxid or self.default_mxid
@property
def is_real_user(self) -> bool:
"""Whether this puppet uses a real Matrix user instead of an appservice-owned ID."""
return bool(self.custom_mxid and self.access_token)
def _fresh_intent(self) -> IntentAPI:
if self.custom_mxid:
_, server = self.az.intent.parse_user_id(self.custom_mxid)
try:
self.base_url = self.homeserver_url_map[server]
except KeyError:
if server == self.az.domain:
self.base_url = self.az.intent.api.base_url
if self.access_token == "appservice-config" and self.custom_mxid:
try:
secret = self.login_shared_secret_map[server]
except KeyError:
raise AutologinError(f"No shared secret configured for {server}")
self.log.debug(f"Using as_token for double puppeting {self.custom_mxid}")
return self.az.intent.user(
self.custom_mxid,
secret.decode("utf-8").removeprefix("as_token:"),
self.base_url,
as_token=True,
)
return (
self.az.intent.user(self.custom_mxid, self.access_token, self.base_url)
if self.is_real_user
else self.default_mxid_intent
)
@classmethod
def can_auto_login(cls, mxid: UserID) -> bool:
_, server = cls.az.intent.parse_user_id(mxid)
return server in cls.login_shared_secret_map and (
server in cls.homeserver_url_map or server == cls.az.domain
)
@classmethod
async def _login_with_shared_secret(cls, mxid: UserID) -> str:
_, server = cls.az.intent.parse_user_id(mxid)
try:
secret = cls.login_shared_secret_map[server]
except KeyError:
raise AutologinError(f"No shared secret configured for {server}")
if secret.startswith(b"as_token:"):
return "appservice-config"
try:
base_url = cls.homeserver_url_map[server]
except KeyError:
if server == cls.az.domain:
base_url = cls.az.intent.api.base_url
else:
raise AutologinError(f"No homeserver URL configured for {server}")
client = ClientAPI(base_url=base_url)
login_args = {}
if secret == b"appservice":
login_type = LoginType.APPSERVICE
client.api.token = cls.az.as_token
else:
flows = await client.get_login_flows()
flow = flows.get_first_of_type(LoginType.DEVTURE_SHARED_SECRET, LoginType.PASSWORD)
if not flow:
raise AutologinError("No supported shared secret auth login flows")
login_type = flow.type
token = hmac.new(secret, mxid.encode("utf-8"), hashlib.sha512).hexdigest()
if login_type == LoginType.DEVTURE_SHARED_SECRET:
login_args["token"] = token
elif login_type == LoginType.PASSWORD:
login_args["password"] = token
resp = await client.login(
identifier=MatrixUserIdentifier(user=mxid),
device_id=cls.login_device_name,
initial_device_display_name=cls.login_device_name,
login_type=login_type,
**login_args,
store_access_token=False,
update_hs_url=False,
)
return resp.access_token
async def switch_mxid(
self, access_token: str | None, mxid: UserID | None, start_sync_task: bool = True
) -> None:
"""
Switch to a real Matrix user or away from one.
Args:
access_token: The access token for the custom account, or ``None`` to switch back to
the appservice-owned ID.
mxid: The expected Matrix user ID of the custom account, or ``None`` when
``access_token`` is None.
"""
if access_token == "auto":
access_token = await self._login_with_shared_secret(mxid)
if access_token != "appservice-config":
self.log.debug(f"Logged in for {mxid} using shared secret")
if mxid is not None:
_, mxid_domain = self.az.intent.parse_user_id(mxid)
if mxid_domain in self.homeserver_url_map:
base_url = self.homeserver_url_map[mxid_domain]
elif mxid_domain == self.az.domain:
base_url = None
else:
if not self.allow_discover_url:
raise OnlyLoginTrustedDomain()
try:
base_url = await IntentAPI.discover(mxid_domain, self.az.http_session)
except WellKnownError as e:
raise HomeserverURLNotFound(mxid_domain) from e
if base_url is None:
raise HomeserverURLNotFound(mxid_domain)
else:
base_url = None
prev_mxid = self.custom_mxid
self.custom_mxid = mxid
self.access_token = access_token
self.base_url = base_url
self.intent = self._fresh_intent()
await self.start(check_e2ee_keys=True)
try:
del self.by_custom_mxid[prev_mxid]
except KeyError:
pass
if self.mxid != self.default_mxid:
self.by_custom_mxid[self.mxid] = self
try:
await self._leave_rooms_with_default_user()
except Exception:
self.log.warning("Error when leaving rooms with default user", exc_info=True)
await self.save()
async def try_start(self, retry_auto_login: bool = True) -> None:
try:
await self.start(retry_auto_login=retry_auto_login)
except Exception:
self.log.exception("Failed to initialize custom mxid")
async def _invalidate_double_puppet(self) -> None:
if self.custom_mxid and self.by_custom_mxid.get(self.custom_mxid) == self:
del self.by_custom_mxid[self.custom_mxid]
self.custom_mxid = None
self.access_token = None
await self.save()
self.intent = self._fresh_intent()
async def start(
self,
retry_auto_login: bool = False,
start_sync_task: bool = True,
check_e2ee_keys: bool = False,
) -> None:
"""Initialize the custom account this puppet uses. Should be called at startup to start
the /sync task. Is called by :meth:`switch_mxid` automatically."""
if not self.is_real_user:
return
try:
whoami = await self.intent.whoami()
except MatrixInvalidToken as e:
if retry_auto_login and self.custom_mxid and self.can_auto_login(self.custom_mxid):
self.log.debug(f"Got {e.errcode} while trying to initialize custom mxid")
await self.switch_mxid("auto", self.custom_mxid)
return
self.log.warning(f"Got {e.errcode} while trying to initialize custom mxid")
whoami = None
if not whoami or whoami.user_id != self.custom_mxid:
prev_custom_mxid = self.custom_mxid
await self._invalidate_double_puppet()
if whoami and whoami.user_id != prev_custom_mxid:
raise OnlyLoginSelf()
raise InvalidAccessToken()
if check_e2ee_keys:
try:
devices = await self.intent.query_keys({whoami.user_id: [whoami.device_id]})
device_keys = devices.device_keys.get(whoami.user_id, {}).get(whoami.device_id)
except Exception:
self.log.warning(
"Failed to query keys to check if double puppeting token was reused",
exc_info=True,
)
else:
if device_keys and len(device_keys.keys) > 0:
await self._invalidate_double_puppet()
raise EncryptionKeysFound()
self.log.info(f"Initialized custom mxid: {whoami.user_id}")
def stop(self) -> None:
"""
No-op
.. deprecated:: 0.20.1
"""
async def default_puppet_should_leave_room(self, room_id: RoomID) -> bool:
"""
Whether or not the default puppet user should leave the given room when this puppet is
switched to using a custom user account.
Args:
room_id: The room to check.
Returns:
Whether or not the default user account should leave.
"""
return True
async def _leave_rooms_with_default_user(self) -> None:
for room_id in await self.default_mxid_intent.get_joined_rooms():
try:
if await self.default_puppet_should_leave_room(room_id):
await self.default_mxid_intent.leave_room(room_id)
await self.intent.ensure_joined(room_id)
except (IntentError, MatrixRequestError):
pass
|