1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
|
"""Example for the telegram monitor callback."""
import asyncio
import getopt # pylint: disable=deprecated-module
import sys
from xknx import XKNX
from xknx.telegram import AddressFilter, Telegram
def telegram_received_cb(telegram: Telegram) -> None:
    """Report a received telegram by printing it to stdout.

    Registered by monitor() as the telegram-received callback.
    """
    message = f"Telegram received: {telegram}"
    print(message)
def show_help() -> None:
    """Print the command line usage summary to stdout."""
    script = __file__
    # Each entry holds the positional arguments of one print() call; an empty
    # tuple produces a blank line.
    help_rows = (
        ("Telegram filter.",),
        (),
        ("Usage:",),
        (),
        (script, " Listen to all telegrams"),
        (
            script,
            "-f --filter 1/2/*,1/4/[5-6] Filter for specific group addresses",
        ),
        (script, "-h --help Print help"),
        (),
    )
    for row in help_rows:
        print(*row)
async def monitor(address_filters: list[AddressFilter] | None) -> None:
    """Set telegram_received_cb within XKNX and connect to KNX/IP device in daemon mode.

    Args:
        address_filters: Filters handed to the telegram queue together with
            the callback. ``None`` is forwarded unchanged; main() passes it
            when no ``--filter`` option was given.
    """
    xknx = XKNX(daemon_mode=True)
    xknx.telegram_queue.register_telegram_received_cb(
        telegram_received_cb, address_filters
    )
    try:
        # NOTE(review): with daemon_mode=True, start() presumably keeps
        # running until the process is interrupted - confirm against the
        # xknx documentation.
        await xknx.start()
    finally:
        # Shut down even when start() raises or this task is cancelled
        # (e.g. Ctrl-C under asyncio.run); previously stop() was skipped in
        # those cases and the connection was never torn down.
        await xknx.stop()
async def main(argv: list[str]) -> None:
    """Parse command line arguments and start monitor.

    Args:
        argv: Command line arguments without the program name.

    Exits with status 2 after printing the help text when the options cannot
    be parsed, and exits normally after printing it for ``-h``/``--help``.
    When ``-f``/``--filter`` is given more than once, the last one wins.
    """
    try:
        parsed_options, _positional = getopt.getopt(
            argv, "hf:", ["help", "filter="]
        )
    except getopt.GetoptError:
        show_help()
        sys.exit(2)

    selected_filters = None
    for flag, value in parsed_options:
        if flag in ("-h", "--help"):
            show_help()
            sys.exit()
        if flag in ("-f", "--filter"):
            # The option value is a comma separated list of filter patterns.
            selected_filters = [
                AddressFilter(pattern) for pattern in value.split(",")
            ]

    await monitor(selected_filters)
if __name__ == "__main__":
    # Script entry point: hand everything after the program name to main().
    asyncio.run(main(argv=sys.argv[1:]))
|