File: daikin.py

package info (click to toggle)
python-netdisco 2.9.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 516 kB
  • sloc: python: 1,550; xml: 247; sh: 10; makefile: 2
file content (103 lines) | stat: -rw-r--r-- 2,939 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
"""Daikin device discovery."""
import socket

from datetime import timedelta
from typing import Dict, List  # noqa: F401
from urllib.parse import unquote

# Payload of the UDP datagram sent out to probe for devices.
DISCOVERY_MSG = b"DAIKIN_UDP/common/basic_info"

# Local port the discovery socket is bound to, and the remote port the
# probe is sent to.
UDP_SRC_PORT = 30000
UDP_DST_PORT = 30050

# Destination host for the probe. Python's socket module interprets the
# special string '<broadcast>' as the IPv4 broadcast address.
DISCOVERY_ADDRESS = '<broadcast>'
# Timeout applied to the discovery socket while waiting for replies.
DISCOVERY_TIMEOUT = timedelta(seconds=2)


class Daikin:
    """Discover Daikin devices on the local network.

    A probe datagram is broadcast over UDP and every reply received before
    the socket times out is parsed.  Accepted devices are stored in
    ``entries`` as dicts with the keys ``id``, ``name``, ``ip``, ``mac``
    and ``ver``.
    """

    def __init__(self):
        """Initialize the Daikin discovery."""
        self.entries = []  # type: List[Dict[str, str]]

    def scan(self):
        """Scan the network."""
        self.update()

    def all(self):
        """Scan and return all found entries."""
        self.scan()
        return self.entries

    @staticmethod
    def _parse_response(data, address):
        """Turn one raw reply datagram into a device entry.

        ``data`` is the received payload (bytes) made of comma separated
        ``key=value`` tokens; ``address`` is the sender's IP address.

        Returns the entry dict, or ``None`` when the payload is not valid
        UTF-8, does not report ``ret=OK``, carries no ``mac`` or is not of
        type ``aircon``.
        """
        try:
            text = data.decode("UTF-8")
        except UnicodeDecodeError:
            # Any host can send to our port; ignore junk instead of
            # aborting the whole scan.
            return None

        fields = {}
        for token in text.split(','):
            key, separator, value = token.partition('=')
            if separator:
                # Tokens without '=' carry no value and are skipped.
                fields[key] = value

        if fields.get('ret') != 'OK':
            # non-OK return on response
            return None

        if 'mac' not in fields:
            # no mac found for device
            return None

        if fields.get('type') != 'aircon':
            # not an air conditioner
            return None

        mac = fields['mac']
        return {
            # in case the device was not configured to have an id (field
            # missing or empty) then use the mac address
            'id': fields.get('id') or mac,
            # the name is transmitted percent-encoded
            'name': unquote(fields.get('name', '')),
            'ip': address,
            'mac': mac,
            'ver': fields.get('ver', ''),
        }

    def update(self):
        """Scan network for Daikin devices.

        Replaces ``entries`` with the devices that answered this scan.
        Socket errors other than the receive timeout (for example a
        failing ``bind``) propagate to the caller and leave ``entries``
        untouched.
        """
        entries = []  # type: List[Dict[str, str]]

        # The context manager closes the socket even when configuring or
        # binding it fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # total_seconds() honours sub-second and multi-day values,
            # which the .seconds attribute silently drops.
            sock.settimeout(DISCOVERY_TIMEOUT.total_seconds())
            sock.bind(("", UDP_SRC_PORT))

            sock.sendto(DISCOVERY_MSG, (DISCOVERY_ADDRESS, UDP_DST_PORT))

            while True:
                try:
                    data, (address, _) = sock.recvfrom(1024)
                except socket.timeout:
                    # The timeout applies to each receive, so this means
                    # no further reply arrived within the window.
                    break

                entry = self._parse_response(data, address)
                if entry is not None:
                    entries.append(entry)

        self.entries = entries


def main():
    """Run one discovery pass and pretty-print the devices found."""
    from pprint import pprint as show

    scanner = Daikin()
    show("Scanning for Daikin devices..")
    scanner.update()
    found = scanner.entries
    show(found)


# Run the manual discovery test only when executed as a script, not when
# the module is imported.
if __name__ == "__main__":
    main()