File: example_telegram_monitor.py

package info (click to toggle)
python-xknx 3.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,044 kB
  • sloc: python: 40,087; javascript: 8,556; makefile: 32; sh: 12
file content (141 lines) | stat: -rw-r--r-- 4,759 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""Example for the telegram monitor callback."""

from __future__ import annotations

import asyncio
import getopt  # pylint: disable=deprecated-module
import sys
from typing import TYPE_CHECKING

from xknx import XKNX
from xknx.io.connection import ConnectionConfig
from xknx.telegram import AddressFilter, IndividualAddress, Telegram
from xknx.telegram.apci import GroupValueResponse, GroupValueWrite

if TYPE_CHECKING:
    from xknxproject.models import KNXProject


def show_help() -> None:
    """Print the command line usage of the telegram monitor to stdout.

    The long option names shown here are the ones registered with ``getopt``
    in ``main`` ("help", "filter=", "interface=", "knxproject="), so every
    spelling printed below is accepted on the command line.
    """
    print("Listen to telegrams on the KNX bus.")
    print()
    print("Options:")
    print()
    print("Option           Argument example")
    # The long form is "--interface"; getopt only accepts unique prefixes of
    # registered names, so the previously advertised "--ia" was rejected.
    print("-i --interface   1.0.253                 Individual address to connect to")
    print(
        "-f --filter      1/2/*,1/4/[5-6]         Filter for specific group addresses"
    )
    print(
        "-k --knxproject  myproject.knxproj       Load KNX project file for address resolution"
    )
    print("-h --help                                Print help")
    print()
    print("Example:")
    print('python example_telegram_monitor.py -i "1.0.253" -f 1/2/*,1/4/[5-6]')
    print("This will listen to all telegrams for group addresses 1/2/* and 1/4/[5-6].")


def load_project(file_path: str) -> KNXProject:
    """Load and parse a KNX project file.

    ``xknxproject`` is imported inside the function so the rest of the script
    works without it. If the import fails, an install hint is printed and the
    process exits with status 1.

    Args:
        file_path: Path of the project file handed to ``XKNXProj``.

    Returns:
        The result of ``XKNXProj.parse()``.

    Raises:
        SystemExit: If the ``xknxproject`` package cannot be imported.

    The password is prompted for only once; any exception raised by the
    second ``parse()`` call (including a rejected password) propagates.
    """
    # pylint: disable=import-outside-toplevel
    import getpass

    try:
        from xknxproject import XKNXProj
        from xknxproject.exceptions import InvalidPasswordException
    except ImportError:
        print(
            "xknxproject package is not installed. Please install it with 'pip install xknxproject'."
        )
        sys.exit(1)

    xknxproj = XKNXProj(file_path)
    try:
        return xknxproj.parse()
    except InvalidPasswordException:
        # getpass reads the password without echoing it to the terminal,
        # unlike input(), so it does not end up on screen or in scrollback.
        password = getpass.getpass(
            "Project file is password protected. Please enter the password: "
        )
        xknxproj.password = password
        return xknxproj.parse()


async def monitor(
    ia: IndividualAddress | None,
    address_filters: list[AddressFilter] | None,
    knx_project: KNXProject | None = None,
) -> None:
    """Print received telegrams using an XKNX instance created in daemon mode.

    An ``XKNX`` object is built with ``daemon_mode=True`` and a
    ``ConnectionConfig`` carrying ``ia`` as individual address. A callback is
    registered on its telegram queue together with ``address_filters``; then
    ``start()`` is awaited and, once it returns, ``stop()`` is awaited.

    When ``knx_project`` is given, every group address whose "dpt" entry is
    not None is passed to ``xknx.group_address_dpt.set`` and each telegram
    gets a second output line with device name, group address name and the
    decoded value.
    """
    connection = ConnectionConfig(individual_address=ia)
    xknx = XKNX(connection_config=connection, daemon_mode=True)

    if knx_project is not None:
        xknx.group_address_dpt.set(
            {
                address: info["dpt"]
                for address, info in knx_project["group_addresses"].items()
                if info["dpt"] is not None
            }
        )

    def telegram_received_cb(telegram: Telegram) -> None:
        """Print one line per telegram, plus a project-derived line if available."""
        source = str(telegram.source_address)
        destination = str(telegram.destination_address)
        apci = telegram.payload
        # Write/response payloads carry a value; anything else is shown by class name.
        shown: str | int | tuple[int, ...] = (
            apci.value.value
            if isinstance(apci, GroupValueWrite | GroupValueResponse)
            else str(apci.__class__.__name__)
        )
        print(f"{telegram.direction.value:8} {source:20} | {destination:24} | {shown}")

        if not knx_project:
            return

        # Second line: names looked up in the project, truncated to the column widths.
        device = knx_project["devices"].get(source)
        device_label = (
            "" if device is None else f"{device['manufacturer_name']} {device['name']}"
        )
        group = knx_project["group_addresses"].get(destination)
        group_label = "" if group is None else group["name"]
        decoded = telegram.decoded_data
        decoded_text = (
            ""
            if decoded is None
            else f"{decoded.value} {decoded.transcoder.unit or ''}"
        )
        print(f"{'':8} {device_label[:20]:20} | {group_label[:24]:24} | {decoded_text}")

    xknx.telegram_queue.register_telegram_received_cb(
        telegram_received_cb, address_filters
    )
    await xknx.start()
    await xknx.stop()


async def main(argv: list[str]) -> None:
    """Turn command line arguments into monitor settings and run the monitor.

    Recognised options are -h/--help, -f/--filter (comma separated patterns,
    each wrapped in ``AddressFilter``), -i/--interface (wrapped in
    ``IndividualAddress``) and -k/--knxproject (passed to ``load_project``).
    Arguments rejected by ``getopt`` print the help text and exit with status
    2; the help option prints the help text and exits.
    """
    short_options = "hf:i:k:"
    long_options = ["help", "filter=", "interface=", "knxproject="]
    try:
        parsed_options, _ = getopt.getopt(argv, short_options, long_options)
    except getopt.GetoptError:
        show_help()
        sys.exit(2)

    address_filters = None
    ia = None
    knx_project = None
    # Options are handled in the order given on the command line.
    for flag, value in parsed_options:
        if flag in ("-h", "--help"):
            show_help()
            sys.exit()
        elif flag in ("-f", "--filter"):
            address_filters = [AddressFilter(pattern) for pattern in value.split(",")]
        elif flag in ("-i", "--interface"):
            ia = IndividualAddress(value)
        elif flag in ("-k", "--knxproject"):
            knx_project = load_project(value)

    await monitor(ia, address_filters, knx_project)


if __name__ == "__main__":
    # Run only when executed as a script; the program name in argv[0] is dropped.
    cli_arguments = sys.argv[1:]
    asyncio.run(main(cli_arguments))