File: tests_nsprocessinglimit.py

package info (click to toggle)
bind9 1%3A9.20.20-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 43,788 kB
  • sloc: ansic: 316,296; sh: 49,693; python: 25,039; perl: 3,218; makefile: 2,247
file content (74 lines) | stat: -rw-r--r-- 2,316 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0.  If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.

import os

import isctest
import isctest.mark

pytestmark = [isctest.mark.with_dnstap]


def line_to_ips_and_queries(line):
    # dnstap-read output line example
    # 05-Feb-2026 11:00:57.853 RQ 10.53.0.4:38507 -> 10.53.0.3:22047 TCP 56b sub.example.tld/IN/NS
    _, _, _, _, _, dst, _, _, query = line.split(" ", 9)
    ip, _ = dst.split(":", 1)
    return (ip, query)


def extract_dnstap(ns, expectedlen):
    ns.rndc("dnstap -roll 1")
    path = os.path.join(ns.identifier, "dnstap.out.0")
    dnstapread = isctest.run.cmd(
        [isctest.vars.ALL["DNSTAPREAD"], path],
    )

    lines = dnstapread.out.splitlines()
    assert expectedlen == len(lines)
    return map(line_to_ips_and_queries, lines)


def expect_query(expected_query, expected_query_count, ips_and_queries):
    count = 0
    for _, query in ips_and_queries:
        if query == expected_query:
            count += 1
    assert count == expected_query_count


def expect_next_ip_and_query(expected_ips_and_queries, ips_and_queries):
    for expected_ip, expected_query in expected_ips_and_queries:
        ip, query = next(ips_and_queries)
        assert ip == expected_ip
        assert query == expected_query


def test_selfpointedglue_nslimit(ns4):
    msg = isctest.query.create("a.sub.example.tld.", "A")
    res = isctest.query.tcp(msg, ns4.ip)
    isctest.check.servfail(res)

    # The 4 formers lines are request to find sub.example2.tld NSs.
    # The latest 20 are queries to sub.example2.tld NSs.
    ips_and_queries = extract_dnstap(ns4, 24)

    # Checking the begining of the resulution
    expect_next_ip_and_query(
        [
            ("10.53.0.1", "./IN/NS"),
            ("10.53.0.1", "tld/IN/NS"),
            ("10.53.0.2", "example.tld/IN/NS"),
            ("10.53.0.3", "sub.example.tld/IN/NS"),
        ],
        ips_and_queries,
    )
    expect_query("a.sub.example.tld/IN/A", 20, ips_and_queries)