1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
|
import json
import logging
import re
from PyViCare.PyViCareFloorHeating import FloorHeating, FloorHeatingChannel
from PyViCare.PyViCareFuelCell import FuelCell
from PyViCare.PyViCareGazBoiler import GazBoiler
from PyViCare.PyViCareHeatingDevice import HeatingDevice
from PyViCare.PyViCareHeatPump import HeatPump
from PyViCare.PyViCareHybrid import Hybrid
from PyViCare.PyViCareOilBoiler import OilBoiler
from PyViCare.PyViCarePelletsBoiler import PelletsBoiler
from PyViCare.PyViCareRadiatorActuator import RadiatorActuator
from PyViCare.PyViCareRoomSensor import RoomSensor
from PyViCare.PyViCareRepeater import Repeater
from PyViCare.PyViCareElectricalEnergySystem import ElectricalEnergySystem
from PyViCare.PyViCareGateway import Gateway
from PyViCare.PyViCareVentilationDevice import VentilationDevice
logger = logging.getLogger('ViCare')
logger.addHandler(logging.NullHandler())
class PyViCareDeviceConfig:
# pylint: disable=too-many-arguments,too-many-positional-arguments
def __init__(self, service, device_id, device_model, status, device_type=None, roles=None):
self.service = service
self.device_id = device_id
self.device_model = device_model
self.status = status
self.device_type = device_type
self.roles = roles if roles is not None else []
def asGeneric(self):
return HeatingDevice(self.service)
def asGazBoiler(self):
return GazBoiler(self.service)
def asFuelCell(self):
return FuelCell(self.service)
def asHeatPump(self):
return HeatPump(self.service)
def asOilBoiler(self):
return OilBoiler(self.service)
def asPelletsBoiler(self):
return PelletsBoiler(self.service)
def asHybridDevice(self):
return Hybrid(self.service)
def asRadiatorActuator(self):
return RadiatorActuator(self.service)
def asFloorHeating(self):
return FloorHeating(self.service)
def asFloorHeatingChannel(self):
return FloorHeatingChannel(self.service)
def asRoomSensor(self):
return RoomSensor(self.service)
def asRepeater(self):
return Repeater(self.service)
def asElectricalEnergySystem(self):
return ElectricalEnergySystem(self.service)
def asGateway(self):
return Gateway(self.service)
def asVentilation(self):
return VentilationDevice(self.service)
def getConfig(self):
return self.service.accessor
def getId(self):
return self.device_id
def getModel(self):
return self.device_model
def isOnline(self):
return self.status == "Online"
def getDeviceType(self):
return self.device_type
def getRoles(self):
return self.roles
def isGateway(self):
return self.service._isGateway() # pylint: disable=protected-access
# see: https://vitodata300.viessmann.com/vd300/ApplicationHelp/VD300/1031_de_DE/Ger%C3%A4teliste.html
def asAutoDetectDevice(self):
device_types = [
(self.asFuelCell, r"Vitovalor|Vitocharge|Vitoblo", []),
(self.asPelletsBoiler, r"Vitoligno|Ecotronic|VBC550P", []),
(self.asOilBoiler, r"Vitoladens|Vitoradial|Vitorondens|VPlusH|V200KW2_6", []),
(self.asGazBoiler, r"Vitodens|VScotH|Vitocrossal|VDensH|Vitopend|VPendH|OT_Heating_System", ["type:boiler"]),
(self.asHeatPump, r"Vitocal|VBC70|V200WO1A|CU401B", ["type:heatpump"]),
(self.asElectricalEnergySystem, r"E3_VitoCharge_03", ["type:ees"]), # ees, it this a typo?
(self.asElectricalEnergySystem, r"E3_VitoCharge_05", ["type:ess"]),
(self.asVentilation, r"E3_ViAir", ["type:ventilation"]),
(self.asVentilation, r"E3_ViAir", ["type:ventilation;central"]),
(self.asVentilation, r"E3_VitoPure", ["type:ventilation;purifier"]),
(self.asRadiatorActuator, r"E3_RadiatorActuator", ["type:radiator"]),
(self.asFloorHeating, r"Smart_zigbee_fht_main|E3_FloorHeatingCircuitDistributorBox", ["type:fhtMain"]),
(self.asFloorHeatingChannel, r"Smart_zigbee_fht_channel", ["type:fhtChannel"]),
(self.asRoomSensor, r"E3_RoomSensor", ["type:climateSensor"]),
(self.asRepeater, r"E3_Repeater", ["type:repeater"]),
(self.asGateway, r"E3_TCU41_x04", ["type:gateway;TCU100"]),
(self.asGateway, r"E3_TCU19_x05", ["type:gateway;TCU200"]),
(self.asGateway, r"E3_TCU10_x07", ["type:gateway;TCU300"]),
(self.asGateway, r"Heatbox1", ["type:gateway;VitoconnectOpto1"]),
(self.asGateway, r"Heatbox2", ["type:gateway;VitoconnectOpto2/OT2"])
]
for (creator_method, type_name, roles) in device_types:
if re.search(type_name, self.device_model) or self.service.hasRoles(roles):
logger.info("detected %s %s", self.device_model, creator_method.__name__)
return creator_method()
logger.info("Could not auto detect %s. Use generic device.", self.device_model)
return self.asGeneric()
def get_raw_json(self):
return self.service.fetch_all_features()
def dump_secure(self, flat=False):
raw_data = self.get_raw_json()
device_info = {
"id": self.device_id,
"modelId": self.device_model,
"type": self.device_type,
"status": self.status,
"roles": self.roles
}
output = {
"device": device_info,
"data": raw_data['data']
}
if flat:
inner = ',\n'.join([json.dumps(x, sort_keys=True) for x in output['data']])
outer = json.dumps({'device': output['device'], 'data': ['placeholder']}, indent=0, sort_keys=True)
dumpJSON = outer.replace('"placeholder"', inner)
else:
dumpJSON = json.dumps(output, indent=4, sort_keys=True)
def repl(m):
return m.group(1) + ('#' * len(m.group(2))) + m.group(3)
return re.sub(r'(["\/])(\d{6,})(["\/])', repl, dumpJSON)
|