File: zeroconf.py

package info (click to toggle)
raritan-json-rpc-sdk 4.0.20%2Bds-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 57,236 kB
  • sloc: cs: 223,121; perl: 117,786; python: 26,872; javascript: 6,544; makefile: 27
file content (49 lines) | stat: -rw-r--r-- 1,932 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# SPDX-License-Identifier: BSD-3-Clause
#
# Copyright 2019 Raritan Inc. All rights reserved.

import time, socket

ZEROCONF_TIMEOUT_DEFAULT = 4

class ZeroconfDiscoveryListener:
    def __init__(self):
        self.found_pdus = {}
        self.pdu_prefixes = ["PX2", "PX3", "SRC", "BCM", "BCM2", "PX3TS", "PMC", "PMMC", "PXE", "SCH", "SRC", "LP", "PXO", "PXC"]

    def remove_service(self, zeroconf, service_type, name):
        pass

    def add_service(self, zeroconf, service_type, name):
        try:
            entry = zeroconf.get_service_info(service_type, name)
            if not any([entry.name.startswith(pfx) for pfx in self.pdu_prefixes]):
                return
            self.found_pdus[entry.addresses[0]] = { "fw": entry.properties[b'rr_fw'], "port": entry.port }
        except (KeyboardInterrupt, ImportError, NameError):
            pass

def discover(timeout = ZEROCONF_TIMEOUT_DEFAULT):
    try:
        from zeroconf import ServiceBrowser, Zeroconf
        zeroconf = Zeroconf()
        listener = ZeroconfDiscoveryListener()
        browser_old_name = ServiceBrowser(zeroconf, "_raritan_rpcs._tcp.local.", listener)
        browser_new_name = ServiceBrowser(zeroconf, "_raritan-rpcs._tcp.local.", listener)
        print("Zeroconf discovery started")
        time.sleep(timeout)
        browser_old_name.cancel()
        browser_new_name.cancel()
        zeroconf.close()
        pdus = []
        print("Found %i PDUs in the local subnet:" % len(listener.found_pdus))
        for k, v in listener.found_pdus.items():
            ip = str(socket.inet_ntoa(k))
            port = v.get("port")
            fw_version = str(v.get("fw"), "utf-8")
            print("%s -- %s" % (ip, fw_version))
            pdus.append({ "ip": ip, "port": port, "fw": fw_version })
        return pdus
    except ImportError:
        print("Please install the zeroconf python module (pip3 install zeroconf)")
        raise