File: manhole.py

package info (click to toggle)
mautrix-python 0.20.7-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 1,812 kB
  • sloc: python: 19,103; makefile: 16
file content (125 lines) | stat: -rw-r--r-- 4,224 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# Copyright (c) 2022 Tulir Asokan
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import annotations

from typing import Callable
import asyncio
import os

from attr import dataclass

from mautrix.errors import MatrixConnectionError
from mautrix.types import UserID
from mautrix.util.manhole import start_manhole

from . import SECTION_ADMIN, CommandEvent, command_handler


@dataclass
class ManholeState:
    """Bookkeeping for the currently open manhole.

    An instance is stored on ``evt.bridge.manhole`` by ``open_manhole`` and
    cleared again (set to ``None``) when the manhole is closed.
    """

    # Server object returned by ``start_manhole``; ``open_manhole`` awaits its
    # ``wait_closed()`` to detect when the manhole has shut down.
    server: asyncio.AbstractServer
    # Matrix ID of the user who ran the command that opened the manhole.
    opened_by: UserID
    # Zero-argument callback returned by ``start_manhole``; ``close_manhole``
    # calls it to shut the manhole down.
    close: Callable[[], None]
    # UIDs given to ``open_manhole``. This is the same set object that was
    # passed to ``start_manhole`` and it is extended in place by later
    # ``open_manhole`` invocations.
    # NOTE(review): presumably ``start_manhole`` consults this set on each new
    # connection, so in-place additions take effect immediately -- confirm.
    whitelist: set[int]


def _format_uids(uids: list[int]) -> str:
    """Render a non-empty list of UIDs as ``1``, ``1 and 2`` or ``1, 2 and 3``."""
    *head, last = uids
    if not head:
        return str(last)
    return f"{', '.join(str(uid) for uid in head)} and {last}"


@command_handler(
    needs_auth=False,
    needs_admin=True,
    help_section=SECTION_ADMIN,
    help_text="Open a manhole into the bridge.",
    help_args="<_uid..._>",
)
async def open_manhole(evt: CommandEvent) -> None:
    """Open a manhole, or extend the whitelist of the one that is already open.

    Every command argument must be an integer UID. If the ``manhole.whitelist``
    config value is non-empty, each UID must also be listed there. Any invalid
    argument aborts the command with an error reply and changes nothing.

    If a manhole is already open, the new UIDs are added to its whitelist and
    the command returns. Otherwise a new manhole is started at the configured
    ``manhole.path`` and this coroutine keeps running until the server has
    closed, after which it removes the socket path and notifies the sender.

    Args:
        evt: The command event; its ``args`` are the UIDs to whitelist.
    """
    if not evt.config["manhole.enabled"]:
        await evt.reply("The manhole has been disabled in the config.")
        return
    if not evt.args:
        await evt.reply("**Usage:** `$cmdprefix+sp open-manhole <uid...>`")
        return

    # UIDs the config permits; an empty/falsy value disables this restriction.
    allowed_uids = evt.config["manhole.whitelist"]
    whitelist: set[int] = set()
    for arg in evt.args:
        try:
            uid = int(arg)
        except ValueError:
            await evt.reply(f"{arg} is not an integer.")
            return
        if allowed_uids and uid not in allowed_uids:
            await evt.reply(f"{uid} is not in the list of allowed UIDs.")
            return
        whitelist.add(uid)

    existing = evt.bridge.manhole
    if existing:
        # Sorted so that replies and log lines are deterministic.
        added = sorted(whitelist - existing.whitelist)
        # Mutate in place: this is the same set object that was handed to
        # start_manhole when the manhole was opened.
        existing.whitelist.update(added)
        if not added:
            await evt.reply(
                f"There's an existing manhole opened by {existing.opened_by}"
                " and all the given UIDs are already whitelisted."
            )
        else:
            added_str = _format_uids(added)
            await evt.reply(
                f"There's an existing manhole opened by {existing.opened_by}"
                f". Added {added_str} to the whitelist."
            )
            evt.log.info(f"{evt.sender.mxid} added {added_str} to the manhole whitelist.")
        return

    namespace = await evt.bridge.manhole_global_namespace(evt.sender.mxid)
    banner = evt.bridge.manhole_banner(evt.sender.mxid)
    path = evt.config["manhole.path"]

    server, close = await start_manhole(
        path=path, banner=banner, namespace=namespace, loop=evt.loop, whitelist=whitelist
    )
    evt.bridge.manhole = ManholeState(
        server=server, opened_by=evt.sender.mxid, close=close, whitelist=whitelist
    )
    # Log only once the server has actually started, so a failed start does
    # not leave a misleading "opened a manhole" entry behind.
    whitelist_str = _format_uids(sorted(whitelist))
    evt.log.info(f"{evt.sender.mxid} opened a manhole with {whitelist_str} whitelisted.")
    plrl = "s" if len(whitelist) != 1 else ""
    await evt.reply(f"Opened manhole at unix://{path} with UID{plrl} {whitelist_str} whitelisted")

    # Stay alive until the server is closed (e.g. via close_manhole), then
    # clean up the state and the socket file.
    await server.wait_closed()
    evt.bridge.manhole = None
    try:
        os.unlink(path)
    except FileNotFoundError:
        # Socket file is already gone; nothing to clean up.
        pass
    evt.log.info(f"{evt.sender.mxid}'s manhole was closed.")
    try:
        await evt.reply("Your manhole was closed.")
    except (AttributeError, MatrixConnectionError) as e:
        # Best-effort notification: failing to send it is only logged.
        evt.log.warning(f"Failed to send manhole close notification: {e}")


@command_handler(
    needs_auth=False,
    needs_admin=True,
    help_section=SECTION_ADMIN,
    help_text="Close an open manhole.",
)
async def close_manhole(evt: CommandEvent) -> None:
    """Close the currently open manhole, if there is one.

    Replies with an error when no manhole is open. Otherwise the manhole's
    ``close`` callback is invoked and the bridge's manhole state is cleared.
    A confirmation is sent only when the sender is not the user who opened
    the manhole.

    Args:
        evt: The command event that triggered the close request.
    """
    manhole = evt.bridge.manhole
    if not manhole:
        await evt.reply("There is no open manhole.")
        return

    owner = manhole.opened_by
    manhole.close()
    evt.bridge.manhole = None

    # No confirmation here when the opener closes their own manhole.
    if owner == evt.sender.mxid:
        return
    await evt.reply(f"Closed manhole opened by {owner}")