File: test_types.py

package info (click to toggle)
python-zeroconf 0.147.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,380 kB
  • sloc: python: 15,356; makefile: 23
file content (154 lines) | stat: -rw-r--r-- 4,767 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""Unit tests for zeroconf._services.types."""

from __future__ import annotations

import logging
import os
import socket
import sys
import unittest

import zeroconf as r
from zeroconf import ServiceInfo, Zeroconf, ZeroconfServiceTypes

from .. import _clear_cache, has_working_ipv6

log = logging.getLogger("zeroconf")
original_logging_level = logging.NOTSET


def setup_module():
    global original_logging_level
    original_logging_level = log.level
    log.setLevel(logging.DEBUG)


def teardown_module():
    if original_logging_level != logging.NOTSET:
        log.setLevel(original_logging_level)


def test_integration_with_listener(disable_duplicate_packet_suppression):
    type_ = "_test-listen-type._tcp.local."
    name = "xxxyyy"
    registration_name = f"{name}.{type_}"

    zeroconf_registrar = Zeroconf(interfaces=["127.0.0.1"])
    desc = {"path": "/~paulsm/"}
    info = ServiceInfo(
        type_,
        registration_name,
        80,
        0,
        0,
        desc,
        "ash-2.local.",
        addresses=[socket.inet_aton("10.0.1.2")],
    )
    zeroconf_registrar.registry.async_add(info)
    try:
        service_types = ZeroconfServiceTypes.find(interfaces=["127.0.0.1"], timeout=2)
        assert type_ in service_types
        _clear_cache(zeroconf_registrar)
        service_types = ZeroconfServiceTypes.find(zc=zeroconf_registrar, timeout=2)
        assert type_ in service_types

    finally:
        zeroconf_registrar.close()


@unittest.skipIf(not has_working_ipv6(), "Requires IPv6")
@unittest.skipIf(os.environ.get("SKIP_IPV6"), "IPv6 tests disabled")
def test_integration_with_listener_v6_records(disable_duplicate_packet_suppression):
    type_ = "_test-listenv6rec-type._tcp.local."
    name = "xxxyyy"
    registration_name = f"{name}.{type_}"
    addr = "2606:2800:220:1:248:1893:25c8:1946"  # example.com

    zeroconf_registrar = Zeroconf(interfaces=["127.0.0.1"])
    desc = {"path": "/~paulsm/"}
    info = ServiceInfo(
        type_,
        registration_name,
        80,
        0,
        0,
        desc,
        "ash-2.local.",
        addresses=[socket.inet_pton(socket.AF_INET6, addr)],
    )
    zeroconf_registrar.registry.async_add(info)
    try:
        service_types = ZeroconfServiceTypes.find(interfaces=["127.0.0.1"], timeout=2)
        assert type_ in service_types
        _clear_cache(zeroconf_registrar)
        service_types = ZeroconfServiceTypes.find(zc=zeroconf_registrar, timeout=2)
        assert type_ in service_types

    finally:
        zeroconf_registrar.close()


@unittest.skipIf(not has_working_ipv6() or sys.platform == "win32", "Requires IPv6")
@unittest.skipIf(os.environ.get("SKIP_IPV6"), "IPv6 tests disabled")
@unittest.skipIf(os.environ.get("DEBIAN_TEST"), "disabled by Debian builder")
def test_integration_with_listener_ipv6(disable_duplicate_packet_suppression):
    type_ = "_test-listenv6ip-type._tcp.local."
    name = "xxxyyy"
    registration_name = f"{name}.{type_}"
    addr = "2606:2800:220:1:248:1893:25c8:1946"  # example.com

    zeroconf_registrar = Zeroconf(ip_version=r.IPVersion.V6Only)
    desc = {"path": "/~paulsm/"}
    info = ServiceInfo(
        type_,
        registration_name,
        80,
        0,
        0,
        desc,
        "ash-2.local.",
        addresses=[socket.inet_pton(socket.AF_INET6, addr)],
    )
    zeroconf_registrar.registry.async_add(info)
    try:
        service_types = ZeroconfServiceTypes.find(ip_version=r.IPVersion.V6Only, timeout=2)
        assert type_ in service_types
        _clear_cache(zeroconf_registrar)
        service_types = ZeroconfServiceTypes.find(zc=zeroconf_registrar, timeout=2)
        assert type_ in service_types

    finally:
        zeroconf_registrar.close()


def test_integration_with_subtype_and_listener(disable_duplicate_packet_suppression):
    subtype_ = "_subtype._sub"
    type_ = "_listen._tcp.local."
    name = "xxxyyy"
    # Note: discovery returns only DNS-SD type not subtype
    discovery_type = f"{subtype_}.{type_}"
    registration_name = f"{name}.{type_}"

    zeroconf_registrar = Zeroconf(interfaces=["127.0.0.1"])
    desc = {"path": "/~paulsm/"}
    info = ServiceInfo(
        discovery_type,
        registration_name,
        80,
        0,
        0,
        desc,
        "ash-2.local.",
        addresses=[socket.inet_aton("10.0.1.2")],
    )
    zeroconf_registrar.registry.async_add(info)
    try:
        service_types = ZeroconfServiceTypes.find(interfaces=["127.0.0.1"], timeout=2)
        assert discovery_type in service_types
        _clear_cache(zeroconf_registrar)
        service_types = ZeroconfServiceTypes.find(zc=zeroconf_registrar, timeout=2)
        assert discovery_type in service_types

    finally:
        zeroconf_registrar.close()