1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
|
import logging
from datetime import datetime
from PyViCare.PyViCareAbstractOAuthManager import AbstractViCareOAuthManager
from PyViCare.PyViCareBrowserOAuthManager import ViCareBrowserOAuthManager
from PyViCare.PyViCareCachedService import ViCareCachedService
from PyViCare.PyViCareDeviceConfig import PyViCareDeviceConfig
from PyViCare.PyViCareOAuthManager import ViCareOAuthManager
from PyViCare.PyViCareService import ViCareDeviceAccessor, ViCareService
from PyViCare.PyViCareUtils import PyViCareInvalidDataError
logger = logging.getLogger('ViCare')
logger.addHandler(logging.NullHandler())
class PyViCare:
""""Viessmann ViCare API Python tools"""
def __init__(self) -> None:
self.cacheDuration = 60
def setCacheDuration(self, cache_duration):
self.cacheDuration = int(cache_duration)
def initWithCredentials(self, username: str, password: str, client_id: str, token_file: str):
self.initWithExternalOAuth(ViCareOAuthManager(
username, password, client_id, token_file))
def initWithExternalOAuth(self, oauth_manager: AbstractViCareOAuthManager) -> None:
self.oauth_manager = oauth_manager
self.__loadInstallations()
def initWithBrowserOAuth(self, client_id: str, token_file: str) -> None:
self.initWithExternalOAuth(ViCareBrowserOAuthManager(client_id, token_file))
def __buildService(self, accessor, roles):
if self.cacheDuration > 0:
return ViCareCachedService(self.oauth_manager, accessor, roles, self.cacheDuration)
return ViCareService(self.oauth_manager, accessor, roles)
def __loadInstallations(self):
installations = self.oauth_manager.get(
"/equipment/installations?includeGateways=true")
if "data" not in installations:
logger.error("Missing 'data' property when fetching installations")
raise PyViCareInvalidDataError(installations)
data = installations['data']
self.installations = Wrap(data)
self.devices = list(self.__extract_devices())
def __extract_devices(self):
for installation in self.installations:
for gateway in installation.gateways:
for device in gateway.devices:
if device.deviceType not in ["heating", "zigbee", "vitoconnect", "electricityStorage", "tcu", "ventilation"]:
continue # we are only interested in heating, photovoltaic, electricityStorage, and ventilation devices
accessor = ViCareDeviceAccessor(
installation.id, gateway.serial, device.id)
service = self.__buildService(accessor, device.roles)
logger.info("Device found: %s", device.modelId)
yield PyViCareDeviceConfig(service, device.id, device.modelId, device.status)
class DictWrap(object):
def __init__(self, d):
for k, v in d.items():
setattr(self, k, Wrap(v))
def Wrap(v):
if isinstance(v, list):
return [Wrap(x) for x in v]
if isinstance(v, dict):
return DictWrap(v)
if isinstance(v, str) and len(v) == 24 and v[23] == 'Z' and v[10] == 'T':
return datetime.strptime(v, '%Y-%m-%dT%H:%M:%S.%f%z')
return v
|