1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
"""Websocket client for go2rtc server."""
import asyncio
from collections.abc import Callable
import logging
from typing import TYPE_CHECKING, Any
from urllib.parse import urljoin
from aiohttp import ClientSession, ClientWebSocketResponse, WSMsgType
from go2rtc_client.exceptions import handle_error
from .messages import BaseMessage, ReceiveMessages, SendMessages, WebRTC, WsMessage
_LOGGER = logging.getLogger(__name__)
class Go2RtcWsClient:
"""Websocket client for go2rtc server."""
def __init__(
self,
session: ClientSession,
server_url: str,
*,
source: str | None = None,
destination: str | None = None,
) -> None:
"""Initialize Client."""
if source:
if destination:
msg = "Source and destination cannot be set at the same time"
raise ValueError(msg)
params = {"src": source}
elif destination:
params = {"dst": destination}
else:
msg = "Source or destination must be set"
raise ValueError(msg)
self._server_url = server_url
self._session = session
self._params = params
self._client: ClientWebSocketResponse | None = None
self._rx_task: asyncio.Task[None] | None = None
self._subscribers: list[Callable[[ReceiveMessages], None]] = []
self._connect_lock = asyncio.Lock()
@property
def connected(self) -> bool:
"""Return if we're currently connected."""
return self._client is not None and not self._client.closed
@handle_error
async def connect(self) -> None:
"""Connect to device."""
async with self._connect_lock:
if self.connected:
return
_LOGGER.debug("Trying to connect to %s", self._server_url)
self._client = await self._session.ws_connect(
urljoin(self._server_url, "/api/ws"), params=self._params
)
self._rx_task = asyncio.create_task(self._receive_messages())
_LOGGER.info("Connected to %s", self._server_url)
@handle_error
async def close(self) -> None:
"""Close connection."""
if self.connected:
if TYPE_CHECKING:
assert self._client is not None
client = self._client
self._client = None
await client.close()
if self._rx_task:
task = self._rx_task
self._rx_task = None
task.cancel()
await task
@handle_error
async def send(self, message: SendMessages) -> None:
"""Send a message."""
if not self.connected:
await self.connect()
if TYPE_CHECKING:
assert self._client is not None
await self._client.send_str(message.to_json())
def _process_text_message(self, data: Any) -> None:
"""Process text message."""
try:
message: WsMessage = BaseMessage.from_json(data)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Invalid message received: %s", data)
else:
if isinstance(message, WebRTC):
message = message.value
if not isinstance(message, ReceiveMessages):
_LOGGER.error("Received unexpected message: %s", message)
return
for subscriber in self._subscribers:
try:
subscriber(message)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error on subscriber callback")
async def _receive_messages(self) -> None:
"""Receive messages."""
if TYPE_CHECKING:
assert self._client
while self.connected:
msg = await self._client.receive()
match msg.type:
case (
WSMsgType.CLOSE
| WSMsgType.CLOSED
| WSMsgType.CLOSING
| WSMsgType.PING
| WSMsgType.PONG
):
break
case WSMsgType.ERROR:
_LOGGER.error("Error received: %s", msg.data)
case WSMsgType.TEXT:
self._process_text_message(msg.data)
case _:
_LOGGER.warning("Received unknown message: %s", msg)
def subscribe(
self, callback: Callable[[ReceiveMessages], None]
) -> Callable[[], None]:
"""Subscribe to messages."""
def _unsubscribe() -> None:
self._subscribers.remove(callback)
self._subscribers.append(callback)
return _unsubscribe
|