1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
|
"""Code for communication with the Livisi application websocket."""
from typing import Callable
import urllib.parse
import websockets
from pydantic import ValidationError
from aiolivisi.livisi_event import LivisiEvent
from .aiolivisi import AioLivisi
from .const import (
AVATAR_PORT,
IS_REACHABLE,
ON_STATE,
VALUE,
IS_OPEN,
SET_POINT_TEMPERATURE,
POINT_TEMPERATURE,
HUMIDITY,
TEMPERATURE,
LUMINANCE,
KEY_INDEX,
KEY_PRESS_LONG,
KEY_PRESS_TYPE,
EVENT_BUTTON_PRESSED,
EVENT_STATE_CHANGED,
)
class Websocket:
"""Represents the websocket class."""
def __init__(self, aiolivisi: AioLivisi) -> None:
"""Initialize the websocket."""
self.aiolivisi = aiolivisi
self.connection_url: str = None
async def connect(self, on_data, on_close, port: int) -> None:
"""Connect to the socket."""
if port == AVATAR_PORT:
token = urllib.parse.quote(self.aiolivisi.token)
else:
token = self.aiolivisi.token
ip_address = self.aiolivisi.livisi_connection_data["ip_address"]
self.connection_url = f"ws://{ip_address}:{port}/events?token={token}"
try:
async with websockets.connect(
self.connection_url, ping_interval=10, ping_timeout=10
) as websocket:
try:
self._websocket = websocket
await self.consumer_handler(websocket, on_data)
except Exception:
await on_close()
return
except Exception:
await on_close()
return
async def disconnect(self) -> None:
"""Close the websocket."""
await self._websocket.close(code=1000, reason="Handle disconnect request")
async def consumer_handler(self, websocket, on_data: Callable):
"""Used when data is transmited using the websocket."""
async for message in websocket:
try:
event_data = LivisiEvent.parse_raw(message)
except ValidationError:
continue
if "device" in event_data.source:
event_data.source = event_data.source.replace("/device/", "")
if event_data.properties is None:
continue
if event_data.type == EVENT_STATE_CHANGED:
if ON_STATE in event_data.properties.keys():
event_data.onState = event_data.properties.get(ON_STATE)
elif VALUE in event_data.properties.keys() and isinstance(
event_data.properties.get(VALUE), bool
):
event_data.onState = event_data.properties.get(VALUE)
if SET_POINT_TEMPERATURE in event_data.properties.keys():
event_data.vrccData = event_data.properties.get(
SET_POINT_TEMPERATURE
)
elif POINT_TEMPERATURE in event_data.properties.keys():
event_data.vrccData = event_data.properties.get(POINT_TEMPERATURE)
elif TEMPERATURE in event_data.properties.keys():
event_data.vrccData = event_data.properties.get(TEMPERATURE)
elif HUMIDITY in event_data.properties.keys():
event_data.vrccData = event_data.properties.get(HUMIDITY)
if LUMINANCE in event_data.properties.keys():
event_data.luminance = event_data.properties.get(LUMINANCE)
if IS_REACHABLE in event_data.properties.keys():
event_data.isReachable = event_data.properties.get(IS_REACHABLE)
if IS_OPEN in event_data.properties.keys():
event_data.isOpen = event_data.properties.get(IS_OPEN)
elif event_data.type == EVENT_BUTTON_PRESSED:
if KEY_INDEX in event_data.properties.keys():
event_data.keyIndex = event_data.properties.get(KEY_INDEX)
event_data.isLongKeyPress = (
KEY_PRESS_LONG == event_data.properties.get(KEY_PRESS_TYPE)
)
on_data(event_data)
|