File: mprm_websocket.py

package info (click to toggle)
devolo-home-control-api 0.19.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 804 kB
  • sloc: python: 3,167; makefile: 3
file content (185 lines) | stat: -rw-r--r-- 7,423 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""mPRM communication via websocket."""
from __future__ import annotations

import json
import threading
from abc import ABC, abstractmethod
from time import sleep, time
from types import TracebackType
from typing import Any

import requests
import websocket
from urllib3.connection import ConnectTimeoutError

from devolo_home_control_api.exceptions import GatewayOfflineError

from .mprm_rest import MprmRest

try:
    from typing import Self  # type: ignore[attr-defined,misc]
except ImportError:
    from typing_extensions import Self


class MprmWebsocket(MprmRest, ABC):
    """
    The abstract MprmWebsocket object handles calls to the mPRM via websockets. It does not cover all API calls, just those
    requested up to now. All calls are done in a gateway context, so you have to create a derived class, that provides a
    Gateway object and a Session object. Further, the derived class needs to implement methods to connect to the websocket,
    either local or remote. Last but not least, the derived class needs to implement a method that is called on new messages.

    The websocket connection itself runs in a thread, that might not terminate as expected. Using a with-statement is
    recommended.
    """

    def __init__(self) -> None:
        """Initialize websocket communication."""
        super().__init__()
        self._ws: websocket.WebSocketApp | None = None
        self._connected = False  # This attribute saves, if the websocket is fully established
        self._reachable = True  # This attribute saves, if the a new session can be established
        self._event_sequence = 0

    def __enter__(self) -> Self:
        """Connect to the websocket."""
        return self

    def __exit__(
        self,
        exception_type: type[BaseException] | None,
        exception_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Disconnect from the websocket."""
        self.websocket_disconnect()

    @abstractmethod
    def detect_gateway_in_lan(self) -> str:
        """Detect a gateway in the local network."""

    @abstractmethod
    def get_local_session(self) -> bool:
        """Connect to the gateway locally."""

    @abstractmethod
    def get_remote_session(self) -> bool:
        """Connect to the gateway remotely."""

    @abstractmethod
    def on_update(self, message: dict[str, Any]) -> None:
        """Initialize steps needed to update properties on a new message."""

    def wait_for_websocket_establishment(self) -> None:
        """
        In some cases it is needed to wait for the websocket to be fully established. This method can be used to block your
        current thread for up to one minute.
        """
        start_time = time()
        while not self._connected and time() < start_time + 600:
            sleep(0.1)
        if not self._connected:
            self._logger.debug("Websocket could not be established")
            raise GatewayOfflineError

    def websocket_connect(self) -> None:
        """
        Set up the websocket connection. The protocol type of the known session URL is exchanged depending on whether TLS is
        used or not. After establishing the websocket, a ping is sent every 30 seconds to keep the connection alive. If there
        is no response within 5 seconds, the connection is terminated with error state.
        """
        ws_url = self._url.replace("https://", "wss://").replace("http://", "ws://")
        cookie = "; ".join(f"{name}={value}" for name, value in self._session.cookies.items())

        ws_url = (
            f"{ws_url}/remote/events/?topics=com/prosyst/mbs/services/fim/FunctionalItemEvent/PROPERTY_CHANGED,"
            f"com/prosyst/mbs/services/fim/FunctionalItemEvent/UNREGISTERED"
            f"&filter=(|(GW_ID={self.gateway.id})(!(GW_ID=*)))"
        )
        self._logger.debug("Connecting to %s", ws_url)
        self._ws = websocket.WebSocketApp(
            ws_url,
            cookie=cookie,
            on_open=self._on_open,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
            on_pong=self._on_pong,
        )
        self._ws.run_forever(ping_interval=30, ping_timeout=5)

    def websocket_disconnect(self, event: str = "") -> None:
        """Close the websocket connection."""
        if not self._ws:
            self._logger.info("Not connected to the web socket.")
            return

        self._logger.info("Closing web socket connection.")
        if event:
            self._logger.info("Reason: %s", event)
        self._ws.close()

    def _on_close(self, *_: Any) -> None:
        """React on closing the websocket."""
        self._logger.info("Closed websocket connection.")

    def _on_error(self, ws: websocket.WebSocketApp, error: Exception) -> None:
        """React on errors. We will try reconnecting with prolonging intervals."""
        self._logger.exception(error)
        self._connected = False
        self._reachable = False
        ws.close()
        self._event_sequence = 0

        sleep_interval = 16
        while not self._reachable:
            self._try_reconnect(sleep_interval)
            sleep_interval = min(sleep_interval * 2, 3600)

        self.websocket_connect()

    def _on_message(self, _: websocket.WebSocketApp, message: str) -> None:
        """React on a message."""
        msg = json.loads(message)
        self._logger.debug("Got message from websocket:\n%s", msg)
        event_sequence = msg["properties"]["com.prosyst.mbs.services.remote.event.sequence.number"]
        if event_sequence == self._event_sequence:
            self._event_sequence += 1
        else:
            self._logger.warning(
                "We missed a websocket message. Internal event_sequence is at %s. Event sequence by websocket is at %s",
                self._event_sequence,
                event_sequence,
            )
            self._event_sequence = event_sequence + 1
            self._logger.debug("self._event_sequence is set to %s", self._event_sequence)

        self.on_update(msg)

    def _on_open(self, ws: websocket.WebSocketApp) -> None:
        """Keep the websocket open."""

        def run() -> None:
            self._logger.info("Starting web socket connection.")
            while ws.sock is not None and ws.sock.connected:
                sleep(1)

        threading.Thread(target=run, name=f"{self.__class__.__name__}.websocket_run").start()
        self._connected = True

    def _on_pong(self, *_: Any) -> None:
        """Keep the session valid."""
        self.refresh_session()

    def _try_reconnect(self, sleep_interval: int) -> None:
        """Try to reconnect to the websocket."""
        try:
            self._logger.info("Trying to reconnect to the websocket.")
            self._reachable = self.get_local_session() if self._local_ip else self.get_remote_session()
        except (ConnectTimeoutError, GatewayOfflineError):
            self._logger.info("Sleeping for %s seconds.", sleep_interval)
            sleep(sleep_interval)
        except (requests.exceptions.ConnectTimeout, requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout):
            self._logger.info("Sleeping for %s seconds.", sleep_interval)
            sleep(sleep_interval - 3)  # mDNS browsing will take up tp 3 seconds by itself
            self.detect_gateway_in_lan()