1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
|
from .constant import (
MODE_ID_TO_NAME_KEY,
MODE_KEY,
MODE_NAME_TO_ID_KEY,
LOCATION_MODES_PATH_FORMAT,
LOCATION_ACTIVEMODE_PATH_FORMAT,
MODE_REVISION_KEY
)
from .super import ArloSuper
# Resource names matched against incoming events by the location's
# event handler.
AUTOMATION_ACTIVE_MODE = "automation/activeMode"
AUTOMATION_MODES = "automation/modes"

# Fallback modes offered when no modes have been stored for a location yet.
# Keys are the mode ids used by the arm/stand-by helpers; values are the
# display names.
DEFAULT_MODES = {
    "standby": "Stand By",
    "armAway": "Armed Away",
    "armHome": "Armed Home",
}
def location_name(name, user):
    """Return the internal name used for a location.

    :param name: the location's name.
    :param user: when truthy the name is prefixed with ``user_location_``,
        otherwise with ``location_``.
    :return: the prefixed location name.
    """
    prefix = "user_location_" if user else "location_"
    return f"{prefix}{name}"
class ArloLocation(ArloSuper):
    """Represents a Location object.

    Each Arlo account can have multiple owned locations and multiple shared
    locations.

    A location keeps its mode id <-> name mappings, its active mode and the
    revision of that mode through the ``_save``/``_load`` helpers
    (presumably provided by ``ArloSuper``).
    """

    def __init__(self, arlo, attrs, user=False):
        """Create a location from its attribute dictionary.

        :param arlo: owning Arlo object; used for backend access and error
            reporting.
        :param attrs: location attributes; ``locationName``, ``locationId``
            and ``gatewayDeviceIds`` are read here.
        :param user: forwarded to ``location_name`` to choose the name prefix.
        """
        super().__init__(location_name(attrs.get("locationName", "unknown"), user),
                         arlo, attrs,
                         id=attrs.get("locationId", "unknown"),
                         type="location")
        self._device_ids = attrs.get("gatewayDeviceIds", [])

    def _id_to_name(self, mode_id):
        """Return the stored name for ``mode_id`` or None if there is none."""
        return self._load([MODE_ID_TO_NAME_KEY, mode_id], None)

    def _name_to_id(self, mode_name):
        """Return the stored id for ``mode_name`` or None if there is none."""
        return self._load([MODE_NAME_TO_ID_KEY, mode_name], None)

    def _extra_headers(self):
        """Return the extra HTTP headers sent with location requests.

        Both headers carry the backend's user id.
        """
        return {
            "x-forwarded-user": self._arlo.be.user_id,
            "x-user-device-id": self._arlo.be.user_id,
        }

    def _parse_modes(self, modes):
        """Store the id <-> name mapping for every named mode in ``modes``.

        :param modes: dictionary keyed by mode id; each value is a dictionary
            that may hold a ``name`` entry. Entries without an id or without
            a name are skipped.
        """
        for mode_id, mode in modes.items():
            mode_name = mode.get("name", "")
            if not mode_id or not mode_name:
                continue
            self.debug(f"{mode_id}<=M=>{mode_name}")
            self._save([MODE_ID_TO_NAME_KEY, mode_id], mode_name)
            self._save([MODE_NAME_TO_ID_KEY, mode_name], mode_id)

    def _event_handler(self, resource, event):
        """Handle an event for this location.

        :param resource: name of the resource the event refers to.
        :param event: event payload dictionary.
        """
        self.debug(f"{self.name} LOCATION got {resource}")

        if resource == AUTOMATION_ACTIVE_MODE:
            # A (user requested?) mode change.
            props = event.get("properties", {})
            mode = props.get("properties", {}).get("mode", None)
            if mode is not None:
                self._save_and_do_callbacks(MODE_KEY, mode)
            mode_revision = props.get("revision", None)
            if mode_revision is not None:
                self._save(MODE_REVISION_KEY, mode_revision)
        elif resource == AUTOMATION_MODES:
            # A mode list update.
            self._parse_modes(event.get("properties", {}).get("properties", {}))
        elif resource == "states":
            # A state update carrying the active mode.
            mode = event.get("states", {}).get("activeMode", None)
            if mode is not None:
                self._save_and_do_callbacks(MODE_KEY, mode)

    @property
    def available_modes(self):
        """Returns string list of available modes.

        For example:: ``['disarmed', 'armed', 'home']``
        """
        return list(self.available_modes_with_ids.keys())

    @property
    def available_modes_with_ids(self):
        """Returns dictionary of available modes mapped to Arlo ids.

        For example:: ``{'armed': 'mode1','disarmed': 'mode0','home': 'mode2'}``

        When no modes are stored a copy of ``DEFAULT_MODES`` is returned.
        NOTE(review): the defaults map mode ids to display names, unlike the
        name -> id mapping built from stored modes - confirm this is intended.
        """
        modes = {
            key.split("/")[-1]: mode_id
            for key, mode_id in self._load_matching([MODE_NAME_TO_ID_KEY, "*"])
        }
        if not modes:
            # Hand out a copy so callers cannot mutate the module-level defaults.
            return dict(DEFAULT_MODES)
        return modes

    @property
    def device_ids(self):
        """Returns the ``gatewayDeviceIds`` given at construction."""
        return self._device_ids

    @property
    def mode(self):
        """Returns the current mode."""
        return self._load(MODE_KEY, "unknown")

    @mode.setter
    def mode(self, id_or_name):
        """Set the location mode.

        :param id_or_name: mode to use, as returned by available_modes:
        """
        # Convert to an ID; anything that is not a known name is used as-is.
        mode_id = self._name_to_id(id_or_name)
        if mode_id is None:
            mode_id = id_or_name
        if mode_id is None:
            self._arlo.error(f"passed invalid id or name {id_or_name}")
            return

        # Need to change?
        if self.mode == mode_id:
            self.debug("no mode change needed")
            return

        # Post change.
        self.debug(f"new-mode={mode_id}({id_or_name})")
        mode_revision = self._load(MODE_REVISION_KEY, 1)
        self.vdebug(f"old-revision={mode_revision}")
        data = self._arlo.be.put(
            LOCATION_ACTIVEMODE_PATH_FORMAT.format(self._id) + f"&revision={mode_revision}",
            params={"mode": mode_id},
            headers=self._extra_headers())
        if data is None:
            # Presumably the backend returns None on failure, as update_modes
            # assumes for get(); leave the stored mode untouched.
            self._arlo.error("failed to set mode.")
            return

        mode_revision = data.get("revision")
        self.vdebug(f"new-revision={mode_revision}")
        self._save_and_do_callbacks(MODE_KEY, mode_id)
        # Keep the previous revision if the reply did not carry one, matching
        # what the event handler does.
        if mode_revision is not None:
            self._save(MODE_REVISION_KEY, mode_revision)

    @property
    def mode_name(self):
        """Returns the current mode using the Arlo friendly name."""
        return self._id_to_name(self._load(MODE_KEY, "standby"))

    def update_mode(self):
        """Check and update the base's current mode."""
        data = self._arlo.be.get(LOCATION_ACTIVEMODE_PATH_FORMAT.format(self._id),
                                 headers=self._extra_headers())
        if data is None:
            self._arlo.error("failed to read mode.")
            return

        # Only store values that were actually supplied, matching what the
        # event handler does.
        mode_id = data.get("properties", {}).get("mode")
        if mode_id is not None:
            self._save_and_do_callbacks(MODE_KEY, mode_id)
        mode_revision = data.get("revision")
        if mode_revision is not None:
            self._save(MODE_REVISION_KEY, mode_revision)

    def update_modes(self, _initial=False):
        """Get and update the available modes for the base.

        :param _initial: unused here.
        """
        modes = self._arlo.be.get(LOCATION_MODES_PATH_FORMAT.format(self._id),
                                  headers=self._extra_headers())
        if modes is None:
            self._arlo.error("failed to read modes.")
            return
        self._parse_modes(modes.get("properties", {}))

    def stand_by(self):
        """Set the mode to ``standby``."""
        self.mode = "standby"

    @property
    def is_stand_by(self):
        """Returns True if the current mode is ``standby``."""
        return self.mode == "standby"

    def arm_home(self):
        """Set the mode to ``armHome``."""
        self.mode = "armHome"

    @property
    def is_armed_home(self):
        """Returns True if the current mode is ``armHome``."""
        return self.mode == "armHome"

    def arm_away(self):
        """Set the mode to ``armAway``."""
        self.mode = "armAway"

    @property
    def is_armed_away(self):
        """Returns True if the current mode is ``armAway``."""
        return self.mode == "armAway"
|