File: __init__.py

package info (click to toggle)
python-idasen-ha 2.6.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 188 kB
  • sloc: python: 571; sh: 5; makefile: 4
file content (162 lines) | stat: -rw-r--r-- 5,342 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""Helper for Home Assistant Idasen Desk integration."""

from __future__ import annotations

import asyncio
import logging
from typing import Callable

from bleak.backends.device import BLEDevice
from idasen import IdasenDesk

from .connection_manager import ConnectionManager

_LOGGER = logging.getLogger(__name__)


class Desk:
    """Wrapper around the IdasenDesk from the idasen library."""

    def __init__(
        self,
        update_callback: Callable[[int | None], None] | None,
        monitor_height: bool = True,
    ) -> None:
        """Initialize the wrapper."""
        self._ble_device: BLEDevice | None = None
        self._connection_manager: ConnectionManager | None = None
        self._height: float | None = None
        self._monitor_height: bool = monitor_height

        if update_callback:
            self._update_callback = update_callback
        else:

            def empty_update_callback(height: int | None) -> None:
                pass

            self._update_callback = empty_update_callback

    async def connect(self, ble_device: BLEDevice, retry: bool = True) -> None:
        """Perform the bluetooth connection to the desk."""
        _LOGGER.debug("Connecting")

        if (
            self._connection_manager is None
            or self._ble_device is None
            or ble_device.address != self._ble_device.address
        ):
            _LOGGER.debug("Initializing idasen desk")
            await self.disconnect()
            self._ble_device = ble_device
            self._connection_manager = self._create_connection_manager(self._ble_device)

        await self._connection_manager.connect(retry)

    async def disconnect(self) -> None:
        """Disconnect from the desk."""
        if self._connection_manager:
            _LOGGER.debug("Disconnecting")
            await self._connection_manager.disconnect()

    async def move_to(self, heigh_percent: int) -> None:
        """Move the desk to a specific position."""
        _LOGGER.debug("Moving to %s", heigh_percent)
        if not self.is_connected or self._idasen_desk is None:
            _LOGGER.warning("Not connected")
            return

        if self._idasen_desk.is_moving:
            await self._idasen_desk.stop()
            # Let it settle before requesting a new move
            await asyncio.sleep(0.5)

        height = IdasenDesk.MIN_HEIGHT + (
            IdasenDesk.MAX_HEIGHT - IdasenDesk.MIN_HEIGHT
        ) * (heigh_percent / 100)

        await self._idasen_desk.move_to_target(height)

    async def move_up(self) -> None:
        """Move the desk up."""
        _LOGGER.debug("Moving up")
        await self.move_to(100)

    async def move_down(self) -> None:
        """Move the desk down."""
        _LOGGER.debug("Moving down")
        await self.move_to(0)

    async def stop(self) -> None:
        """Stop moving the desk."""
        _LOGGER.debug("Stopping")
        if not self.is_connected or self._idasen_desk is None:
            _LOGGER.warning("Not connected")
            return
        await self._idasen_desk.stop()

    async def _start_monitoring(self) -> None:
        """Start monitoring for height changes."""
        if not self.is_connected or self._idasen_desk is None:
            _LOGGER.warning("Not connected")
            return

        async def update_height(height: float) -> None:
            self._height = height
            self._update_callback(self.height_percent)

        await self._idasen_desk.monitor(update_height)

    @property
    def height(self) -> float | None:
        """The current height in meters."""
        return self._height

    @property
    def height_percent(self) -> int | None:
        """The current height in percentage."""
        if self._height is None:
            return None

        return int(
            round(
                100
                * (self._height - IdasenDesk.MIN_HEIGHT)
                / (IdasenDesk.MAX_HEIGHT - IdasenDesk.MIN_HEIGHT)
            )
        )

    @property
    def is_connected(self) -> bool:
        """True if the bluetooth connection is currently established."""
        if self._idasen_desk is None:
            return False
        # bleak `is_connected` method returns a `_DeprecatedIsConnectedReturn`,
        # so we properly cast it to bool otherwise `is_connected == True`
        # will always be False.
        return bool(self._idasen_desk.is_connected)

    @property
    def _idasen_desk(self) -> IdasenDesk | None:
        if self._connection_manager is None:
            return None
        return self._connection_manager.idasen_desk

    def _create_connection_manager(self, ble_device: BLEDevice) -> ConnectionManager:
        async def connect_callback() -> None:
            _LOGGER.debug("Connect callback called")
            if self._idasen_desk is None:
                _LOGGER.error("Desk is None after connecting")
                return

            if self._monitor_height:
                self._height = await self._idasen_desk.get_height()
                await self._start_monitoring()

            self._update_callback(self.height_percent)

        return ConnectionManager(
            ble_device,
            connect_callback=connect_callback,
            disconnect_callback=lambda: self._update_callback(self.height_percent),
        )