1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
# Copyright (c) 2022 Tulir Asokan
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Any, ClassVar, Dict, Optional
import logging
import time
from attr import dataclass
import aiohttp
from mautrix.api import HTTPAPI
from mautrix.types import SerializableAttrs, SerializableEnum, UserID, field
class BridgeStateEvent(SerializableEnum):
#####################################
# Global state events, no remote ID #
#####################################
# Bridge process is starting up
STARTING = "STARTING"
# Bridge has started but has no valid credentials
UNCONFIGURED = "UNCONFIGURED"
# Bridge is running
RUNNING = "RUNNING"
# The server was unable to reach the bridge
BRIDGE_UNREACHABLE = "BRIDGE_UNREACHABLE"
################################################
# Remote state events, should have a remote ID #
################################################
# Bridge has credentials and has started connecting to a remote network
CONNECTING = "CONNECTING"
# Bridge has begun backfilling
BACKFILLING = "BACKFILLING"
# Bridge has happily connected and is bridging messages
CONNECTED = "CONNECTED"
# Bridge has temporarily disconnected, expected to reconnect automatically
TRANSIENT_DISCONNECT = "TRANSIENT_DISCONNECT"
# Bridge has disconnected, will require user to log in again
BAD_CREDENTIALS = "BAD_CREDENTIALS"
# Bridge has disconnected for an unknown/unexpected reason - we should investigate
UNKNOWN_ERROR = "UNKNOWN_ERROR"
# User has logged out - stop tracking this remote
LOGGED_OUT = "LOGGED_OUT"
ok_ish_states = (
BridgeStateEvent.STARTING,
BridgeStateEvent.UNCONFIGURED,
BridgeStateEvent.RUNNING,
BridgeStateEvent.CONNECTING,
BridgeStateEvent.CONNECTED,
BridgeStateEvent.BACKFILLING,
)
@dataclass(kw_only=True)
class BridgeState(SerializableAttrs):
human_readable_errors: ClassVar[Dict[Optional[str], str]] = {}
default_source: ClassVar[str] = "bridge"
default_error_ttl: ClassVar[int] = 3600
default_ok_ttl: ClassVar[int] = 21600
state_event: BridgeStateEvent
user_id: Optional[UserID] = None
remote_id: Optional[str] = None
remote_name: Optional[str] = None
timestamp: Optional[int] = None
ttl: int = 0
source: Optional[str] = None
error: Optional[str] = None
message: Optional[str] = None
info: Optional[Dict[str, Any]] = None
reason: Optional[str] = None
send_attempts_: int = field(default=0, hidden=True)
def fill(self) -> "BridgeState":
self.timestamp = self.timestamp or int(time.time())
self.source = self.source or self.default_source
if not self.ttl:
self.ttl = (
self.default_ok_ttl
if self.state_event in ok_ish_states
else self.default_error_ttl
)
if self.error:
try:
msg = self.human_readable_errors[self.error]
except KeyError:
pass
else:
self.message = msg.format(message=self.message) if self.message else msg
return self
def should_deduplicate(self, prev_state: Optional["BridgeState"]) -> bool:
if (
not prev_state
or prev_state.state_event != self.state_event
or prev_state.error != self.error
or prev_state.info != self.info
):
# If there's no previous state or the state was different, send this one.
return False
# If the previous state is recent, drop this one
return prev_state.timestamp + prev_state.ttl > self.timestamp
async def send(self, url: str, token: str, log: logging.Logger, log_sent: bool = True) -> bool:
if not url:
return True
self.send_attempts_ += 1
headers = {"Authorization": f"Bearer {token}", "User-Agent": HTTPAPI.default_ua}
try:
async with (
aiohttp.ClientSession() as sess,
sess.post(url, json=self.serialize(), headers=headers) as resp,
):
if not 200 <= resp.status < 300:
text = await resp.text()
text = text.replace("\n", "\\n")
log.warning(
f"Unexpected status code {resp.status} "
f"sending bridge state update: {text}"
)
return False
elif log_sent:
log.debug(f"Sent new bridge state {self}")
except Exception as e:
log.warning(f"Failed to send updated bridge state: {e}")
return False
return True
@dataclass(kw_only=True)
class GlobalBridgeState(SerializableAttrs):
remote_states: Optional[Dict[str, BridgeState]] = field(json="remoteState", default=None)
bridge_state: BridgeState = field(json="bridgeState")
|