File: client.py

package info (click to toggle)
python-go2rtc-client 0.2.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 684 kB
  • sloc: python: 945; makefile: 3
file content (146 lines) | stat: -rw-r--r-- 4,745 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""Websocket client for go2rtc server."""

import asyncio
from collections.abc import Callable
import logging
from typing import TYPE_CHECKING, Any
from urllib.parse import urljoin

from aiohttp import ClientSession, ClientWebSocketResponse, WSMsgType

from go2rtc_client.exceptions import handle_error

from .messages import BaseMessage, ReceiveMessages, SendMessages, WebRTC, WsMessage

# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)


class Go2RtcWsClient:
    """Websocket client for go2rtc server."""

    def __init__(
        self,
        session: ClientSession,
        server_url: str,
        *,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """Initialize Client."""
        if source:
            if destination:
                msg = "Source and destination cannot be set at the same time"
                raise ValueError(msg)
            params = {"src": source}
        elif destination:
            params = {"dst": destination}
        else:
            msg = "Source or destination must be set"
            raise ValueError(msg)

        self._server_url = server_url
        self._session = session
        self._params = params
        self._client: ClientWebSocketResponse | None = None
        self._rx_task: asyncio.Task[None] | None = None
        self._subscribers: list[Callable[[ReceiveMessages], None]] = []
        self._connect_lock = asyncio.Lock()

    @property
    def connected(self) -> bool:
        """Return if we're currently connected."""
        return self._client is not None and not self._client.closed

    @handle_error
    async def connect(self) -> None:
        """Connect to device."""
        async with self._connect_lock:
            if self.connected:
                return

            _LOGGER.debug("Trying to connect to %s", self._server_url)
            self._client = await self._session.ws_connect(
                urljoin(self._server_url, "/api/ws"), params=self._params
            )

            self._rx_task = asyncio.create_task(self._receive_messages())
            _LOGGER.info("Connected to %s", self._server_url)

    @handle_error
    async def close(self) -> None:
        """Close connection."""
        if self.connected:
            if TYPE_CHECKING:
                assert self._client is not None
            client = self._client
            self._client = None
            await client.close()

        if self._rx_task:
            task = self._rx_task
            self._rx_task = None
            task.cancel()
            await task

    @handle_error
    async def send(self, message: SendMessages) -> None:
        """Send a message."""
        if not self.connected:
            await self.connect()

        if TYPE_CHECKING:
            assert self._client is not None

        await self._client.send_str(message.to_json())

    def _process_text_message(self, data: Any) -> None:
        """Process text message."""
        try:
            message: WsMessage = BaseMessage.from_json(data)
        except Exception:  # pylint: disable=broad-except
            _LOGGER.exception("Invalid message received: %s", data)
        else:
            if isinstance(message, WebRTC):
                message = message.value
            if not isinstance(message, ReceiveMessages):
                _LOGGER.error("Received unexpected message: %s", message)
                return
            for subscriber in self._subscribers:
                try:
                    subscriber(message)
                except Exception:  # pylint: disable=broad-except
                    _LOGGER.exception("Error on subscriber callback")

    async def _receive_messages(self) -> None:
        """Receive messages."""
        if TYPE_CHECKING:
            assert self._client

        while self.connected:
            msg = await self._client.receive()
            match msg.type:
                case (
                    WSMsgType.CLOSE
                    | WSMsgType.CLOSED
                    | WSMsgType.CLOSING
                    | WSMsgType.PING
                    | WSMsgType.PONG
                ):
                    break
                case WSMsgType.ERROR:
                    _LOGGER.error("Error received: %s", msg.data)
                case WSMsgType.TEXT:
                    self._process_text_message(msg.data)
                case _:
                    _LOGGER.warning("Received unknown message: %s", msg)

    def subscribe(
        self, callback: Callable[[ReceiveMessages], None]
    ) -> Callable[[], None]:
        """Subscribe to messages."""

        def _unsubscribe() -> None:
            self._subscribers.remove(callback)

        self._subscribers.append(callback)
        return _unsubscribe