File: PyViCareDeviceConfig.py

package info (click to toggle)
pyvicare 2.58.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,944 kB
  • sloc: python: 6,182; sh: 5; makefile: 2
file content (160 lines) | stat: -rw-r--r-- 6,076 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import json
import logging
import re

from PyViCare.PyViCareFloorHeating import FloorHeating, FloorHeatingChannel
from PyViCare.PyViCareFuelCell import FuelCell
from PyViCare.PyViCareGazBoiler import GazBoiler
from PyViCare.PyViCareHeatingDevice import HeatingDevice
from PyViCare.PyViCareHeatPump import HeatPump
from PyViCare.PyViCareHybrid import Hybrid
from PyViCare.PyViCareOilBoiler import OilBoiler
from PyViCare.PyViCarePelletsBoiler import PelletsBoiler
from PyViCare.PyViCareRadiatorActuator import RadiatorActuator
from PyViCare.PyViCareRoomSensor import RoomSensor
from PyViCare.PyViCareRepeater import Repeater
from PyViCare.PyViCareElectricalEnergySystem import ElectricalEnergySystem
from PyViCare.PyViCareGateway import Gateway
from PyViCare.PyViCareVentilationDevice import VentilationDevice

logger = logging.getLogger('ViCare')
logger.addHandler(logging.NullHandler())


class PyViCareDeviceConfig:
    # pylint: disable=too-many-arguments,too-many-positional-arguments
    def __init__(self, service, device_id, device_model, status, device_type=None, roles=None):
        self.service = service
        self.device_id = device_id
        self.device_model = device_model
        self.status = status
        self.device_type = device_type
        self.roles = roles if roles is not None else []

    def asGeneric(self):
        return HeatingDevice(self.service)

    def asGazBoiler(self):
        return GazBoiler(self.service)

    def asFuelCell(self):
        return FuelCell(self.service)

    def asHeatPump(self):
        return HeatPump(self.service)

    def asOilBoiler(self):
        return OilBoiler(self.service)

    def asPelletsBoiler(self):
        return PelletsBoiler(self.service)

    def asHybridDevice(self):
        return Hybrid(self.service)

    def asRadiatorActuator(self):
        return RadiatorActuator(self.service)

    def asFloorHeating(self):
        return FloorHeating(self.service)

    def asFloorHeatingChannel(self):
        return FloorHeatingChannel(self.service)

    def asRoomSensor(self):
        return RoomSensor(self.service)

    def asRepeater(self):
        return Repeater(self.service)

    def asElectricalEnergySystem(self):
        return ElectricalEnergySystem(self.service)

    def asGateway(self):
        return Gateway(self.service)

    def asVentilation(self):
        return VentilationDevice(self.service)

    def getConfig(self):
        return self.service.accessor

    def getId(self):
        return self.device_id

    def getModel(self):
        return self.device_model

    def isOnline(self):
        return self.status == "Online"

    def getDeviceType(self):
        return self.device_type

    def getRoles(self):
        return self.roles

    def isGateway(self):
        return self.service._isGateway()  # pylint: disable=protected-access

    # see: https://vitodata300.viessmann.com/vd300/ApplicationHelp/VD300/1031_de_DE/Ger%C3%A4teliste.html
    def asAutoDetectDevice(self):
        device_types = [
            (self.asFuelCell, r"Vitovalor|Vitocharge|Vitoblo", []),
            (self.asPelletsBoiler, r"Vitoligno|Ecotronic|VBC550P", []),
            (self.asOilBoiler, r"Vitoladens|Vitoradial|Vitorondens|VPlusH|V200KW2_6", []),
            (self.asGazBoiler, r"Vitodens|VScotH|Vitocrossal|VDensH|Vitopend|VPendH|OT_Heating_System", ["type:boiler"]),
            (self.asHeatPump, r"Vitocal|VBC70|V200WO1A|CU401B", ["type:heatpump"]),
            (self.asElectricalEnergySystem, r"E3_VitoCharge_03", ["type:ees"]), # ees, it this a typo?
            (self.asElectricalEnergySystem, r"E3_VitoCharge_05", ["type:ess"]),
            (self.asVentilation, r"E3_ViAir", ["type:ventilation"]),
            (self.asVentilation, r"E3_ViAir", ["type:ventilation;central"]),
            (self.asVentilation, r"E3_VitoPure", ["type:ventilation;purifier"]),
            (self.asRadiatorActuator, r"E3_RadiatorActuator", ["type:radiator"]),
            (self.asFloorHeating, r"Smart_zigbee_fht_main|E3_FloorHeatingCircuitDistributorBox", ["type:fhtMain"]),
            (self.asFloorHeatingChannel, r"Smart_zigbee_fht_channel", ["type:fhtChannel"]),
            (self.asRoomSensor, r"E3_RoomSensor", ["type:climateSensor"]),
            (self.asRepeater, r"E3_Repeater", ["type:repeater"]),
            (self.asGateway, r"E3_TCU41_x04", ["type:gateway;TCU100"]),
            (self.asGateway, r"E3_TCU19_x05", ["type:gateway;TCU200"]),
            (self.asGateway, r"E3_TCU10_x07", ["type:gateway;TCU300"]),
            (self.asGateway, r"Heatbox1", ["type:gateway;VitoconnectOpto1"]),
            (self.asGateway, r"Heatbox2", ["type:gateway;VitoconnectOpto2/OT2"])
        ]

        for (creator_method, type_name, roles) in device_types:
            if re.search(type_name, self.device_model) or self.service.hasRoles(roles):
                logger.info("detected %s %s", self.device_model, creator_method.__name__)
                return creator_method()

        logger.info("Could not auto detect %s. Use generic device.", self.device_model)
        return self.asGeneric()

    def get_raw_json(self):
        return self.service.fetch_all_features()

    def dump_secure(self, flat=False):
        raw_data = self.get_raw_json()
        device_info = {
            "id": self.device_id,
            "modelId": self.device_model,
            "type": self.device_type,
            "status": self.status,
            "roles": self.roles
        }
        output = {
            "device": device_info,
            "data": raw_data['data']
        }

        if flat:
            inner = ',\n'.join([json.dumps(x, sort_keys=True) for x in output['data']])
            outer = json.dumps({'device': output['device'], 'data': ['placeholder']}, indent=0, sort_keys=True)
            dumpJSON = outer.replace('"placeholder"', inner)
        else:
            dumpJSON = json.dumps(output, indent=4, sort_keys=True)

        def repl(m):
            return m.group(1) + ('#' * len(m.group(2))) + m.group(3)

        return re.sub(r'(["\/])(\d{6,})(["\/])', repl, dumpJSON)