"""Example of how to disconnect/reset all available tunneling channels."""
import asyncio
from xknx import XKNX
from xknx.io import GatewayScanner
from xknx.io.request_response import ConnectionState, Disconnect
from xknx.io.transport import UDPTransport
from xknx.knxip import HPAI
async def main() -> None:
    """Search for a tunnelling gateway and disconnect its open channels.

    Scans for KNX/IP gateways and uses the first one returned. If that
    gateway supports tunnelling, a UDP transport is opened to it and each
    channel id in ``range(255)`` is probed with a ``ConnectionState``
    request. Every id whose probe succeeds is then sent a ``Disconnect``
    request. Progress is printed to stdout.

    The UDP transport is always stopped before returning, even when one of
    the requests raises.
    """
    xknx = XKNX()
    gateways = await GatewayScanner(xknx).scan()

    if not gateways:
        print("No Gateways found")
        return

    gateway = gateways[0]

    if not gateway.supports_tunnelling:
        print("Gateway does not support tunneling")
        return

    # Local port 0: taken from the original call; presumably lets the OS
    # pick a free port.
    udp_transport = UDPTransport((gateway.local_ip, 0), (gateway.ip_addr, gateway.port))
    await udp_transport.connect()

    try:
        # The HPAI is built from the address the socket is actually bound to.
        local_hpai = HPAI(*udp_transport.getsockname())

        # NOTE(review): range(255) probes ids 0..254, so id 255 is never
        # checked - confirm whether that is intentional.
        for i in range(255):
            conn_state = ConnectionState(
                udp_transport, communication_channel_id=i, local_hpai=local_hpai
            )
            await conn_state.start()

            # Only channels whose state request succeeded get a disconnect.
            if not conn_state.success:
                continue

            print("Disconnecting ", i)
            disconnect = Disconnect(
                udp_transport, communication_channel_id=i, local_hpai=local_hpai
            )
            await disconnect.start()

            if disconnect.success:
                print("Disconnected ", i)
    finally:
        # Release the socket on every exit path (previously it was never closed).
        udp_transport.stop()
# Run the example only when executed as a script, so importing this module
# does not trigger a network scan as a side effect.
if __name__ == "__main__":
    asyncio.run(main())