File: home.py

package info (click to toggle)
python-roborock 3.8.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,912 kB
  • sloc: python: 14,982; makefile: 17
file content (272 lines) | stat: -rw-r--r-- 12,217 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
"""Trait that represents a full view of the home layout.

This trait combines information about maps and rooms to provide a comprehensive
view of the home layout, including room names and their corresponding segment
on the map. It also makes it straight forward to fetch the map image and data.

This trait depends on the MapsTrait and RoomsTrait to gather the necessary
information. It provides properties to access the current map, the list of
rooms with names, and the map image and data.

Callers may first call `discover_home()` to populate the home layout cache by
iterating through all available maps on the device. This will cache the map
information and room names for all maps to minimize map switching and improve
performance. After the initial discovery, callers can call `refresh()` to update
the current map's information and room names as needed.
"""

import asyncio
import base64
import logging
from typing import Self

from roborock.data import CombinedMapInfo, RoborockBase
from roborock.data.v1.v1_code_mappings import RoborockStateCode
from roborock.devices.cache import Cache
from roborock.devices.traits.v1 import common
from roborock.exceptions import RoborockDeviceBusy, RoborockException
from roborock.roborock_typing import RoborockCommand

from .map_content import MapContent, MapContentTrait
from .maps import MapsTrait
from .rooms import RoomsTrait
from .status import StatusTrait

_LOGGER = logging.getLogger(__name__)

MAP_SLEEP = 3


class HomeTrait(RoborockBase, common.V1TraitMixin):
    """Trait that represents a full view of the home layout."""

    command = RoborockCommand.GET_MAP_V1  # This is not used

    def __init__(
        self,
        status_trait: StatusTrait,
        maps_trait: MapsTrait,
        map_content: MapContentTrait,
        rooms_trait: RoomsTrait,
        cache: Cache,
    ) -> None:
        """Initialize the HomeTrait.

        We keep track of the MapsTrait and RoomsTrait to provide a comprehensive
        view of the home layout. This also depends on the StatusTrait to determine
        the current map. See comments in MapsTrait for details on that dependency.

        The cache is used to store discovered home data to minimize map switching
        and improve performance. The cache should be persisted by the caller to
        ensure data is retained across restarts.

        After initial discovery, only information for the current map is refreshed
        to keep data up to date without excessive map switching. However, as
        users switch rooms, the current map's data will be updated to ensure
        accuracy.
        """
        super().__init__()
        self._status_trait = status_trait
        self._maps_trait = maps_trait
        self._map_content = map_content
        self._rooms_trait = rooms_trait
        self._cache = cache
        self._discovery_completed = False
        self._home_map_info: dict[int, CombinedMapInfo] | None = None
        self._home_map_content: dict[int, MapContent] | None = None

    async def discover_home(self) -> None:
        """Iterate through all maps to discover rooms and cache them.

        This will be a no-op if the home cache is already populated.

        This cannot be called while the device is cleaning, as that would interrupt the
        cleaning process. This will raise `RoborockDeviceBusy` if the device is
        currently cleaning.

        After discovery, the home cache will be populated and can be accessed via the `home_map_info` property.
        """
        cache_data = await self._cache.get()
        if cache_data.home_map_info and (cache_data.home_map_content or cache_data.home_map_content_base64):
            _LOGGER.debug("Home cache already populated, skipping discovery")
            self._home_map_info = cache_data.home_map_info
            self._discovery_completed = True
            try:
                if cache_data.home_map_content_base64:
                    self._home_map_content = {
                        k: self._map_content.parse_map_content(base64.b64decode(v))
                        for k, v in cache_data.home_map_content_base64.items()
                    }
                else:
                    self._home_map_content = {
                        k: self._map_content.parse_map_content(v) for k, v in cache_data.home_map_content.items()
                    }
            except (ValueError, RoborockException) as ex:
                _LOGGER.warning("Failed to parse cached home map content, will re-discover: %s", ex)
                self._home_map_content = {}
            else:
                return

        if self._status_trait.state == RoborockStateCode.cleaning:
            raise RoborockDeviceBusy("Cannot perform home discovery while the device is cleaning")

        await self._maps_trait.refresh()
        if self._maps_trait.current_map_info is None:
            raise RoborockException("Cannot perform home discovery without current map info")

        home_map_info, home_map_content = await self._build_home_map_info()
        _LOGGER.debug("Home discovery complete, caching data for %d maps", len(home_map_info))
        self._discovery_completed = True
        await self._update_home_cache(home_map_info, home_map_content)

    async def _refresh_map_info(self, map_info) -> CombinedMapInfo:
        """Collect room data for a specific map and return CombinedMapInfo."""
        await self._rooms_trait.refresh()
        return CombinedMapInfo(
            map_flag=map_info.map_flag,
            name=map_info.name,
            rooms=self._rooms_trait.rooms or [],
        )

    async def _refresh_map_content(self) -> MapContent:
        """Refresh the map content trait to get the latest map data."""
        await self._map_content.refresh()
        return MapContent(
            image_content=self._map_content.image_content,
            map_data=self._map_content.map_data,
            raw_api_response=self._map_content.raw_api_response,
        )

    async def _build_home_map_info(self) -> tuple[dict[int, CombinedMapInfo], dict[int, MapContent]]:
        """Perform the actual discovery and caching of home map info and content."""
        home_map_info: dict[int, CombinedMapInfo] = {}
        home_map_content: dict[int, MapContent] = {}

        # Sort map_info to process the current map last, reducing map switching.
        # False (non-original maps) sorts before True (original map). We ensure
        # we load the original map last.
        sorted_map_infos = sorted(
            self._maps_trait.map_info or [],
            key=lambda mi: mi.map_flag == self._maps_trait.current_map,
            reverse=False,
        )
        _LOGGER.debug("Building home cache for maps: %s", [mi.map_flag for mi in sorted_map_infos])
        for map_info in sorted_map_infos:
            # We need to load each map to get its room data
            if len(sorted_map_infos) > 1:
                _LOGGER.debug("Loading map %s", map_info.map_flag)
                await self._maps_trait.set_current_map(map_info.map_flag)
                await asyncio.sleep(MAP_SLEEP)

            map_content = await self._refresh_map_content()
            home_map_content[map_info.map_flag] = map_content

            combined_map_info = await self._refresh_map_info(map_info)
            home_map_info[map_info.map_flag] = combined_map_info
        return home_map_info, home_map_content

    async def refresh(self) -> None:
        """Refresh current map's underlying map and room data, updating cache as needed.

        This will only refresh the current map's data and will not populate non
        active maps or re-discover the home. It is expected that this will keep
        information up to date for the current map as users switch to that map.
        """
        if not self._discovery_completed:
            # Running initial discovery also populates all of the same information
            # as below so we can just call that method. If the device is busy
            # then we'll fall through below to refresh the current map only.
            try:
                await self.discover_home()
                return
            except RoborockDeviceBusy:
                _LOGGER.debug("Cannot refresh home data while device is busy cleaning")

        # Refresh the list of map names/info
        await self._maps_trait.refresh()
        if (current_map_info := self._maps_trait.current_map_info) is None or (
            map_flag := self._maps_trait.current_map
        ) is None:
            raise RoborockException("Cannot refresh home data without current map info")

        # Refresh the map content to ensure we have the latest image and object positions
        new_map_content = await self._refresh_map_content()
        # Refresh the current map's room data
        combined_map_info = await self._refresh_map_info(current_map_info)
        await self._update_current_map(
            map_flag, combined_map_info, new_map_content, update_cache=self._discovery_completed
        )

    @property
    def home_map_info(self) -> dict[int, CombinedMapInfo] | None:
        """Returns the map information for all cached maps."""
        return self._home_map_info

    @property
    def current_map_data(self) -> CombinedMapInfo | None:
        """Returns the map data for the current map."""
        current_map_flag = self._maps_trait.current_map
        if current_map_flag is None or self._home_map_info is None:
            return None
        return self._home_map_info.get(current_map_flag)

    @property
    def home_map_content(self) -> dict[int, MapContent] | None:
        """Returns the map content for all cached maps."""
        return self._home_map_content

    def _parse_response(self, response: common.V1ResponseData) -> Self:
        """This trait does not parse responses directly."""
        raise NotImplementedError("HomeTrait does not support direct command responses")

    async def _update_home_cache(
        self, home_map_info: dict[int, CombinedMapInfo], home_map_content: dict[int, MapContent]
    ) -> None:
        """Update the entire home cache with new map info and content."""
        cache_data = await self._cache.get()
        cache_data.home_map_info = home_map_info
        cache_data.home_map_content_base64 = {
            k: base64.b64encode(v.raw_api_response).decode("utf-8")
            for k, v in home_map_content.items()
            if v.raw_api_response
        }
        cache_data.home_map_content = {}
        await self._cache.set(cache_data)
        self._home_map_info = home_map_info
        self._home_map_content = home_map_content

    async def _update_current_map(
        self,
        map_flag: int,
        map_info: CombinedMapInfo,
        map_content: MapContent,
        update_cache: bool,
    ) -> None:
        """Update the cache for the current map only."""
        # Update the persistent cache if requested e.g. home discovery has
        # completed and we want to keep it fresh. Otherwise just update the
        # in memory map below.
        if update_cache:
            cache_data = await self._cache.get()
            cache_data.home_map_info[map_flag] = map_info
            # Migrate existing cached content to base64 if needed
            if cache_data.home_map_content and not cache_data.home_map_content_base64:
                cache_data.home_map_content_base64 = {
                    k: base64.b64encode(v).decode("utf-8") for k, v in cache_data.home_map_content.items()
                }
                cache_data.home_map_content = {}
            if map_content.raw_api_response:
                if cache_data.home_map_content_base64 is None:
                    cache_data.home_map_content_base64 = {}
                cache_data.home_map_content_base64[map_flag] = base64.b64encode(map_content.raw_api_response).decode(
                    "utf-8"
                )
            await self._cache.set(cache_data)

        if self._home_map_info is None:
            self._home_map_info = {}
        self._home_map_info[map_flag] = map_info

        if self._home_map_content is None:
            self._home_map_content = {}
        self._home_map_content[map_flag] = map_content