1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
|
"""mPRM communication via websocket."""
from __future__ import annotations
import json
import threading
from abc import ABC, abstractmethod
from time import sleep, time
from types import TracebackType
from typing import Any
import requests
import websocket
from urllib3.connection import ConnectTimeoutError
from devolo_home_control_api.exceptions import GatewayOfflineError
from .mprm_rest import MprmRest
try:
from typing import Self # type: ignore[attr-defined,misc]
except ImportError:
from typing_extensions import Self
class MprmWebsocket(MprmRest, ABC):
"""
The abstract MprmWebsocket object handles calls to the mPRM via websockets. It does not cover all API calls, just those
requested up to now. All calls are done in a gateway context, so you have to create a derived class, that provides a
Gateway object and a Session object. Further, the derived class needs to implement methods to connect to the websocket,
either local or remote. Last but not least, the derived class needs to implement a method that is called on new messages.
The websocket connection itself runs in a thread, that might not terminate as expected. Using a with-statement is
recommended.
"""
def __init__(self) -> None:
"""Initialize websocket communication."""
super().__init__()
self._ws: websocket.WebSocketApp | None = None
self._connected = False # This attribute saves, if the websocket is fully established
self._reachable = True # This attribute saves, if the a new session can be established
self._event_sequence = 0
def __enter__(self) -> Self:
"""Connect to the websocket."""
return self
def __exit__(
self,
exception_type: type[BaseException] | None,
exception_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
"""Disconnect from the websocket."""
self.websocket_disconnect()
@abstractmethod
def detect_gateway_in_lan(self) -> str:
"""Detect a gateway in the local network."""
@abstractmethod
def get_local_session(self) -> bool:
"""Connect to the gateway locally."""
@abstractmethod
def get_remote_session(self) -> bool:
"""Connect to the gateway remotely."""
@abstractmethod
def on_update(self, message: dict[str, Any]) -> None:
"""Initialize steps needed to update properties on a new message."""
def wait_for_websocket_establishment(self) -> None:
"""
In some cases it is needed to wait for the websocket to be fully established. This method can be used to block your
current thread for up to one minute.
"""
start_time = time()
while not self._connected and time() < start_time + 600:
sleep(0.1)
if not self._connected:
self._logger.debug("Websocket could not be established")
raise GatewayOfflineError
def websocket_connect(self) -> None:
"""
Set up the websocket connection. The protocol type of the known session URL is exchanged depending on whether TLS is
used or not. After establishing the websocket, a ping is sent every 30 seconds to keep the connection alive. If there
is no response within 5 seconds, the connection is terminated with error state.
"""
ws_url = self._url.replace("https://", "wss://").replace("http://", "ws://")
cookie = "; ".join(f"{name}={value}" for name, value in self._session.cookies.items())
ws_url = (
f"{ws_url}/remote/events/?topics=com/prosyst/mbs/services/fim/FunctionalItemEvent/PROPERTY_CHANGED,"
f"com/prosyst/mbs/services/fim/FunctionalItemEvent/UNREGISTERED"
f"&filter=(|(GW_ID={self.gateway.id})(!(GW_ID=*)))"
)
self._logger.debug("Connecting to %s", ws_url)
self._ws = websocket.WebSocketApp(
ws_url,
cookie=cookie,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_pong=self._on_pong,
)
self._ws.run_forever(ping_interval=30, ping_timeout=5)
def websocket_disconnect(self, event: str = "") -> None:
"""Close the websocket connection."""
if not self._ws:
self._logger.info("Not connected to the web socket.")
return
self._logger.info("Closing web socket connection.")
if event:
self._logger.info("Reason: %s", event)
self._ws.close()
def _on_close(self, *_: Any) -> None:
"""React on closing the websocket."""
self._logger.info("Closed websocket connection.")
def _on_error(self, ws: websocket.WebSocketApp, error: Exception) -> None:
"""React on errors. We will try reconnecting with prolonging intervals."""
self._logger.exception(error)
self._connected = False
self._reachable = False
ws.close()
self._event_sequence = 0
sleep_interval = 16
while not self._reachable:
self._try_reconnect(sleep_interval)
sleep_interval = min(sleep_interval * 2, 3600)
self.websocket_connect()
def _on_message(self, _: websocket.WebSocketApp, message: str) -> None:
"""React on a message."""
msg = json.loads(message)
self._logger.debug("Got message from websocket:\n%s", msg)
event_sequence = msg["properties"]["com.prosyst.mbs.services.remote.event.sequence.number"]
if event_sequence == self._event_sequence:
self._event_sequence += 1
else:
self._logger.warning(
"We missed a websocket message. Internal event_sequence is at %s. Event sequence by websocket is at %s",
self._event_sequence,
event_sequence,
)
self._event_sequence = event_sequence + 1
self._logger.debug("self._event_sequence is set to %s", self._event_sequence)
self.on_update(msg)
def _on_open(self, ws: websocket.WebSocketApp) -> None:
"""Keep the websocket open."""
def run() -> None:
self._logger.info("Starting web socket connection.")
while ws.sock is not None and ws.sock.connected:
sleep(1)
threading.Thread(target=run, name=f"{self.__class__.__name__}.websocket_run").start()
self._connected = True
def _on_pong(self, *_: Any) -> None:
"""Keep the session valid."""
self.refresh_session()
def _try_reconnect(self, sleep_interval: int) -> None:
"""Try to reconnect to the websocket."""
try:
self._logger.info("Trying to reconnect to the websocket.")
self._reachable = self.get_local_session() if self._local_ip else self.get_remote_session()
except (ConnectTimeoutError, GatewayOfflineError):
self._logger.info("Sleeping for %s seconds.", sleep_interval)
sleep(sleep_interval)
except (requests.exceptions.ConnectTimeout, requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout):
self._logger.info("Sleeping for %s seconds.", sleep_interval)
sleep(sleep_interval - 3) # mDNS browsing will take up tp 3 seconds by itself
self.detect_gateway_in_lan()
|