File: location.py

package info (click to toggle)
python-pyaarlo 0.8.0.15-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 556 kB
  • sloc: python: 6,064; makefile: 6; sh: 1
file content (191 lines) | stat: -rw-r--r-- 6,190 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
from .constant import (
    MODE_ID_TO_NAME_KEY,
    MODE_KEY,
    MODE_NAME_TO_ID_KEY,
    LOCATION_MODES_PATH_FORMAT,
    LOCATION_ACTIVEMODE_PATH_FORMAT,
    MODE_REVISION_KEY
)
from .super import ArloSuper


AUTOMATION_ACTIVE_MODE = "automation/activeMode"
AUTOMATION_MODES = "automation/modes"
DEFAULT_MODES = {
    "standby": "Stand By",
    "armAway": "Armed Away",
    "armHome": "Armed Home"
}


def location_name(name, user):
    if user:
        return f"user_location_{name}"
    return f"location_{name}"


class ArloLocation(ArloSuper):
    """ Represents a Location object.

    Each Arlo account can have multiple owned locations and multiple shared locations.
    """
    def __init__(self, arlo, attrs, user=False):
        super().__init__(location_name(attrs.get("locationName", "unknown"), user),
                         arlo, attrs,
                         id=attrs.get("locationId", "unknown"),
                         type="location")

        self._device_ids = attrs.get("gatewayDeviceIds", [])

    def _id_to_name(self, mode_id):
        return self._load([MODE_ID_TO_NAME_KEY, mode_id], None)

    def _name_to_id(self, mode_name):
        return self._load([MODE_NAME_TO_ID_KEY, mode_name], None)

    def _extra_headers(self):
        return {
            "x-forwarded-user": self._arlo.be.user_id,
            "x-user-device-id": self._arlo.be.user_id,
        }

    def _parse_modes(self, modes):
        for mode in modes.items():
            mode_id = mode[0]
            mode_name = mode[1].get("name", "")
            if mode_id and mode_name != "":
                self.debug(mode_id + "<=M=>" + mode_name)
                self._save([MODE_ID_TO_NAME_KEY, mode_id], mode_name)
                self._save([MODE_NAME_TO_ID_KEY, mode_name], mode_id)

    def _event_handler(self, resource, event):
        self.debug(self.name + " LOCATION got " + resource)

        # A (user requested?) mode change.
        if resource == AUTOMATION_ACTIVE_MODE:
            props = event.get("properties", {})
            mode = props.get("properties", {}).get("mode", None)
            if mode is not None:
                self._save_and_do_callbacks(MODE_KEY, mode)
            mode_revision = props.get("revision", None)
            if mode_revision is not None:
                self._save(MODE_REVISION_KEY, mode_revision)

        # A mode list update
        if resource == AUTOMATION_MODES:
            self._parse_modes(event.get("properties", {}).get("properties", {}))

        # A (user requested?) mode change.
        if resource == "states":
            mode = event.get("states", {}).get("activeMode", None)
            if mode is not None:
                self._save_and_do_callbacks(MODE_KEY, mode)

    @property
    def available_modes(self):
        """Returns string list of available modes.

        For example:: ``['disarmed', 'armed', 'home']``
        """
        return list(self.available_modes_with_ids.keys())

    @property
    def available_modes_with_ids(self):
        """Returns dictionary of available modes mapped to Arlo ids.

        For example:: ``{'armed': 'mode1','disarmed': 'mode0','home': 'mode2'}``
        """
        modes = {}
        for key, mode_id in self._load_matching([MODE_NAME_TO_ID_KEY, "*"]):
            modes[key.split("/")[-1]] = mode_id
        if not modes:
            modes = DEFAULT_MODES
        return modes

    @property
    def device_ids(self):
        return self._device_ids

    @property
    def mode(self):
        """Returns the current mode."""
        return self._load(MODE_KEY, "unknown")

    @mode.setter
    def mode(self, id_or_name):
        """Set the location mode.

        :param id_or_name: mode to use, as returned by available_modes:
        """
        # Convert to an ID.
        mode_id = self._name_to_id(id_or_name)
        if mode_id is None:
            mode_id = id_or_name
        if mode_id is None:
            self._arlo.error("passed invalid id or name {id_or_name}")
            return

        # Need to change?
        if self.mode == mode_id:
            self.debug("no mode change needed")
            return

        # Post change.
        self.debug(f"new-mode={mode_id}({id_or_name})")
        mode_revision = self._load(MODE_REVISION_KEY, 1)
        self.vdebug(f"old-revision={mode_revision}")

        data = self._arlo.be.put(
            LOCATION_ACTIVEMODE_PATH_FORMAT.format(self._id) + f"&revision={mode_revision}",
            params={"mode": mode_id},
            headers=self._extra_headers())

        mode_revision = data.get("revision")
        self.vdebug(f"new-revision={mode_revision}")

        self._save_and_do_callbacks(MODE_KEY, mode_id)
        self._save(MODE_REVISION_KEY, mode_revision)

    @property
    def mode_name(self):
        """Returns the current mode using the Arlo friendly name."""
        return self._id_to_name(self._load(MODE_KEY, "standby"))

    def update_mode(self):
        """Check and update the base's current mode."""
        data = self._arlo.be.get(LOCATION_ACTIVEMODE_PATH_FORMAT.format(self._id),
                                 headers=self._extra_headers())
        mode_id = data.get("properties", {}).get('mode')
        mode_revision = data.get("revision")
        self._save_and_do_callbacks(MODE_KEY, mode_id)
        self._save(MODE_REVISION_KEY, mode_revision)

    def update_modes(self, _initial=False):
        """Get and update the available modes for the base."""
        modes = self._arlo.be.get(LOCATION_MODES_PATH_FORMAT.format(self._id),
                                  headers=self._extra_headers())
        if modes is not None:
            self._parse_modes(modes.get("properties", {}))
        else:
            self._arlo.error("failed to read modes.")

    def stand_by(self):
        self.mode = "standby"

    @property
    def is_stand_by(self):
        return self.mode == "standby"

    def arm_home(self):
        self.mode = "armHome"

    @property
    def is_armed_home(self):
        return self.mode == "armHome"

    def arm_away(self):
        self.mode = "armAway"

    @property
    def is_armed_away(self):
        return self.mode == "armAway"