File: example_telegram_monitor.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (58 lines) | stat: -rw-r--r-- 1,674 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""Example for the telegram monitor callback."""

import asyncio
import getopt  # pylint: disable=deprecated-module
import sys

from xknx import XKNX
from xknx.telegram import AddressFilter, Telegram


def telegram_received_cb(telegram: Telegram) -> None:
    """Print a received telegram to stdout.

    This is the callback that monitor() registers on the XKNX telegram queue.
    """
    message = f"Telegram received: {telegram}"
    print(message)


def show_help() -> None:
    """Print the usage text for this script to stdout.

    Each usage line is prefixed with the path of this file (``__file__``).
    """
    usage_lines = (
        "                            Listen to all telegrams",
        "-f --filter 1/2/*,1/4/[5-6]    Filter for specific group addresses",
        "-h --help                      Print help",
    )
    # Title, blank line, "Usage:" heading, blank line.
    print("Telegram filter.", "", "Usage:", "", sep="\n")
    for usage in usage_lines:
        print(__file__, usage)
    print()

async def monitor(address_filters: list[AddressFilter] | None) -> None:
    """Register telegram_received_cb within XKNX and run XKNX in daemon mode.

    address_filters is handed unchanged to register_telegram_received_cb;
    main() passes None when no --filter option was given.

    stop() is awaited even when start() raises or the surrounding task is
    cancelled, so an interrupted run still gets its shutdown call.
    """
    xknx = XKNX(daemon_mode=True)
    xknx.telegram_queue.register_telegram_received_cb(
        telegram_received_cb, address_filters
    )
    try:
        # NOTE(review): with daemon_mode=True, start() presumably blocks until
        # the process is interrupted -- confirm against the xknx docs.
        await xknx.start()
    finally:
        await xknx.stop()


async def main(argv: list[str]) -> None:
    """Parse command line arguments and start the monitor.

    Recognised options:
      -h / --help           print the usage text and exit
      -f / --filter LIST    comma-separated group address patterns; each one
                            is turned into an AddressFilter. If the option is
                            repeated, the last occurrence wins.

    An unknown option or a missing option argument prints the parser's error
    message to stderr, followed by the usage text, and exits with status 2.
    Positional arguments are ignored.
    """
    try:
        opts, _ = getopt.getopt(argv, "hf:", ["help", "filter="])
    except getopt.GetoptError as err:
        # Tell the user what was wrong before showing the generic usage text.
        print(err, file=sys.stderr)
        show_help()
        sys.exit(2)

    address_filters: list[AddressFilter] | None = None
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            show_help()
            sys.exit()
        if opt in ("-f", "--filter"):
            address_filters = [AddressFilter(pattern) for pattern in arg.split(",")]
    await monitor(address_filters)


# Script entry point: run main() on a fresh event loop, passing the command
# line arguments without the program name.
if __name__ == "__main__":
    asyncio.run(main(sys.argv[1:]))