1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
|
import asyncio
import logging
import socket
import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple
_LOGGER = logging.getLogger(__name__)
__version__ = "0.3.3"
MAX_UPDATES_WITHOUT_RESPONSE = 4
@dataclass
class Device30303:
"""A device discovered via port 30303."""
hostname: str
mac: str
ipaddress: str
name: str
def create_udp_socket(discovery_port: int) -> socket.socket:
"""Create a udp socket used for communicating with the device."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
try:
# Legacy devices require source port to be the discovery port
sock.bind(("", discovery_port))
except OSError as err:
_LOGGER.debug("Port %s is not available: %s", discovery_port, err)
sock.bind(("", 0))
sock.setblocking(False)
return sock
def normalize_mac(mac: str) -> str:
return ":".join([i.zfill(2) for i in mac.replace("-", ":").split(":")])
class Discovery30303(asyncio.DatagramProtocol):
def __init__(
self,
destination: Tuple[str, int],
on_response: Callable[[bytes, Tuple[str, int]], None],
) -> None:
self.transport = None
self.destination = destination
self.on_response = on_response
def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
"""Trigger on_response."""
self.on_response(data, addr)
def error_received(self, ex: Optional[Exception]) -> None:
"""Handle error."""
_LOGGER.error("Discovery30303 error: %s", ex)
def connection_lost(self, ex: Optional[Exception]) -> None:
"""Do nothing on connection lost."""
class AIODiscovery30303:
"""A 30303 discovery scanner."""
DISCOVERY_PORT = 30303
BROADCAST_FREQUENCY = 3
RESPONSE_SIZE = 64
DISCOVER_MESSAGE = b"Discovery: Who is out there?"
BROADCAST_ADDRESS = "<broadcast>"
def __init__(self) -> None:
self.found_devices: List[Device30303] = []
def _destination_from_address(self, address: Optional[str]) -> Tuple[str, int]:
if address is None:
address = self.BROADCAST_ADDRESS
return (address, self.DISCOVERY_PORT)
def _process_response(
self,
data: Optional[bytes],
from_address: Tuple[str, int],
address: Optional[str],
response_list: Dict[str, Device30303],
) -> bool:
"""Process a response.
Returns True if processing should stop
"""
if data is None or data == self.DISCOVER_MESSAGE:
return
data_split = data.decode("utf-8").split("\r\n")
if len(data_split) < 3 or from_address[0] in response_list:
return
response_list[from_address] = Device30303(
hostname=data_split[0].rstrip(),
ipaddress=from_address[0],
mac=normalize_mac(data_split[1].rstrip()),
name=data_split[2].split("\x00")[0].rstrip(),
)
return from_address[0] == address
async def _async_run_scan(
self,
transport: asyncio.DatagramTransport,
destination: Tuple[str, int],
timeout: int,
found_all_future: "asyncio.Future[bool]",
) -> None:
"""Send the scans."""
_LOGGER.debug("discover: %s => %s", destination, self.DISCOVER_MESSAGE)
transport.sendto(self.DISCOVER_MESSAGE, destination)
quit_time = time.monotonic() + timeout
remain_time = timeout
while True:
time_out = min(remain_time, timeout / self.BROADCAST_FREQUENCY)
if time_out <= 0:
return
try:
await asyncio.wait_for(
asyncio.shield(found_all_future), timeout=time_out
)
except asyncio.TimeoutError:
if time.monotonic() >= quit_time:
return
# No response, send broadcast again in cast it got lost
_LOGGER.debug("discover: %s => %s", destination, self.DISCOVER_MESSAGE)
transport.sendto(self.DISCOVER_MESSAGE, destination)
else:
return # found_all
remain_time = quit_time - time.monotonic()
async def async_scan(
self, timeout: int = 10, address: Optional[str] = None
) -> List[Device30303]:
"""Discover on port 30303."""
sock = create_udp_socket(self.DISCOVERY_PORT)
destination = self._destination_from_address(address)
found_all_future = asyncio.Future()
response_list = {}
def _on_response(data: bytes, addr: Tuple[str, int]) -> None:
_LOGGER.debug("discover: %s <= %s", addr, data)
if self._process_response(data, addr, address, response_list):
found_all_future.set_result(True)
transport, _ = await asyncio.get_running_loop().create_datagram_endpoint(
lambda: Discovery30303(
destination=destination,
on_response=_on_response,
),
sock=sock,
)
try:
await self._async_run_scan(
transport, destination, timeout, found_all_future
)
finally:
transport.close()
self.found_devices = list(response_list.values())
return self.found_devices
|