1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
"""Helper for Home Assistant Idasen Desk integration."""
from __future__ import annotations
import asyncio
import logging
from typing import Callable
from bleak.backends.device import BLEDevice
from idasen import IdasenDesk
from .connection_manager import ConnectionManager
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
class Desk:
    """Wrapper around the IdasenDesk from the idasen library.

    Keeps the last height reported by the desk, forwards movement commands to
    the underlying ``IdasenDesk`` and reports the height (as a percentage of
    the ``IdasenDesk.MIN_HEIGHT``..``IdasenDesk.MAX_HEIGHT`` range) through
    the update callback.
    """

    def __init__(
        self,
        update_callback: Callable[[int | None], None] | None,
        monitor_height: bool = True,
    ) -> None:
        """Initialize the wrapper.

        Args:
            update_callback: Callable receiving the current height percentage
                (``None`` while the height is unknown). Pass ``None`` to
                disable notifications.
            monitor_height: When true, the connect callback reads the height
                and starts monitoring height changes.
        """
        self._ble_device: BLEDevice | None = None
        self._connection_manager: ConnectionManager | None = None
        self._height: float | None = None
        self._monitor_height: bool = monitor_height
        # Fall back to a no-op so call sites never have to check for None.
        self._update_callback: Callable[[int | None], None] = (
            update_callback
            if update_callback is not None
            else self._noop_update_callback
        )

    @staticmethod
    def _noop_update_callback(height: int | None) -> None:
        """Ignore a height update; used when no callback was supplied."""

    @staticmethod
    def _clamp_percent(percent: float) -> float:
        """Limit ``percent`` to the inclusive range 0-100."""
        return max(0, min(100, percent))

    async def connect(self, ble_device: BLEDevice, retry: bool = True) -> None:
        """Perform the bluetooth connection to the desk.

        A new connection manager is created (after disconnecting the current
        one, if any) when none exists yet or when ``ble_device`` has a
        different address than the device currently in use.

        Args:
            ble_device: The BLE device to connect to.
            retry: Forwarded to ``ConnectionManager.connect``.
        """
        _LOGGER.debug("Connecting")

        if (
            self._connection_manager is None
            or self._ble_device is None
            or ble_device.address != self._ble_device.address
        ):
            _LOGGER.debug("Initializing idasen desk")
            await self.disconnect()
            self._ble_device = ble_device
            self._connection_manager = self._create_connection_manager(
                self._ble_device
            )

        await self._connection_manager.connect(retry)

    async def disconnect(self) -> None:
        """Disconnect from the desk.

        Does nothing when no connection manager has been created yet.
        """
        if self._connection_manager:
            _LOGGER.debug("Disconnecting")
            await self._connection_manager.disconnect()

    async def move_to(self, heigh_percent: int) -> None:
        """Move the desk to a specific position.

        Args:
            heigh_percent: Target position as a percentage of the travel
                range (0 maps to ``IdasenDesk.MIN_HEIGHT``, 100 to
                ``IdasenDesk.MAX_HEIGHT``). Values outside 0-100 are clamped
                so the requested target never leaves that range.
        """
        _LOGGER.debug("Moving to %s", heigh_percent)

        if not self.is_connected or self._idasen_desk is None:
            _LOGGER.warning("Not connected")
            return

        if self._idasen_desk.is_moving:
            await self._idasen_desk.stop()
            # Let it settle before requesting a new move
            await asyncio.sleep(0.5)

        percent = self._clamp_percent(heigh_percent)
        travel = IdasenDesk.MAX_HEIGHT - IdasenDesk.MIN_HEIGHT
        height = IdasenDesk.MIN_HEIGHT + travel * (percent / 100)

        await self._idasen_desk.move_to_target(height)

    async def move_up(self) -> None:
        """Move the desk up (to 100 percent)."""
        _LOGGER.debug("Moving up")
        await self.move_to(100)

    async def move_down(self) -> None:
        """Move the desk down (to 0 percent)."""
        _LOGGER.debug("Moving down")
        await self.move_to(0)

    async def stop(self) -> None:
        """Stop moving the desk."""
        _LOGGER.debug("Stopping")

        if not self.is_connected or self._idasen_desk is None:
            _LOGGER.warning("Not connected")
            return

        await self._idasen_desk.stop()

    async def _start_monitoring(self) -> None:
        """Start monitoring for height changes.

        Registers a callback with ``IdasenDesk.monitor`` that stores each
        reported height and forwards the resulting percentage to the update
        callback.
        """
        if not self.is_connected or self._idasen_desk is None:
            _LOGGER.warning("Not connected")
            return

        async def update_height(height: float) -> None:
            self._height = height
            self._update_callback(self.height_percent)

        await self._idasen_desk.monitor(update_height)

    @property
    def height(self) -> float | None:
        """The current height in meters, or None if not known yet."""
        return self._height

    @property
    def height_percent(self) -> int | None:
        """The current height in percentage, or None if not known yet."""
        if self._height is None:
            return None

        travel = IdasenDesk.MAX_HEIGHT - IdasenDesk.MIN_HEIGHT
        # One-argument round() already returns an int.
        return round(100 * (self._height - IdasenDesk.MIN_HEIGHT) / travel)

    @property
    def is_connected(self) -> bool:
        """True if the bluetooth connection is currently established."""
        if self._idasen_desk is None:
            return False

        # bleak `is_connected` method returns a `_DeprecatedIsConnectedReturn`,
        # so we properly cast it to bool otherwise `is_connected == True`
        # will always be False.
        return bool(self._idasen_desk.is_connected)

    @property
    def _idasen_desk(self) -> IdasenDesk | None:
        """The desk exposed by the connection manager, if there is one."""
        if self._connection_manager is None:
            return None

        return self._connection_manager.idasen_desk

    def _create_connection_manager(self, ble_device: BLEDevice) -> ConnectionManager:
        """Build a ConnectionManager for ``ble_device``.

        The manager is given a connect callback that (optionally) reads the
        height and starts monitoring, and a disconnect callback; both end by
        forwarding the current height percentage to the update callback.
        """

        async def connect_callback() -> None:
            _LOGGER.debug("Connect callback called")
            if self._idasen_desk is None:
                _LOGGER.error("Desk is None after connecting")
                return

            if self._monitor_height:
                self._height = await self._idasen_desk.get_height()
                await self._start_monitoring()

            self._update_callback(self.height_percent)

        return ConnectionManager(
            ble_device,
            connect_callback=connect_callback,
            disconnect_callback=lambda: self._update_callback(self.height_percent),
        )
|