File: roborock_future.py

package info (click to toggle)
python-roborock 2.38.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 1,092 kB
  • sloc: python: 9,722; makefile: 17
file content (36 lines) | stat: -rw-r--r-- 1,044 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from __future__ import annotations

from asyncio import Future
from typing import Any

import async_timeout

from .exceptions import VacuumError


class RoborockFuture:
    """An asyncio future tagged with a protocol id that can be completed from any thread.

    The wrapped future is created in ``__init__`` and is bound to the event
    loop that is current at construction time. ``set_result`` and
    ``set_exception`` hand the completion over to that loop with
    ``call_soon_threadsafe``, so they may be called from other threads.
    """

    def __init__(self, protocol: int):
        """Create the pending future.

        Args:
            protocol: Protocol identifier stored on the instance as
                ``self.protocol``; it is not interpreted by this class.
        """
        self.protocol = protocol
        # Future() without an explicit loop attaches to the current event
        # loop, so instances should be created from code running in that loop.
        self.fut: Future = Future()
        self.loop = self.fut.get_loop()

    def _set_result(self, item: Any) -> None:
        """Resolve the future with *item*; must run on the future's loop.

        Does nothing when the future is already finished. ``done()`` is true
        for cancelled futures as well as for futures that already hold a
        result or an exception, so a late or duplicate completion is ignored
        instead of raising ``asyncio.InvalidStateError``.
        """
        if not self.fut.done():
            self.fut.set_result(item)

    def set_result(self, item: Any) -> None:
        """Schedule resolving the future with *item* on its event loop.

        Safe to call from a thread other than the loop's thread.
        """
        self.loop.call_soon_threadsafe(self._set_result, item)

    def _set_exception(self, exc: VacuumError) -> None:
        """Fail the future with *exc*; must run on the future's loop.

        Does nothing when the future is already finished (cancelled,
        resolved, or failed), for the same reason as ``_set_result``.
        """
        if not self.fut.done():
            self.fut.set_exception(exc)

    def set_exception(self, exc: VacuumError) -> None:
        """Schedule failing the future with *exc* on its event loop.

        Safe to call from a thread other than the loop's thread.
        """
        self.loop.call_soon_threadsafe(self._set_exception, exc)

    async def async_get(self, timeout: float | int) -> tuple[Any, VacuumError | None]:
        """Wait up to *timeout* seconds for the future and return its result.

        Args:
            timeout: Maximum time to wait, in seconds.

        Returns:
            Whatever value was passed to ``set_result``.

        Raises:
            asyncio.TimeoutError: If no completion arrives within *timeout*.
            Exception: The exception passed to ``set_exception``, if any.
        """
        try:
            async with async_timeout.timeout(timeout):
                return await self.fut
        finally:
            # On timeout or caller cancellation this marks the future as
            # cancelled so later set_result/set_exception calls are ignored.
            # On a future that already completed, cancel() is a no-op.
            self.fut.cancel()