"""Handler for Sonos websockets."""
import asyncio
import logging
import sys
from typing import Any, cast
import aiohttp
from aiohttp import WSMsgType
from .const import API_KEY, MAX_ATTEMPTS
from .exception import (
SonosWebsocketError,
SonosWSConnectionError,
Unauthorized,
Unsupported,
)
# asyncio.timeout() is only available from Python 3.11 on; older interpreters
# fall back to the async_timeout package, which is imported under the same name.
if sys.version_info >= (3, 11):
    from asyncio import timeout as asyncio_timeout
else:
    from async_timeout import timeout as asyncio_timeout

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
class SonosWebsocket:
    """Sonos websocket handler.

    Wraps one websocket connection to a speaker at
    ``wss://<ip_addr>:1443/websocket/api`` and offers a few commands on top
    of it. The household and player identifiers are cached on the instance
    once they have been supplied or looked up.
    """

    def __init__(
        self,
        ip_addr: str,
        player_id: str | None = None,
        household_id: str | None = None,
        session: aiohttp.ClientSession | None = None,
    ) -> None:
        """Initialize the websocket instance.

        Args:
            ip_addr: Address of the speaker, used to build the websocket URI.
            player_id: Known player identifier. When omitted it is resolved
                from the group data the first time it is needed.
            household_id: Known household identifier. When omitted it is
                requested from the speaker the first time it is needed.
            session: Existing aiohttp session to reuse. When omitted a new
                session is created here and closed again by ``close()``.
        """
        self.uri = f"wss://{ip_addr}:1443/websocket/api"
        # Only a session created here is closed by close(); a session passed
        # in by the caller is left open.
        self._own_session = not session
        self.session = session or aiohttp.ClientSession()
        self.ws: aiohttp.ClientWebSocketResponse | None = None
        self._household_id = household_id
        self._player_id = player_id
        # Held for the whole of connect() so overlapping calls are serialized.
        self._connect_lock = asyncio.Lock()

    async def connect(self) -> None:
        """Open the websocket connection unless one is already open.

        Raises:
            Unauthorized: The handshake was answered with HTTP status 401.
            SonosWSConnectionError: Any other handshake response error, a
                connection error, a timeout after 3 seconds, or any other
                exception raised while connecting.
        """
        async with self._connect_lock:
            if self.ws and not self.ws.closed:
                _LOGGER.warning("Websocket is already connected")
                return

            _LOGGER.debug("Opening websocket to %s", self.uri)
            headers = {
                "X-Sonos-Api-Key": API_KEY,
                "Sec-WebSocket-Protocol": "v1.api.smartspeaker.audio",
            }
            try:
                async with asyncio_timeout(3):
                    # ssl=False disables certificate verification; it replaces
                    # the deprecated verify_ssl=False spelling.
                    # NOTE(review): presumably the speaker's certificate cannot
                    # be validated against the system trust store - confirm.
                    self.ws = await self.session.ws_connect(
                        self.uri, headers=headers, ssl=False
                    )
            except aiohttp.ClientResponseError as exc:
                # .status replaces the deprecated .code alias.
                if exc.status == 401:
                    _LOGGER.error("Credentials rejected: %s", exc)
                    raise Unauthorized("Credentials rejected") from exc
                raise SonosWSConnectionError(
                    f"Unexpected response received: {exc}"
                ) from exc
            except aiohttp.ClientConnectionError as exc:
                raise SonosWSConnectionError(f"Connection error: {exc}") from exc
            except asyncio.TimeoutError as exc:
                raise SonosWSConnectionError("Connection timed out") from exc
            except Exception as exc:  # pylint: disable=broad-except
                # Callers only ever see the package's own exception types.
                raise SonosWSConnectionError(f"Unknown error: {exc}") from exc

            _LOGGER.debug("Successfully connected to %s", self.uri)

    async def close(self) -> None:
        """Close the websocket and, if it was created here, the session."""
        if self.ws and not self.ws.closed:
            await self.ws.close()
        if self._own_session and self.session and not self.session.closed:
            await self.session.close()

    async def send_command(
        self, command: dict[str, Any], options: dict[str, Any] | None = None
    ) -> list[dict[str, Any]]:
        """Send a command over the websocket and return the decoded reply.

        The command is sent as the JSON list ``[command, options]``. Up to
        ``MAX_ATTEMPTS`` attempts are made; the connection is (re)opened
        before an attempt whenever it is missing or closed.

        Args:
            command: Command header to send.
            options: Command options; an empty dict is sent when omitted.

        Returns:
            The JSON-decoded body of the first text message received.

        Raises:
            SonosWebsocketError: No attempt produced a text reply.
            Unauthorized: Raised by ``connect()`` when reconnecting.
            SonosWSConnectionError: Raised by ``connect()`` when reconnecting.
        """
        payload = [command, options or {}]

        for _ in range(MAX_ATTEMPTS):
            if not self.ws or self.ws.closed:
                await self.connect()
            assert self.ws  # narrows the Optional type for type checkers

            _LOGGER.debug("Sending command: %s", payload)
            try:
                async with asyncio_timeout(3):
                    await self.ws.send_json(payload)
                    msg = await self.ws.receive()
            except asyncio.TimeoutError:
                # NOTE(review): the same socket is reused for the retry, so a
                # late reply to this attempt could be read as the reply to the
                # next one - confirm whether the socket should be dropped here.
                _LOGGER.error("Command timed out")
            except ConnectionResetError:
                # Websocket closing; dropping the reference forces a reconnect
                # on the next attempt.
                self.ws = None
                _LOGGER.debug("Websocket connection reset, will try again")
            else:
                if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.CLOSING):
                    _LOGGER.debug("Websocket closed, will try again")
                elif msg.type != WSMsgType.TEXT:
                    _LOGGER.error("Received non-text message: %s", msg.type.name)
                else:
                    return cast(list[dict[str, Any]], msg.json())

        command_name = command.get("command", "Empty")
        raise SonosWebsocketError(
            f"{command_name} command failed after {MAX_ATTEMPTS} attempts"
        )

    async def play_clip(
        self, uri: str, volume: int | None = None
    ) -> list[dict[str, Any]]:
        """Play an audio clip.

        Args:
            uri: URL of the audio stream, sent as ``streamUrl``.
            volume: Volume to send with the clip. ``None`` leaves the option
                out of the request; any integer, including ``0``, is sent.

        Returns:
            The decoded reply from ``send_command()``.

        Raises:
            Unsupported: The speaker does not list the AUDIO_CLIP capability.
            SonosWebsocketError: The player ID could not be determined or the
                command failed.
        """
        command = {
            "namespace": "audioClip:1",
            "command": "loadAudioClip",
            "playerId": await self.get_player_id(),
        }
        options: dict[str, Any] = {
            "name": "Sonos Websocket",
            "appId": "com.jjlawren.sonos_websocket",
            "streamUrl": uri,
        }
        # Compare against None so that an explicit volume of 0 is not dropped.
        if volume is not None:
            options["volume"] = volume
        return await self.send_command(command, options)

    async def get_household_id(self) -> str:
        """Get the household ID of this device.

        A cached value is returned when available. Otherwise an empty command
        is sent and ``householdId`` is read from the first element of the
        reply.

        Note: This is an invalid command but returns the household ID anyway.

        Raises:
            SonosWebsocketError: The reply carried no ``householdId``.
        """
        if self._household_id:
            return self._household_id

        response, _ = await self.send_command({})
        if household_id := response.get("householdId"):
            self._household_id = household_id
            return household_id
        raise SonosWebsocketError("Could not determine household ID")

    async def get_groups(self) -> list[dict[str, Any]]:
        """Return the current group and player configuration."""
        command = {
            "namespace": "groups:1",
            "command": "getGroups",
            "householdId": await self.get_household_id(),
        }
        return await self.send_command(command)

    async def get_player_id(self) -> str:
        """Retrieve the player identifier for this speaker.

        A cached value is returned when available. Otherwise the group data is
        fetched and searched for the player whose ``websocketUrl`` equals this
        instance's URI.

        Raises:
            SonosWebsocketError: The group request did not report success, or
                no player in the reply matches this instance's URI.
            Unsupported: The matching player does not list AUDIO_CLIP among
                its capabilities.
        """
        if self._player_id:
            return self._player_id

        response, data = await self.get_groups()
        # .get() keeps a reply with missing keys from surfacing as a bare
        # KeyError; it is reported through the package's exceptions instead.
        if not response.get("success"):
            raise SonosWebsocketError(f"Retrieving group data failed: {data}")

        player = next(
            (
                candidate
                for candidate in data.get("players", [])
                if candidate.get("websocketUrl") == self.uri
            ),
            None,
        )
        if player is None:
            raise SonosWebsocketError("No matching player found in group data")

        if "AUDIO_CLIP" not in player.get("capabilities", ()):
            raise Unsupported("Device does not support AUDIO_CLIP")

        self._player_id = cast(str, player["id"])
        return self._player_id
|