1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
|
"""Unit tests for zeroconf._engine"""
from __future__ import annotations
import asyncio
import itertools
import logging
from unittest.mock import patch
import pytest
import zeroconf as r
from zeroconf import _engine, const
from zeroconf.asyncio import AsyncZeroconf
log = logging.getLogger("zeroconf")
original_logging_level = logging.NOTSET
def setup_module():
global original_logging_level
original_logging_level = log.level
log.setLevel(logging.DEBUG)
def teardown_module():
if original_logging_level != logging.NOTSET:
log.setLevel(original_logging_level)
# This test uses asyncio because it needs to access the cache directly
# which is not threadsafe
@pytest.mark.asyncio
async def test_reaper():
with patch.object(_engine, "_CACHE_CLEANUP_INTERVAL", 0.01):
aiozc = AsyncZeroconf(interfaces=["127.0.0.1"])
zeroconf = aiozc.zeroconf
cache = zeroconf.cache
original_entries = list(itertools.chain(*(cache.entries_with_name(name) for name in cache.names())))
record_with_10s_ttl = r.DNSAddress("a", const._TYPE_SOA, const._CLASS_IN, 10, b"a")
record_with_1s_ttl = r.DNSAddress("a", const._TYPE_SOA, const._CLASS_IN, 1, b"b")
zeroconf.cache.async_add_records([record_with_10s_ttl, record_with_1s_ttl])
question = r.DNSQuestion("_hap._tcp._local.", const._TYPE_PTR, const._CLASS_IN)
now = r.current_time_millis()
other_known_answers: set[r.DNSRecord] = {
r.DNSPointer(
"_hap._tcp.local.",
const._TYPE_PTR,
const._CLASS_IN,
10000,
"known-to-other._hap._tcp.local.",
)
}
zeroconf.question_history.add_question_at_time(question, now, other_known_answers)
assert zeroconf.question_history.suppresses(question, now, other_known_answers)
entries_with_cache = list(itertools.chain(*(cache.entries_with_name(name) for name in cache.names())))
await asyncio.sleep(1.2)
entries = list(itertools.chain(*(cache.entries_with_name(name) for name in cache.names())))
assert zeroconf.cache.get(record_with_1s_ttl) is None
await aiozc.async_close()
assert not zeroconf.question_history.suppresses(question, now, other_known_answers)
assert entries != original_entries
assert entries_with_cache != original_entries
assert record_with_10s_ttl in entries
assert record_with_1s_ttl not in entries
@pytest.mark.asyncio
async def test_reaper_aborts_when_done():
"""Ensure cache cleanup stops when zeroconf is done."""
with patch.object(_engine, "_CACHE_CLEANUP_INTERVAL", 0.01):
aiozc = AsyncZeroconf(interfaces=["127.0.0.1"])
zeroconf = aiozc.zeroconf
record_with_10s_ttl = r.DNSAddress("a", const._TYPE_SOA, const._CLASS_IN, 10, b"a")
record_with_1s_ttl = r.DNSAddress("a", const._TYPE_SOA, const._CLASS_IN, 1, b"b")
zeroconf.cache.async_add_records([record_with_10s_ttl, record_with_1s_ttl])
assert zeroconf.cache.get(record_with_10s_ttl) is not None
assert zeroconf.cache.get(record_with_1s_ttl) is not None
await aiozc.async_close()
await asyncio.sleep(1.2)
assert zeroconf.cache.get(record_with_10s_ttl) is not None
assert zeroconf.cache.get(record_with_1s_ttl) is not None
|