1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
|
"""Unit tests for zeroconf._services.types."""
from __future__ import annotations
import logging
import os
import socket
import sys
import unittest
import zeroconf as r
from zeroconf import ServiceInfo, Zeroconf, ZeroconfServiceTypes
from .. import _clear_cache, has_working_ipv6
# Logger named "zeroconf"; setup_module() and teardown_module() below change
# and restore its level around the tests in this module.
log = logging.getLogger("zeroconf")
# Level that ``log`` had when setup_module() ran. Holds logging.NOTSET until
# setup_module() overwrites it.
original_logging_level = logging.NOTSET
def setup_module():
    """Remember the level of the module-level ``log`` and switch it to DEBUG.

    The previous level is stored in the module global
    ``original_logging_level`` so that ``teardown_module`` can read it.
    """
    global original_logging_level
    previous_level = log.level
    log.setLevel(logging.DEBUG)
    original_logging_level = previous_level
def teardown_module():
    """Put the module-level ``log`` back to the level saved by ``setup_module``.

    The level is restored unconditionally. The earlier version skipped the
    restore when the saved level was ``logging.NOTSET``, which left the
    logger at DEBUG whenever it had no explicit level before
    ``setup_module`` ran. ``Logger.setLevel`` accepts ``NOTSET``, so no guard
    is needed.
    """
    log.setLevel(original_logging_level)
def test_integration_with_listener(disable_duplicate_packet_suppression):
    """Register an IPv4 service and expect its type from ``ZeroconfServiceTypes.find``.

    ``disable_duplicate_packet_suppression`` is not used in the body; it is
    presumably a pytest fixture requested for its side effect (confirm in
    conftest).
    """
    service_type = "_test-listen-type._tcp.local."
    instance = "xxxyyy"
    full_name = ".".join((instance, service_type))

    registrar = Zeroconf(interfaces=["127.0.0.1"])
    service = ServiceInfo(
        service_type,
        full_name,
        80,
        0,
        0,
        {"path": "/~paulsm/"},
        "ash-2.local.",
        addresses=[socket.inet_aton("10.0.1.2")],
    )
    registrar.registry.async_add(service)

    try:
        # First lookup: find() is called without a ``zc`` argument.
        first_lookup = ZeroconfServiceTypes.find(interfaces=["127.0.0.1"], timeout=2)
        assert service_type in first_lookup

        # Second lookup: reuse the registrar itself, after _clear_cache()
        # (presumably this empties the registrar's record cache - confirm).
        _clear_cache(registrar)
        second_lookup = ZeroconfServiceTypes.find(zc=registrar, timeout=2)
        assert service_type in second_lookup
    finally:
        # Always release the registrar, even when an assertion fails.
        registrar.close()
@unittest.skipIf(not has_working_ipv6(), "Requires IPv6")
@unittest.skipIf(os.environ.get("SKIP_IPV6"), "IPv6 tests disabled")
def test_integration_with_listener_v6_records(disable_duplicate_packet_suppression):
    """Register a service carrying an IPv6 address and expect its type from ``find``.

    The registrar and the first lookup both use the ``127.0.0.1`` interface;
    only the address stored in the ``ServiceInfo`` is IPv6.
    ``disable_duplicate_packet_suppression`` is not used in the body; it is
    presumably a pytest fixture requested for its side effect (confirm in
    conftest).
    """
    service_type = "_test-listenv6rec-type._tcp.local."
    instance = "xxxyyy"
    full_name = ".".join((instance, service_type))
    # The original comment labels this address as example.com.
    ipv6_text = "2606:2800:220:1:248:1893:25c8:1946"

    registrar = Zeroconf(interfaces=["127.0.0.1"])
    service = ServiceInfo(
        service_type,
        full_name,
        80,
        0,
        0,
        {"path": "/~paulsm/"},
        "ash-2.local.",
        addresses=[socket.inet_pton(socket.AF_INET6, ipv6_text)],
    )
    registrar.registry.async_add(service)

    try:
        # First lookup: find() is called without a ``zc`` argument.
        first_lookup = ZeroconfServiceTypes.find(interfaces=["127.0.0.1"], timeout=2)
        assert service_type in first_lookup

        # Second lookup: reuse the registrar itself, after _clear_cache()
        # (presumably this empties the registrar's record cache - confirm).
        _clear_cache(registrar)
        second_lookup = ZeroconfServiceTypes.find(zc=registrar, timeout=2)
        assert service_type in second_lookup
    finally:
        # Always release the registrar, even when an assertion fails.
        registrar.close()
@unittest.skipIf(not has_working_ipv6() or sys.platform == "win32", "Requires IPv6")
@unittest.skipIf(os.environ.get("SKIP_IPV6"), "IPv6 tests disabled")
@unittest.skipIf(os.environ.get("DEBIAN_TEST"), "disabled by Debian builder")
def test_integration_with_listener_ipv6(disable_duplicate_packet_suppression):
    """Register a service on a V6Only ``Zeroconf`` and expect its type from ``find``.

    Both the registrar and the first lookup are created with
    ``ip_version=IPVersion.V6Only``. The test is skipped on win32, when
    ``has_working_ipv6()`` is false, or when ``SKIP_IPV6`` / ``DEBIAN_TEST``
    is set in the environment.
    ``disable_duplicate_packet_suppression`` is not used in the body; it is
    presumably a pytest fixture requested for its side effect (confirm in
    conftest).
    """
    service_type = "_test-listenv6ip-type._tcp.local."
    instance = "xxxyyy"
    full_name = ".".join((instance, service_type))
    # The original comment labels this address as example.com.
    ipv6_text = "2606:2800:220:1:248:1893:25c8:1946"

    registrar = Zeroconf(ip_version=r.IPVersion.V6Only)
    service = ServiceInfo(
        service_type,
        full_name,
        80,
        0,
        0,
        {"path": "/~paulsm/"},
        "ash-2.local.",
        addresses=[socket.inet_pton(socket.AF_INET6, ipv6_text)],
    )
    registrar.registry.async_add(service)

    try:
        # First lookup: find() is called without a ``zc`` argument.
        first_lookup = ZeroconfServiceTypes.find(ip_version=r.IPVersion.V6Only, timeout=2)
        assert service_type in first_lookup

        # Second lookup: reuse the registrar itself, after _clear_cache()
        # (presumably this empties the registrar's record cache - confirm).
        _clear_cache(registrar)
        second_lookup = ZeroconfServiceTypes.find(zc=registrar, timeout=2)
        assert service_type in second_lookup
    finally:
        # Always release the registrar, even when an assertion fails.
        registrar.close()
def test_integration_with_subtype_and_listener(disable_duplicate_packet_suppression):
    """Register a service under a subtype-qualified type and look it up via ``find``.

    The ``ServiceInfo`` type is ``_subtype._sub._listen._tcp.local.`` while
    the registered instance name is built on the base type
    ``_listen._tcp.local.``.
    ``disable_duplicate_packet_suppression`` is not used in the body; it is
    presumably a pytest fixture requested for its side effect (confirm in
    conftest).
    """
    base_type = "_listen._tcp.local."
    subtype_prefix = "_subtype._sub"
    instance = "xxxyyy"
    # NOTE(review): the original comment here read "discovery returns only
    # DNS-SD type not subtype", yet the assertions below look for the full
    # subtype-qualified name - confirm which is intended.
    qualified_type = ".".join((subtype_prefix, base_type))
    full_name = ".".join((instance, base_type))

    registrar = Zeroconf(interfaces=["127.0.0.1"])
    service = ServiceInfo(
        qualified_type,
        full_name,
        80,
        0,
        0,
        {"path": "/~paulsm/"},
        "ash-2.local.",
        addresses=[socket.inet_aton("10.0.1.2")],
    )
    registrar.registry.async_add(service)

    try:
        # First lookup: find() is called without a ``zc`` argument.
        first_lookup = ZeroconfServiceTypes.find(interfaces=["127.0.0.1"], timeout=2)
        assert qualified_type in first_lookup

        # Second lookup: reuse the registrar itself, after _clear_cache()
        # (presumably this empties the registrar's record cache - confirm).
        _clear_cache(registrar)
        second_lookup = ZeroconfServiceTypes.find(zc=registrar, timeout=2)
        assert qualified_type in second_lookup
    finally:
        # Always release the registrar, even when an assertion fails.
        registrar.close()
|