File: example_powermeter_mqtt.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (233 lines) | stat: -rw-r--r-- 7,466 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
#!/usr/bin/env python
"""
Example of a daemon listening for values from my main power-meter and resend them on a MQTT bus.

This example will not be able to run as it is - but it will hopefully give you some
ideas to how you can define DPT, and get their converted values, and even send them to MQTT
in a tested topic-format.

I have a Mosquitto MQTT Server - and running the Paho Python client.
I use some external MQTT libraries as well to handle the MQTT Topic-creation.

The data published on the MQTT bus can be fetched and stored in InfluxDB for graphing,
or monitored by other listeners - that triggers different events.

Please join XKNX on Discord (https://discord.gg/EuAQDXU) and chat with JohanElmis for
specific questions.
"""

import asyncio
import re
import sys

from xknx.devices import Device

try:
    # The following library is not included.
    from myhouse_sensors_mqtt import MetricType, SensorClientMqtt

    # import time
    import paho.mqtt.client as mqtt

    from xknx import XKNX
    from xknx.devices import Sensor

    # import pprint
except ImportError as import_err:
    err_list = str(import_err).split(" ")
    print(f"Unable to import module: {err_list[3]}")
    print(f"Please install the {err_list[3]} module for Python.")
    sys.exit()

BROKER_ADDRESS = "127.0.0.1"
# Give this client a name on the MQTT bus.
mqttc = mqtt.Client("main_power_central")

# Library to deal with verification of values.
# It also triggers and send values to the MQTT bus if the values has changed.
# With no changes it can go up to max_interval_seconds before it's sent.
# This allows me to fetch fast changes without storing all data at a 15s interval.
mh_sensor = SensorClientMqtt(
    change_trigger_percent=5,
    max_interval_seconds=600,
    metric_class="sensor",
    debug=True,
)

# Pre-compile some regexp filters that will be used to catch the different types.
RE_METER_READING = re.compile("MeterReading_")
RE_ACTIVE_POWER = re.compile("ActivePower")
RE_REACTIVE_POWER = re.compile("ReactivePower")
RE_APPARENT_POWER = re.compile("ApparentPower")
RE_VOLTAGE = re.compile("Voltage_")
RE_CURRENT = re.compile("Current_")
RE_FREQUENCY = re.compile("Frequency_")


def device_updated_cb(device: Device) -> None:
    """Do something with the updated device."""
    # print(device.name + ' ' + str(device.resolve_state()) + ' ' + device.unit_of_measurement())
    value = None
    topic = None
    if re.search("^EL-T-O_", device.name):
        metric = str(device.name)[7:]
        value = device.resolve_state()
        if RE_ACTIVE_POWER.search(metric):
            topic = mh_sensor.get_mqtt_sensor_metric(
                MetricType.WATT, "main_power_central", metric
            )
        elif RE_REACTIVE_POWER.search(metric):
            topic = mh_sensor.get_mqtt_sensor_metric(
                MetricType.VAR, "main_power_central", metric
            )
        elif RE_APPARENT_POWER.search(metric):
            topic = mh_sensor.get_mqtt_sensor_metric(
                MetricType.VA, "main_power_central", metric
            )
        elif RE_VOLTAGE.search(metric):
            topic = mh_sensor.get_mqtt_sensor_metric(
                MetricType.VOLTAGE, "main_power_central", metric
            )
        elif RE_CURRENT.search(metric):
            topic = mh_sensor.get_mqtt_sensor_metric(
                MetricType.CURRENT, "main_power_central", metric
            )
        elif RE_METER_READING.search(metric):
            topic = mh_sensor.get_mqtt_sensor_metric(
                MetricType.KWH, "main_power_central", metric
            )
        elif RE_FREQUENCY:
            topic = mh_sensor.get_mqtt_sensor_metric(
                MetricType.CUSTOM, "main_power_central", metric
            )
    else:
        print(
            f"Uncatched metric: {device.name} {device.resolve_state()!s} {device.unit_of_measurement()}"
        )

    if topic and value:
        # This will create a topic like:
        # myhouse/sensor/KWH/main_power_central/MeterReading_ActiveEnergy 103150280
        # myhouse/sensor/WATT/main_power_central/ActivePower_L2 1028.1099853515625
        # myhouse/sensor/VAR/main_power_central/ReactivePower_L3 136.3699951171875
        # myhouse/sensor/VA/main_power_central/ApparentPower_L3 1449.919921875
        # myhouse/sensor/CURRENT/main_power_central/Current_L3 6.239999294281006

        # When storing it to InfluxDB I have a DB named 'myhouse' where all these values will be stored.
        # Then I use the next two fields as metric name (lowercase):  sensor/current
        # break out main_power_central as a tag 'location', and the last field is tagged as the
        # 'sensor': ApparentPower_L3

        # This makes it really easy to graph in for example Grafana.
        # My latest version of the library doesn't send the value after the MQTT Topic, but a JSON structure
        # that also contains time.

        print(f"{topic} {value!s}")
        # ts = int(time.time() * 1000)
        mqttc.publish(topic, value)


async def main() -> None:
    """
    KNX device objects are created and the MQTT server connection is established.

    Then the XKNX Daemon will be started.
    Then everything else happens in the device_updated-function above as it is triggered when we receive data.
    """
    # Connect to KNX/IP device and listen if a switch was updated via KNX bus.
    xknx = XKNX(device_updated_cb=device_updated_cb, daemon_mode=True)

    # The KNX addresses to monitor are defined below, but is normally placed in an external
    #  file that is loaded in on start.

    # Generic Types not specifically supported by XKNX
    Sensor(
        xknx,
        "EL-T-O_MeterReading_ActiveEnergy",
        group_address_state="5/6/11",
        value_type="DPT-13",
    )
    Sensor(
        xknx,
        "EL-T-O_MeterReading_ReactiveEnergy",
        group_address_state="5/6/16",
        value_type="DPT-13",
    )

    # Active Power
    Sensor(
        xknx,
        "EL-T-O_TotalActivePower",
        group_address_state="5/6/24",
        value_type="power",
    )
    Sensor(
        xknx, "EL-T-O_ActivePower_L1", group_address_state="5/6/25", value_type="power"
    )
    # ...

    # Reactive Power
    Sensor(
        xknx,
        "EL-T-O_TotalReactivePower",
        group_address_state="5/6/28",
        value_type="power",
    )
    Sensor(
        xknx,
        "EL-T-O_ReactivePower_L1",
        group_address_state="5/6/29",
        value_type="power",
    )
    # ...

    # Apparent Power
    Sensor(
        xknx,
        "EL-T-O_TotalReactivePower",
        group_address_state="5/6/32",
        value_type="power",
    )
    Sensor(
        xknx,
        "EL-T-O_ApparentPower_L1",
        group_address_state="5/6/33",
        value_type="power",
    )
    # ...

    # Current
    Sensor(
        xknx,
        "EL-T-O_Current_L1",
        group_address_state="5/6/45",
        value_type="electric_current",
    )
    # ...

    # Voltage
    Sensor(
        xknx,
        "EL-T-O_Voltage_L1-N",
        group_address_state="5/6/48",
        value_type="electric_potential",
    )
    # ...

    # Frequency
    Sensor(
        xknx, "EL-T-O_Frequency", group_address_state="5/6/53", value_type="frequency"
    )

    mqttc.connect(BROKER_ADDRESS, 8883, 60)
    mqttc.loop_start()

    # Wait until Ctrl-C was pressed
    await xknx.start()

    await xknx.stop()
    await mqttc.loop_stop()
    await mqttc.disconnect()


asyncio.run(main())