1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
|
"""
Abstraction for handling KNXnet/IP routing.
Routing uses UDP Multicast to send and receive KNXnet/IP messages.
"""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import logging
import random
from typing import TYPE_CHECKING, Final
from xknx.cemi import CEMIFrame, CEMIMessageCode
from xknx.core import XknxConnectionState, XknxConnectionType
from xknx.exceptions import CommunicationError
from xknx.knxip import (
HPAI,
KNXIPFrame,
KNXIPServiceType,
RoutingBusy,
RoutingIndication,
RoutingLostMessage,
)
from xknx.telegram import IndividualAddress
from .const import DEFAULT_INDIVIDUAL_ADDRESS, DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT
from .interface import CEMIBytesCallbackType, Interface
from .ip_secure import SecureGroup
from .transport import KNXIPTransport, UDPTransport
if TYPE_CHECKING:
from xknx.xknx import XKNX
# Logger used for the connection and flow control messages emitted by this module.
logger = logging.getLogger("xknx.log")

# Timing constants for routing flow control. All values below except
# DEFAULT_LATENCY_TOLERANCE_MS are in seconds (they are passed to `asyncio.sleep()`
# or compared against event loop time).

# Delay between two decrements of the received RoutingBusy counter while it fades out.
BUSY_DECREMENT_TIME: Final = 0.005 # 5 ms
# A RoutingBusy frame is only counted if more than this time has passed since the previous one.
BUSY_INCREMENT_COOLDOWN: Final = 0.01 # 10 ms
# Scaled by a random number in [0, 1) and the RoutingBusy counter to extend the wait time.
BUSY_RANDOM_TIME_FACTOR: Final = 0.05 # 50 ms
# Scaled by the RoutingBusy counter to get the slowduration before the counter is decremented.
BUSY_SLOWDURATION_TIME_FACTOR: Final = 0.1 # 100 ms
# Minimum pause after a RoutingIndication was sent before the next one may be sent.
ROUTING_INDICATION_WAIT_TIME: Final = 0.02 # 20 ms
# Latency tolerance in milliseconds used by SecureRouting when no `latency_ms` is given.
DEFAULT_LATENCY_TOLERANCE_MS: Final = 1000
class _RoutingFlowControl:
    """
    Class for handling KNXnet/IP routing flow control.

    See KNX Specifications 3.8.5 Routing §2.3.5 Flow control handling

    Outgoing frames are sent inside `throttle()`, which keeps a minimum gap
    after the previously sent RoutingIndication and blocks while a pause
    requested by a RoutingBusy frame is active. `handle_routing_busy()` starts
    or prolongs such a pause. Instances have to be created while an asyncio
    event loop is running.
    """

    def __init__(self) -> None:
        """Initialize flow control in ready state (sending is allowed)."""
        # loop time of the most recently received RoutingBusy frame
        self._last_busy_frame_time: float = 0.0
        # loop time at which the last `throttle()` block was left
        self._last_sent_routing_indication_time: float = 0.0
        self._loop = asyncio.get_running_loop()
        # set while sending is allowed; cleared for the duration of a pause
        self._ready = asyncio.Event()
        self._ready.set()
        # number of RoutingBusy frames counted in the moving time window
        self._received_busy_frames: int = 0
        # task running `_resume_sending()` for the current pause
        self._timer_task: asyncio.Task[None] | None = None
        # loop time the current pause started at; None if not pausing
        self._wait_start_time: float | None = None
        # wait time in milliseconds requested for the current pause
        self._wait_time_ms: int = 0

    def cancel(self) -> None:
        """Cancel internal tasks."""
        if self._timer_task:
            self._timer_task.cancel()

    @asynccontextmanager
    async def throttle(self) -> AsyncIterator[None]:
        """Context manager to wait for ready state and throttle outgoing frames."""
        # limit RoutingIndication transmission rate according to
        # KNX Specifications 3.2.6 Communication Medium KNX IP §2.1
        # simplified version - pause 20 ms after transmit a RoutingIndication
        elapsed = self._loop.time() - self._last_sent_routing_indication_time
        if elapsed < ROUTING_INDICATION_WAIT_TIME:
            await asyncio.sleep(ROUTING_INDICATION_WAIT_TIME - elapsed)
        # block for as long as a RoutingBusy pause is active
        await self._ready.wait()
        yield
        # only reached if the managed block did not raise
        self._last_sent_routing_indication_time = self._loop.time()

    def handle_routing_busy(self, routing_busy: RoutingBusy) -> None:
        """
        Handle incoming RoutingBusy.

        Pause sending for `routing_busy.wait_time` milliseconds. If a pause is
        already running, the frame is counted (at most once per
        BUSY_INCREMENT_COOLDOWN) and only restarts the pause if its wait time
        exceeds the time remaining of the running pause.
        """
        self._ready.clear()
        now = self._loop.time()
        previous_busy_frame_time = self._last_busy_frame_time
        self._last_busy_frame_time = now

        if self._wait_start_time is None:
            logger.info(
                "RoutingBusy received: %s",
                routing_busy,
            )
        else:
            # only apply if we have already received a RoutingBusy frame and are still pausing
            if (now - previous_busy_frame_time) > BUSY_INCREMENT_COOLDOWN:
                self._received_busy_frames += 1
            logger.debug(
                "RoutingBusy received: %s - %s ms since previous - number %s in moving time window",
                routing_busy,
                round((now - previous_busy_frame_time) * 1000),
                self._received_busy_frames,
            )
            # discard frame if wait time is lower than remaining time
            # remaining time is the requested wait time minus the time already waited
            elapsed_ms = (now - self._wait_start_time) * 1000
            remaining_ms = self._wait_time_ms - elapsed_ms
            if remaining_ms >= routing_busy.wait_time:
                # the running timer already covers the requested pause
                return

        self._wait_time_ms = routing_busy.wait_time
        self._wait_start_time = now

        # restart the timer so the pause is measured from this frame
        if self._timer_task:
            self._timer_task.cancel()
        self._timer_task = asyncio.create_task(self._resume_sending())

    async def _resume_sending(self) -> None:
        """Reset ready flag after wait_time_ms and fade out slowduration."""
        # both values are derived from the counter as it is when the timer starts
        random_wait_extension = (
            random.random() * self._received_busy_frames * BUSY_RANDOM_TIME_FACTOR
        )
        slowduration = self._received_busy_frames * BUSY_SLOWDURATION_TIME_FACTOR

        await asyncio.sleep(self._wait_time_ms / 1000 + random_wait_extension)
        self._ready.set()
        self._wait_start_time = None

        await asyncio.sleep(slowduration)
        # decrement the counter one by one every BUSY_DECREMENT_TIME
        while self._received_busy_frames > 0:
            await asyncio.sleep(BUSY_DECREMENT_TIME)
            self._received_busy_frames -= 1
class Routing(Interface):
    """Interface implementation for KNXnet/IP routing over UDP multicast."""

    connection_type = XknxConnectionType.ROUTING
    transport: UDPTransport

    def __init__(
        self,
        xknx: XKNX,
        individual_address: IndividualAddress | None,
        cemi_received_callback: CEMIBytesCallbackType,
        local_ip: str,
        multicast_group: str = DEFAULT_MCAST_GRP,
        multicast_port: int = DEFAULT_MCAST_PORT,
    ) -> None:
        """Store the settings, create the transport and set up flow control."""
        self.xknx = xknx
        # a missing (or falsy) address is replaced by the default address
        self.individual_address = (
            individual_address if individual_address else DEFAULT_INDIVIDUAL_ADDRESS
        )
        self.cemi_received_callback = cemi_received_callback
        self.local_ip = local_ip
        self.multicast_group = multicast_group
        self.multicast_port = multicast_port

        # the transport is built from the address attributes assigned above
        self._init_transport()
        handled_service_types = [
            KNXIPServiceType.ROUTING_INDICATION,
            KNXIPServiceType.ROUTING_BUSY,
            KNXIPServiceType.ROUTING_LOST_MESSAGE,
        ]
        self.transport.register_callback(self._handle_frame, handled_service_types)
        self._flow_control = _RoutingFlowControl()

    def _init_transport(self) -> None:
        """Create the multicast UDP transport for this interface."""
        local_endpoint = (self.local_ip, 0)
        multicast_endpoint = (self.multicast_group, self.multicast_port)
        self.transport = UDPTransport(
            local_addr=local_endpoint,
            remote_addr=multicast_endpoint,
            multicast=True,
        )

    ####################
    #
    # CONNECT DISCONNECT
    #
    ####################

    async def connect(self) -> bool:
        """
        Start routing.

        Reports CONNECTING, then CONNECTED to the connection manager and
        returns True. If the transport raises OSError, DISCONNECTED is
        reported, the transport is stopped and CommunicationError is raised.
        """
        self.xknx.current_address = self.individual_address
        connection_manager = self.xknx.connection_manager
        connection_manager.connection_state_changed(
            XknxConnectionState.CONNECTING, self.connection_type
        )
        try:
            await self.transport.connect()
        except OSError as ex:
            logger.debug(
                "Could not establish connection to KNXnet/IP network. %s: %s",
                type(ex).__name__,
                ex,
            )
            connection_manager.connection_state_changed(
                XknxConnectionState.DISCONNECTED
            )
            # close udp transport to prevent open file descriptors
            self.transport.stop()
            raise CommunicationError("Routing could not be started") from ex
        else:
            connection_manager.connection_state_changed(
                XknxConnectionState.CONNECTED, self.connection_type
            )
        return True

    async def disconnect(self) -> None:
        """Stop routing: stop the transport, report DISCONNECTED, cancel flow control."""
        self.transport.stop()
        connection_manager = self.xknx.connection_manager
        connection_manager.connection_state_changed(XknxConnectionState.DISCONNECTED)
        self._flow_control.cancel()

    ##################
    #
    # OUTGOING FRAMES
    #
    ##################

    async def send_cemi(self, cemi: CEMIFrame) -> None:
        """
        Send CEMIFrame to the network.

        The message code of the passed `cemi` is modified: it goes out as
        L_DATA_IND and is afterwards handed to `cemi_received_callback` as
        L_DATA_CON.
        """
        # send L_DATA_IND to network, create L_DATA_CON locally for routing
        cemi.code = CEMIMessageCode.L_DATA_IND
        outgoing_raw_cemi = cemi.to_knx()
        routing_indication = RoutingIndication(raw_cemi=outgoing_raw_cemi)

        # wait until flow control allows sending
        async with self._flow_control.throttle():
            frame = KNXIPFrame.init_from_body(routing_indication)
            self._send_knxipframe(frame)

        cemi.code = CEMIMessageCode.L_DATA_CON
        confirmation_raw_cemi = cemi.to_knx()
        self.cemi_received_callback(confirmation_raw_cemi)

    def _send_knxipframe(self, knxipframe: KNXIPFrame) -> None:
        """Hand a KNXIPFrame to the transport for sending."""
        self.transport.send(knxipframe)

    ##################
    #
    # INCOMING FRAMES
    #
    ##################

    def _handle_frame(
        self, knxipframe: KNXIPFrame, source: HPAI, _: KNXIPTransport
    ) -> None:
        """Dispatch an incoming KNXIPFrame by body type. Callback from internal transport."""
        body = knxipframe.body
        if isinstance(body, RoutingIndication):
            self._handle_routing_indication(body)
            return
        if isinstance(body, RoutingBusy):
            self._flow_control.handle_routing_busy(body)
            return
        if isinstance(body, RoutingLostMessage):
            logger.warning(
                "RoutingLostMessage received from %s - %s lost messages.",
                source.ip_addr,
                body.lost_messages,
            )
            return
        logger.warning("Service not implemented: %s", knxipframe)

    def _handle_routing_indication(self, routing_indication: RoutingIndication) -> None:
        """Pass the raw CEMI of an incoming RoutingIndication to the callback."""
        received_raw_cemi = routing_indication.raw_cemi
        self.cemi_received_callback(received_raw_cemi)
class SecureRouting(Routing):
    """Routing interface that uses a SecureGroup transport for secure multicast."""

    connection_type = XknxConnectionType.ROUTING_SECURE
    transport: SecureGroup

    def __init__(
        self,
        xknx: XKNX,
        individual_address: IndividualAddress | None,
        cemi_received_callback: CEMIBytesCallbackType,
        local_ip: str,
        backbone_key: bytes,
        latency_ms: int | None = None,
        multicast_group: str = DEFAULT_MCAST_GRP,
        multicast_port: int = DEFAULT_MCAST_PORT,
    ) -> None:
        """Store the secure settings, then run the Routing initialization."""
        # both attributes are read by `_init_transport()`, which the parent
        # initializer calls - so they have to be assigned first
        self.backbone_key = backbone_key
        # a missing (or falsy) latency falls back to the default tolerance
        self.latency_ms = (
            latency_ms if latency_ms else DEFAULT_LATENCY_TOLERANCE_MS
        )
        super().__init__(
            xknx,
            individual_address,
            cemi_received_callback,
            local_ip,
            multicast_group,
            multicast_port,
        )

    def _init_transport(self) -> None:
        """Create the SecureGroup transport for this interface."""
        local_endpoint = (self.local_ip, 0)
        multicast_endpoint = (self.multicast_group, self.multicast_port)
        self.transport = SecureGroup(
            local_addr=local_endpoint,
            remote_addr=multicast_endpoint,
            backbone_key=self.backbone_key,
            latency_ms=self.latency_ms,
        )
|