1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
"""Tellstick device discovery."""
import socket
from datetime import timedelta
import logging
from typing import List, Tuple # noqa: F401
# UDP port the discovery probe is sent to.
DISCOVERY_PORT = 30303
# Destination host for the probe; '<broadcast>' is the socket module's
# special name for the IPv4 broadcast address.
DISCOVERY_ADDRESS = '<broadcast>'
# Datagram payload sent as the discovery probe.
DISCOVERY_PAYLOAD = b"D"
# How long to wait for each reply before the scan is considered finished.
DISCOVERY_TIMEOUT = timedelta(seconds=2)
class Tellstick:
    """Discover Tellstick devices by broadcasting a UDP probe.

    Results of the most recent scan are kept in ``entries`` as tuples of
    ``(address, product, mac, activation code, version)``.
    """

    def __init__(self):
        """Initialize the Tellstick discovery with no known entries."""
        self.entries = []  # type: List[Tuple[str, ...]]

    def scan(self):
        """Scan the network and store the result in ``entries``."""
        self.update()

    def all(self):
        """Scan and return all found entries."""
        self.scan()
        return self.entries

    def update(self):
        """Scan network for Tellstick devices.

        Sends one broadcast probe and collects replies until no datagram
        arrives within ``DISCOVERY_TIMEOUT``. ``entries`` is replaced only
        when the scan completes; if a socket error is raised it propagates
        to the caller and the previous ``entries`` are left untouched.

        Raises:
            OSError: if the socket cannot be configured, the probe cannot
                be sent, or receiving fails for a reason other than the
                timeout.
        """
        entries = []

        # The context manager guarantees the socket is closed even when
        # sending or receiving raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # total_seconds() keeps fractional/day components; `.seconds`
            # would turn e.g. a 500 ms timeout into 0 (non-blocking mode).
            sock.settimeout(DISCOVERY_TIMEOUT.total_seconds())

            sock.sendto(DISCOVERY_PAYLOAD,
                        (DISCOVERY_ADDRESS, DISCOVERY_PORT))

            while True:
                try:
                    data, (address, _) = sock.recvfrom(1024)
                except socket.timeout:
                    # No more replies within the timeout: scan is done.
                    break

                try:
                    entry = data.decode("ascii").split(":")
                except UnicodeDecodeError:
                    # Catch invalid responses
                    logging.getLogger(__name__).debug(
                        'Ignoring invalid unicode response from %s', address)
                    continue

                # expecting product, mac, activation code, version
                if len(entry) != 4:
                    continue

                entries.append((address,) + tuple(entry))

        self.entries = entries
def main():
    """Run a single discovery pass and pretty-print the entries found."""
    import pprint

    discovery = Tellstick()
    pprint.pprint("Scanning for Tellstick devices..")
    discovery.update()
    pprint.pprint(discovery.entries)


# Only run the discovery when executed as a script, not on import.
if __name__ == "__main__":
    main()
|