1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
|
from enum import Enum
from .api import SHCAPI
class SHCIntrusionSystem:
DOMAIN_STATES = {
"armingState",
"alarmState",
"securityGapState",
"activeConfigurationProfile",
"systemAvailability",
}
class ArmingState(Enum):
SYSTEM_DISARMED = "SYSTEM_DISARMED"
SYSTEM_ARMED = "SYSTEM_ARMED"
SYSTEM_ARMING = "SYSTEM_ARMING"
class AlarmState(Enum):
ALARM_OFF = "ALARM_OFF"
PRE_ALARM = "PRE_ALARM"
ALARM_ON = "ALARM_ON"
ALARM_MUTED = "ALARM_MUTED"
class Profile(Enum):
FULL_PROTECTION = 0
PARTIAL_PROTECTION = 1
CUSTOM_PROTECTION = 2
def __init__(self, api: SHCAPI, raw_domain_state, root_device_id):
self._api = api
self._raw_system_availability = raw_domain_state["systemAvailability"]
self._raw_arming_state = raw_domain_state["armingState"]
self._raw_alarm_state = raw_domain_state["alarmState"]
self._raw_active_configuration_profile = raw_domain_state[
"activeConfigurationProfile"
]
self._raw_security_gap_state = raw_domain_state["securityGapState"]
self._root_device_id = root_device_id
self._callbacks = {}
@property
def id(self):
return "/intrusion"
@property
def manufacturer(self):
return "BOSCH"
@property
def name(self):
return "Intrusion Detection System"
@property
def root_device_id(self):
return self._root_device_id
@property
def device_model(self):
return "IDS"
@property
def deleted(self):
return False
@property
def system_availability(self) -> bool:
return self._raw_system_availability["available"]
@property
def arming_state(self) -> ArmingState:
return self.ArmingState(self._raw_arming_state["state"])
@property
def remaining_time_until_armed(self) -> int:
if self.arming_state == self.ArmingState.SYSTEM_ARMING:
return self._raw_arming_state["remainingTimeUntilArmed"]
return 0
@property
def alarm_state(self) -> AlarmState:
return self.AlarmState(self._raw_alarm_state["value"])
@property
def alarm_state_incidents(self):
return self._raw_alarm_state["incidents"]
@property
def active_configuration_profile(self) -> Profile:
return self.Profile(int(self._raw_active_configuration_profile["profileId"]))
@property
def security_gaps(self):
return self._raw_security_gap_state["securityGaps"]
def subscribe_callback(self, entity, callback):
self._callbacks[entity] = callback
def unsubscribe_callback(self, entity):
self._callbacks.pop(entity, None)
def summary(self):
print(f" Domain: {self.id}")
print(f" System Availability: {self.system_availability}")
print(f" Arming State: {self.arming_state}")
print(f" Alarm State: {self.alarm_state}")
def arm(self):
result = self._api.post_domain_action("intrusion/actions/arm")
def arm_full_protection(self):
data = {"@type": "armRequest", "profileId": "0"}
result = self._api.post_domain_action("intrusion/actions/arm", data)
def arm_partial_protection(self):
data = {"@type": "armRequest", "profileId": "1"}
result = self._api.post_domain_action("intrusion/actions/arm", data)
def arm_individual_protection(self):
data = {"@type": "armRequest", "profileId": "2"}
self._api.post_domain_action("intrusion/actions/arm", data)
def disarm(self):
self._api.post_domain_action("intrusion/actions/disarm")
def mute(self):
self._api.post_domain_action("intrusion/actions/mute")
def short_poll(self):
raw_domain_state = self._api.get_domain_intrusion_detection()
self._raw_system_availability = raw_domain_state["systemAvailability"]
self._raw_arming_state = raw_domain_state["armingState"]
self._raw_alarm_state = raw_domain_state["alarmState"]
self._raw_active_configuration_profile = raw_domain_state[
"activeConfigurationProfile"
]
self._raw_security_gap_state = raw_domain_state["securityGapState"]
def process_long_polling_poll_result(self, raw_result):
if raw_result["@type"] == "armingState":
self._raw_arming_state = raw_result
if raw_result["@type"] == "alarmState":
self._raw_alarm_state = raw_result
if raw_result["@type"] == "systemAvailability":
self._raw_system_availability = raw_result
if raw_result["@type"] == "activeConfigurationProfile":
self._raw_active_configuration_profile = raw_result
if raw_result["@type"] == "securityGapState":
self._raw_security_gap_state = raw_result
for callback in self._callbacks:
self._callbacks[callback]()
MODEL_MAPPING = {"IDS": SHCIntrusionSystem}
SUPPORTED_DOMAINS = MODEL_MAPPING.keys()
def build(api, domain_model, raw_domain_state):
assert domain_model in SUPPORTED_DOMAINS, "Domain model is supported"
return MODEL_MAPPING[domain_model](api=api, raw_domain_state=raw_domain_state)
|