File: device_service.py

package info (click to toggle)
python-boschshcpy 0.2.92-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 268 kB
  • sloc: python: 3,343; makefile: 4; sh: 4
file content (94 lines) | stat: -rw-r--r-- 2,998 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from datetime import datetime, timedelta

from .api import SHCAPI


class SHCDeviceService:
    def __init__(self, api: SHCAPI, raw_device_service):
        self._api = api
        self._raw_device_service = raw_device_service
        self._raw_state = (
            self._raw_device_service["state"]
            if "state" in self._raw_device_service
            else {}
        )
        self._last_update = None

        self._callbacks = {}
        self._event_callbacks = {}

    @property
    def id(self):
        return self._raw_device_service["id"]

    @property
    def device_id(self):
        return self._raw_device_service["deviceId"]

    @property
    def state(self):
        return self._raw_state

    @property
    def path(self):
        return self._raw_device_service["path"]

    def subscribe_callback(self, entity, callback):
        self._callbacks[entity] = callback

    def unsubscribe_callback(self, entity):
        self._callbacks.pop(entity, None)

    def register_event(self, event, callback):
        self._event_callbacks[event] = callback

    def summary(self):
        print(f"  Device Service: {self.id}")
        print(f"    State: {self.state}")
        print(f"    Path:  {self.path}")

    def put_state(self, key_value_pairs):
        self._api.put_device_service_state(
            self.device_id.replace("#", "%23"),
            self.id,
            {"@type": self.state["@type"], **key_value_pairs},
        )

    def put_state_element(self, key, value):
        self.put_state({key: value})

    def short_poll(self):
        if self._last_update is None or (
            datetime.utcnow() - self._last_update
        ) > timedelta(seconds=1):
            self._raw_device_service = self._api.get_device_service(
                self.device_id, self.id
            )
            self._last_update = datetime.utcnow()
            self._raw_state = (
                self._raw_device_service["state"]
                if "state" in self._raw_device_service
                else {}
            )

    def process_long_polling_poll_result(self, raw_result):
        assert raw_result["@type"] == "DeviceServiceData"
        self._raw_device_service = raw_result  # Update device service data

        if "state" in self._raw_device_service:
            if self.state:
                assert raw_result["state"]["@type"] == self.state["@type"]
            self._raw_state = raw_result["state"]  # Update state

            for callback in self._callbacks:
                self._callbacks[callback]()

            self._process_events(raw_result)

    def _process_events(self, raw_result):
        if raw_result["id"] == "Keypad":
            if raw_result["state"]["keyName"] in self._event_callbacks:
                self._event_callbacks[raw_result["state"]["keyName"]]()
        if raw_result["id"] == "LatestMotion":
            if raw_result["deviceId"] in self._event_callbacks:
                self._event_callbacks[raw_result["deviceId"]]()