File: websocket.py

package info (click to toggle)
sonos-websocket 0.1.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 112 kB
  • sloc: python: 182; makefile: 7
file content (175 lines) | stat: -rw-r--r-- 6,552 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""Handler for Sonos websockets."""
import asyncio
import logging
import sys
from typing import Any, cast

import aiohttp
from aiohttp import WSMsgType

from .const import API_KEY, MAX_ATTEMPTS
from .exception import (
    SonosWebsocketError,
    SonosWSConnectionError,
    Unauthorized,
    Unsupported,
)

if sys.version_info[:2] < (3, 11):
    from async_timeout import timeout as asyncio_timeout
else:
    from asyncio import timeout as asyncio_timeout


_LOGGER = logging.getLogger(__name__)


class SonosWebsocket:
    """Sonos websocket handler."""

    def __init__(
        self,
        ip_addr: str,
        player_id: str | None = None,
        household_id: str | None = None,
        session: aiohttp.ClientSession | None = None,
    ) -> None:
        """Initialize the websocket instance."""
        self.uri = f"wss://{ip_addr}:1443/websocket/api"
        self._own_session = not session
        self.session = session or aiohttp.ClientSession()
        self.ws: aiohttp.ClientWebSocketResponse | None = None
        self._household_id = household_id
        self._player_id = player_id
        self._connect_lock = asyncio.Lock()

    async def connect(self) -> None:
        """Open a persistent websocket connection and act on events."""
        async with self._connect_lock:
            if self.ws and not self.ws.closed:
                _LOGGER.warning("Websocket is already connected")
                return

        _LOGGER.debug("Opening websocket to %s", self.uri)
        headers = {
            "X-Sonos-Api-Key": API_KEY,
            "Sec-WebSocket-Protocol": "v1.api.smartspeaker.audio",
        }
        try:
            async with asyncio_timeout(3):
                self.ws = await self.session.ws_connect(
                    self.uri, headers=headers, verify_ssl=False
                )
        except aiohttp.ClientResponseError as exc:
            if exc.code == 401:
                _LOGGER.error("Credentials rejected: %s", exc)
                raise Unauthorized("Credentials rejected") from exc
            raise SonosWSConnectionError(
                f"Unexpected response received: {exc}"
            ) from exc
        except aiohttp.ClientConnectionError as exc:
            raise SonosWSConnectionError(f"Connection error: {exc}") from exc
        except asyncio.TimeoutError as exc:
            raise SonosWSConnectionError("Connection timed out") from exc
        except Exception as exc:  # pylint: disable=broad-except
            raise SonosWSConnectionError(f"Unknown error: {exc}") from exc
        _LOGGER.debug("Successfully connected to %s", self.uri)

    async def close(self):
        """Close the websocket connection."""
        if self.ws and not self.ws.closed:
            await self.ws.close()
        if self._own_session and self.session and not self.session.closed:
            await self.session.close()

    async def send_command(
        self, command: dict[str, Any], options: dict[str, Any] | None = None
    ) -> list[dict[str, Any]]:
        """Send commands over the websocket and handle their responses."""
        attempt = 1
        while attempt <= MAX_ATTEMPTS:
            if not self.ws or self.ws.closed:
                await self.connect()
                assert self.ws

            payload = [command, options or {}]
            _LOGGER.debug("Sending command: %s", payload)
            try:
                async with asyncio_timeout(3):
                    await self.ws.send_json(payload)
                    msg = await self.ws.receive()
            except asyncio.TimeoutError:
                _LOGGER.error("Command timed out")
            except ConnectionResetError:
                # Websocket closing
                self.ws = None
                _LOGGER.debug("Websocket connection reset, will try again")
            else:
                if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.CLOSING):
                    _LOGGER.debug("Websocket closed, will try again")
                elif msg.type != WSMsgType.TEXT:
                    _LOGGER.error("Received non-text message: %s", msg.type.name)
                else:
                    return msg.json()
            attempt += 1

        command_name = command.get("command", "Empty")
        raise SonosWebsocketError(
            f"{command_name} command failed after {MAX_ATTEMPTS} attempts"
        )

    async def play_clip(
        self, uri: str, volume: int | None = None
    ) -> list[dict[str, Any]]:
        """Play an audio clip."""
        command = {
            "namespace": "audioClip:1",
            "command": "loadAudioClip",
            "playerId": await self.get_player_id(),
        }
        options: dict[str, Any] = {
            "name": "Sonos Websocket",
            "appId": "com.jjlawren.sonos_websocket",
            "streamUrl": uri,
        }
        if volume:
            options["volume"] = volume
        return await self.send_command(command, options)

    async def get_household_id(self) -> str:
        """Get the household ID of this device.

        Note: This is an invalid command but returns the household ID anyway.
        """
        if self._household_id:
            return self._household_id
        response, _ = await self.send_command({})
        if household_id := response.get("householdId"):
            self._household_id = household_id
            return household_id
        raise SonosWebsocketError("Could not determine household ID")

    async def get_groups(self) -> list[dict[str, Any]]:
        """Return the current group and player configuration."""
        command = {
            "namespace": "groups:1",
            "command": "getGroups",
            "householdId": await self.get_household_id(),
        }
        return await self.send_command(command)

    async def get_player_id(self) -> str:
        """Retrieve the player identifier for this speaker."""
        if self._player_id:
            return self._player_id
        response, data = await self.get_groups()
        if not response["success"]:
            raise SonosWebsocketError(f"Retrieving group data failed: {data}")
        if player := next(
            (p for p in data["players"] if p["websocketUrl"] == self.uri), None
        ):
            if "AUDIO_CLIP" not in player["capabilities"]:
                raise Unsupported("Device does not support AUDIO_CLIP")
            self._player_id = cast(str, player["id"])
            return self._player_id
        raise SonosWebsocketError("No matching player found in group data")