File: bridge_state.py

package info (click to toggle)
mautrix-python 0.20.7-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 1,812 kB
  • sloc: python: 19,103; makefile: 16
file content (141 lines) | stat: -rw-r--r-- 5,121 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Copyright (c) 2022 Tulir Asokan
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Any, ClassVar, Dict, Optional
import logging
import time

from attr import dataclass
import aiohttp

from mautrix.api import HTTPAPI
from mautrix.types import SerializableAttrs, SerializableEnum, UserID, field


class BridgeStateEvent(SerializableEnum):
    """Enumeration of the state event names a bridge can report.

    The members are split into two groups: global events that describe the
    bridge process as a whole (no remote ID), and remote events that describe
    the connection to a specific remote network login (should have a remote ID).
    Each member's value is the same string as its name.
    """

    #####################################
    # Global state events, no remote ID #
    #####################################

    # Bridge process is starting up
    STARTING = "STARTING"
    # Bridge has started but has no valid credentials
    UNCONFIGURED = "UNCONFIGURED"
    # Bridge is running
    RUNNING = "RUNNING"
    # The server was unable to reach the bridge
    BRIDGE_UNREACHABLE = "BRIDGE_UNREACHABLE"

    ################################################
    # Remote state events, should have a remote ID #
    ################################################

    # Bridge has credentials and has started connecting to a remote network
    CONNECTING = "CONNECTING"
    # Bridge has begun backfilling
    BACKFILLING = "BACKFILLING"
    # Bridge has happily connected and is bridging messages
    CONNECTED = "CONNECTED"
    # Bridge has temporarily disconnected, expected to reconnect automatically
    TRANSIENT_DISCONNECT = "TRANSIENT_DISCONNECT"
    # Bridge has disconnected, will require user to log in again
    BAD_CREDENTIALS = "BAD_CREDENTIALS"
    # Bridge has disconnected for an unknown/unexpected reason - we should investigate
    UNKNOWN_ERROR = "UNKNOWN_ERROR"
    # User has logged out - stop tracking this remote
    LOGGED_OUT = "LOGGED_OUT"


# State events that are treated as "OK-ish" (not an error condition).
# BridgeState.fill() gives states in this tuple the longer default_ok_ttl;
# every other state event gets default_error_ttl instead.
ok_ish_states = (
    BridgeStateEvent.STARTING,
    BridgeStateEvent.UNCONFIGURED,
    BridgeStateEvent.RUNNING,
    BridgeStateEvent.CONNECTING,
    BridgeStateEvent.CONNECTED,
    BridgeStateEvent.BACKFILLING,
)


@dataclass(kw_only=True)
class BridgeState(SerializableAttrs):
    """A single bridge state update that can be filled with defaults and POSTed as JSON.

    Typical usage is to construct an instance with at least ``state_event``, call
    :meth:`fill` to populate the timestamp, source, TTL and human-readable message,
    optionally compare it against the previously sent state with
    :meth:`should_deduplicate`, and finally push it with :meth:`send`.
    """

    # Maps an error code (the ``error`` field) to a human-readable message template.
    # Templates are passed through ``str.format(message=...)`` when ``message`` is set,
    # so they may contain a ``{message}`` placeholder.
    human_readable_errors: ClassVar[Dict[Optional[str], str]] = {}
    # Value used for ``source`` when none is set explicitly.
    default_source: ClassVar[str] = "bridge"
    # Default TTLs, in the same unit as ``timestamp`` (seconds, see ``fill``).
    default_error_ttl: ClassVar[int] = 3600
    default_ok_ttl: ClassVar[int] = 21600

    state_event: BridgeStateEvent
    user_id: Optional[UserID] = None
    remote_id: Optional[str] = None
    remote_name: Optional[str] = None
    # Unix timestamp in whole seconds; populated by ``fill`` if left unset.
    timestamp: Optional[int] = None
    # 0 means "not set"; ``fill`` replaces it with one of the class defaults.
    ttl: int = 0
    source: Optional[str] = None
    error: Optional[str] = None
    message: Optional[str] = None
    info: Optional[Dict[str, Any]] = None
    reason: Optional[str] = None

    # Number of times ``send`` has attempted to deliver this state.
    send_attempts_: int = field(default=0, hidden=True)

    def fill(self) -> "BridgeState":
        """Populate unset fields with defaults, in place.

        * ``timestamp`` defaults to the current Unix time in seconds.
        * ``source`` defaults to ``default_source``.
        * ``ttl`` defaults to ``default_ok_ttl`` for states listed in
          ``ok_ish_states`` and to ``default_error_ttl`` otherwise.
        * If ``error`` has an entry in ``human_readable_errors``, ``message`` is
          replaced by that entry; when a message was already present it is
          substituted into the template as ``{message}``.

        Returns:
            This same instance, to allow call chaining.
        """
        self.timestamp = self.timestamp or int(time.time())
        self.source = self.source or self.default_source
        if not self.ttl:
            self.ttl = (
                self.default_ok_ttl
                if self.state_event in ok_ish_states
                else self.default_error_ttl
            )
        if self.error:
            try:
                msg = self.human_readable_errors[self.error]
            except KeyError:
                # No human-readable template for this error code: keep the message as-is.
                pass
            else:
                self.message = msg.format(message=self.message) if self.message else msg
        return self

    def should_deduplicate(self, prev_state: Optional["BridgeState"]) -> bool:
        """Decide whether this state is redundant given the previously sent one.

        Args:
            prev_state: The last state that was sent, or ``None`` if there is none.

        Returns:
            ``True`` if this state repeats ``prev_state`` (same state event, error
            and info) and ``prev_state`` has not yet outlived its TTL, meaning this
            state can be dropped. ``False`` if it should be sent.
        """
        if (
            not prev_state
            or prev_state.state_event != self.state_event
            or prev_state.error != self.error
            or prev_state.info != self.info
        ):
            # If there's no previous state or the state was different, send this one.
            return False
        if prev_state.timestamp is None:
            # The previous state was never filled, so its age is unknown.
            # Err on the side of sending rather than raising on None arithmetic.
            return False
        # An unfilled current state is treated as happening now, which is the
        # same value fill() would assign to it.
        current_ts = self.timestamp if self.timestamp is not None else int(time.time())
        # If the previous state is recent, drop this one
        return prev_state.timestamp + prev_state.ttl > current_ts

    async def send(self, url: str, token: str, log: logging.Logger, log_sent: bool = True) -> bool:
        """POST this state as JSON to ``url``.

        Args:
            url: Endpoint to send the state to. If falsy, nothing is sent and the
                call is reported as successful.
            token: Bearer token placed in the ``Authorization`` header.
            log: Logger used for the success and failure messages.
            log_sent: Whether to emit a debug log line after a successful send.

        Returns:
            ``True`` if the state was sent and the response had a 2xx status (or if
            ``url`` was empty), ``False`` on any other status code or on any
            exception raised while sending. Failures are logged as warnings and
            never propagated to the caller.
        """
        if not url:
            return True
        self.send_attempts_ += 1
        headers = {"Authorization": f"Bearer {token}", "User-Agent": HTTPAPI.default_ua}
        try:
            async with (
                aiohttp.ClientSession() as sess,
                sess.post(url, json=self.serialize(), headers=headers) as resp,
            ):
                if not 200 <= resp.status < 300:
                    text = await resp.text()
                    # Keep the log entry on a single line.
                    text = text.replace("\n", "\\n")
                    log.warning(
                        f"Unexpected status code {resp.status} "
                        f"sending bridge state update: {text}"
                    )
                    return False
                elif log_sent:
                    log.debug(f"Sent new bridge state {self}")
        except Exception as e:
            # Deliberately best-effort: report the failure via the return value.
            log.warning(f"Failed to send updated bridge state: {e}")
            return False
        return True


@dataclass(kw_only=True)
class GlobalBridgeState(SerializableAttrs):
    """Container pairing one overall bridge state with optional per-remote states."""

    # Optional mapping of string keys to individual states; declared with the JSON
    # name "remoteState". NOTE(review): keys are presumably remote IDs - confirm
    # against the code that builds this mapping.
    remote_states: Optional[Dict[str, BridgeState]] = field(json="remoteState", default=None)
    # Required state for the bridge itself; declared with the JSON name "bridgeState".
    bridge_state: BridgeState = field(json="bridgeState")