1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
"""
Abstract base for a specific KNX/IP connection (Tunneling or Routing).
* It handles connections and disconnections
* It starts and stops a UDP transport
* It packs Telegrams into KNX Frames and passes them to a UDP transport
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Callable
from xknx.cemi import CEMIFrame
from .transport.ip_transport import KNXIPTransport
# Type alias for a callback that takes a single `bytes` argument and returns
# nothing. NOTE(review): per the name, the bytes are presumably a raw CEMI
# frame — confirm against the call sites; the alias is not used in this part
# of the file.
CEMIBytesCallbackType = Callable[[bytes], None]
class Interface(ABC):
    """Common contract shared by every KNX/IP connection type.

    Concrete implementations must provide ``connect``, ``disconnect`` and
    ``send_cemi``; the class cannot be instantiated until all three are
    overridden.
    """

    # Declared only (no value is assigned here); the single slot below
    # reserves storage for it on instances.
    transport: KNXIPTransport

    __slots__ = ("transport",)

    @abstractmethod
    async def connect(self) -> None:
        """Establish the connection to the KNX bus.

        Implementations raise CommunicationError if connecting fails.
        """

    @abstractmethod
    async def disconnect(self) -> None:
        """Tear down the connection to the KNX bus."""

    @abstractmethod
    async def send_cemi(self, cemi: CEMIFrame) -> None:
        """Transmit the given CEMIFrame to the KNX bus."""
|