1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
"""Support for communicating with myStrom PIRs."""
import aiohttp
from yarl import URL
from typing import Any, Dict, Iterable, List, Optional, Union
from . import _request as request
URI_PIR = URL("api/v1/")
class MyStromPir:
"""A class for a myStrom PIR."""
def __init__(self, host: str, session: aiohttp.client.ClientSession = None) -> None:
"""Initialize the switch."""
self._close_session = False
self._host = host
self._session = session
self._intensity = None
self._day = None
self._light_raw = None
self._sensors = None
self._temperature_measured = None
self._temperature_compensated = None
self._temperature_compensation = None
self._temperature_raw = None
self._motion = None
self._settings = None
self._pir = None
self._actions = None
self.uri = URL.build(scheme="http", host=self._host).join(URI_PIR)
async def get_settings(self) -> None:
"""Get the current settings from the PIR."""
url = URL(self.uri).join(URL("settings"))
response = await request(self, uri=url)
self._settings = response
async def get_actions(self) -> None:
"""Get the current action settings from the PIR."""
url = URL(self.uri).join(URL("action"))
response = await request(self, uri=url)
self._actions = response
async def get_pir(self) -> None:
"""Get the current PIR settings."""
url = URL(self.uri).join(URL("settings/pir"))
response = await request(self, uri=url)
self._pir = response
async def get_sensors_state(self) -> None:
"""Get the state of the sensors from the PIR."""
url = URL(self.uri).join(URL("sensors"))
response = await request(self, uri=url)
# The return data has the be re-written as the temperature is not rounded
self._sensors = {
"motion": response["motion"],
"light": response["light"],
"temperature": round(response["temperature"], 2),
}
async def get_temperatures(self) -> None:
"""Get the temperatures from the PIR."""
# There is a different URL for the temp endpoint
url = URL.build(scheme="http", host=self._host) / "temp"
response = await request(self, uri=url)
self._temperature_raw = response
self._temperature_measured = round(response["measured"], 2)
self._temperature_compensated = round(response["compensated"], 2)
self._temperature_compensation = round(response["compensation"], 3)
async def get_motion(self) -> None:
"""Get the state of the motion sensor from the PIR."""
url = URL(self.uri).join(URL("motion"))
response = await request(self, uri=url)
self._motion = response["motion"]
async def get_light(self) -> None:
"""Get the state of the light sensor from the PIR."""
url = URL(self.uri).join(URL("light"))
response = await request(self, uri=url)
self._intensity = response["intensity"]
self._day = response["day"]
self._light_raw = response["raw"]
@property
def settings(self) -> Optional[dict]:
"""Return current settings."""
return self._settings
@property
def actions(self) -> Optional[dict]:
"""Return current action settings."""
return self._actions
@property
def pir(self) -> Optional[dict]:
"""Return current PIR settings."""
return self._pir
@property
def sensors(self) -> Optional[dict]:
"""Return current sensor values."""
return self._sensors
@property
def temperature_measured(self) -> Optional[str]:
"""Return current measured temperature."""
return self._temperature_measured
@property
def temperature_compensated(self) -> Optional[str]:
"""Return current compensated temperature."""
return self._temperature_compensated
@property
def temperature_compensation(self) -> Optional[str]:
"""Return current temperature compensation."""
return self._temperature_compensation
@property
def temperature_raw(self) -> Optional[dict]:
"""Return current raw temperature values."""
return self._temperature_raw
@property
def motion(self) -> Optional[str]:
"""Return the state of the motion sensor."""
return self._motion
@property
def intensity(self) -> Optional[str]:
"""Return the intensity reported by the light sensor."""
return self._intensity
@property
def day(self) -> Optional[str]:
"""Return the information based on the thresholds set."""
return self._day
@property
def light_raw(self) -> Optional[str]:
"""Return the raw data from the ADC."""
return {
"visible": self._light_raw["adc0"],
"infrared": self._light_raw["adc1"],
}
async def close(self) -> None:
"""Close an open client session."""
if self._session and self._close_session:
await self._session.close()
async def __aenter__(self) -> "MyStromPir":
"""Async enter."""
return self
async def __aexit__(self, *exc_info) -> None:
"""Async exit."""
await self.close()
|