1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
"""Use of this source code is governed by the MIT license found in the LICENSE file.
Plugwise Smile communication protocol helpers.
"""
from __future__ import annotations
from plugwise.constants import LOGGER
from plugwise.exceptions import (
ConnectionFailedError,
InvalidAuthentication,
InvalidXMLError,
ResponseError,
)
from plugwise.util import escape_illegal_xml_characters
# This way of importing aiohttp is because of patch/mocking in testing (aiohttp timeouts)
from aiohttp import BasicAuth, ClientError, ClientResponse, ClientSession, ClientTimeout
from defusedxml import ElementTree as etree
class SmileComm:
"""The SmileComm class."""
def __init__(
self,
host: str,
password: str,
port: int,
timeout: int,
username: str,
websession: ClientSession | None,
) -> None:
"""Set the constructor for this class."""
if not websession:
aio_timeout = ClientTimeout(total=timeout)
self._websession = ClientSession(timeout=aio_timeout)
else:
self._websession = websession
# Quickfix IPv6 formatting, not covering
if host.count(":") > 2: # pragma: no cover
host = f"[{host}]"
self._auth = BasicAuth(username, password=password)
self._endpoint = f"http://{host}:{str(port)}" # Sensitive
async def _request(
self,
command: str,
retry: int = 3,
method: str = "get",
data: str | None = None,
) -> etree.Element:
"""Get/put/delete data from a give URL."""
resp: ClientResponse
url = f"{self._endpoint}{command}"
try:
match method:
case "delete":
resp = await self._websession.delete(url, auth=self._auth)
case "get":
# Work-around for Stretchv2, should not hurt the other smiles
headers = {"Accept-Encoding": "gzip"}
resp = await self._websession.get(
url, headers=headers, auth=self._auth
)
case "post":
headers = {"Content-type": "text/xml"}
resp = await self._websession.post(
url,
headers=headers,
data=data,
auth=self._auth,
)
case "put":
headers = {"Content-type": "text/xml"}
resp = await self._websession.put(
url,
headers=headers,
data=data,
auth=self._auth,
)
except (
ClientError
) as exc: # ClientError is an ancestor class of ServerTimeoutError
if retry < 1:
LOGGER.warning(
"Failed sending %s %s to Plugwise Smile, error: %s",
method,
command,
exc,
)
raise ConnectionFailedError from exc
return await self._request(command, retry - 1)
if resp.status == 504:
if retry < 1:
LOGGER.warning(
"Failed sending %s %s to Plugwise Smile, error: %s",
method,
command,
"504 Gateway Timeout",
)
raise ConnectionFailedError
return await self._request(command, retry - 1)
return await self._request_validate(resp, method)
async def _request_validate(
self, resp: ClientResponse, method: str
) -> etree.Element:
"""Helper-function for _request(): validate the returned data."""
match resp.status:
case 200:
# Cornercases for server not responding with 202
if method in ("post", "put"):
return
case 202:
# Command accepted gives empty body with status 202
return
case 401:
msg = (
"Invalid Plugwise login, please retry with the correct credentials."
)
LOGGER.error("%s", msg)
raise InvalidAuthentication
case 405:
msg = "405 Method not allowed."
LOGGER.error("%s", msg)
raise ConnectionFailedError
if not (result := await resp.text()) or (
"<error>" in result and "Not started" not in result
):
LOGGER.warning("Smile response empty or error in %s", result)
raise ResponseError
try:
# Encode to ensure utf8 parsing
xml = etree.XML(escape_illegal_xml_characters(result).encode())
except etree.ParseError as exc:
LOGGER.warning("Smile returns invalid XML for %s", self._endpoint)
raise InvalidXMLError from exc
return xml
async def close_connection(self) -> None:
"""Close the Plugwise connection."""
await self._websession.close()
|