File: mocked_dns_response.py

package info (click to toggle)
python-email-validator 2.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 304 kB
  • sloc: python: 1,598; makefile: 43; sh: 15
file content (127 lines) | stat: -rw-r--r-- 4,976 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from typing import Any, Dict, Iterator, Optional

import dns.exception
import dns.rdataset
import dns.resolver
import json
import os.path
import pytest

from email_validator.deliverability import caching_resolver

# To run deliverability checks without actually making
# DNS queries, we use a caching resolver where the cache
# is pre-loaded with DNS responses.

# Mode switch for the mocked resolver cache defined below.
#
# When False (the normal setting), all DNS queries must come
# from the mocked data: the cache answers every lookup from the
# saved JSON file, raises ValueError for a query that was never
# recorded, and refuses any attempt to store a new answer.
#
# When True, tests are run with live DNS queries: the cache
# reports a miss for every lookup, keeps the responses that are
# put into it, and the session-scoped cleanup fixture at the end
# of this module saves them to the JSON file.
BUILD_MOCKED_DNS_RESPONSE_DATA = False


# This class implements the 'get' and 'put' methods
# expected for a dns.resolver.Resolver's cache.
class MockedDnsResponseData:
    """Resolver cache that replays (or records) DNS answers stored in a JSON file.

    Instances provide the ``get`` and ``put`` methods that are expected of a
    ``dns.resolver.Resolver`` cache. Which of the two roles the cache plays is
    decided by the module-level ``BUILD_MOCKED_DNS_RESPONSE_DATA`` flag:

    * flag false: ``get`` answers every lookup from the data read by
      ``load()`` and ``put`` must never be called;
    * flag true: ``get`` always reports a cache miss, ``put`` keeps the
      answer it is handed, and ``save()`` writes everything to ``DATA_PATH``.
    """

    # Location of the saved answers, next to this module. The directory is made
    # absolute and joined with os.path.join because plain string concatenation
    # ('dirname + "/name"') yields "/mocked-dns-answers.json" -- a path in the
    # filesystem root -- whenever __file__ has no directory component.
    DATA_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "mocked-dns-answers.json")

    # Singleton shared by every resolver returned from create_resolver();
    # created lazily on the first call.
    INSTANCE = None

    @staticmethod
    def create_resolver() -> dns.resolver.Resolver:
        """Return a new caching resolver that uses the shared singleton as its cache.

        The singleton is created on first use. Its saved answers are loaded
        only when BUILD_MOCKED_DNS_RESPONSE_DATA is false; when building the
        data the cache starts out empty.
        """
        if MockedDnsResponseData.INSTANCE is None:
            singleton = MockedDnsResponseData()
            if not BUILD_MOCKED_DNS_RESPONSE_DATA:
                singleton.load()
            MockedDnsResponseData.INSTANCE = singleton

        # The 'configure' argument is only true when building the data, i.e.
        # when live queries are going to be made.
        dns_resolver = dns.resolver.Resolver(configure=BUILD_MOCKED_DNS_RESPONSE_DATA)
        return caching_resolver(cache=MockedDnsResponseData.INSTANCE, dns_resolver=dns_resolver)

    def __init__(self) -> None:
        """Create an empty cache; call load() to fill it from DATA_PATH."""
        # Maps (name, rdtype, rdclass) cache keys to the recorded answer.
        # None marks a query that was recorded with an empty answer.
        self.data: Dict[dns.resolver.CacheKey, Optional[MockedDnsResponseData.Ans]] = {}

    class Ans:  # mocks the dns.resolver.Answer class
        """Minimal answer object: holds an rdataset and iterates over its records."""

        def __init__(self, rrset: dns.rdataset.Rdataset) -> None:
            self.rrset = rrset

        def __iter__(self) -> Iterator[Any]:
            return iter(self.rrset)

    def load(self) -> None:
        """Load the saved DNS response data from the JSON file.

        Each saved item is re-structured into dnspython classes and stored in
        ``self.data`` under its (name, rdtype, rdclass) key. Items whose
        "answer" list is empty are stored as None.
        """
        # Only the read needs the file to be open; converting to dnspython
        # objects happens after it has been closed.
        with open(self.DATA_PATH, encoding="utf-8") as f:
            items = json.load(f)

        for item in items:
            query = item["query"]
            # Names are saved without the final dot (see save()), so it is
            # put back here to get an absolute name.
            key = (dns.name.from_text(query["name"] + "."),
                   dns.rdatatype.from_text(query["type"]),
                   dns.rdataclass.from_text(query["class"]))
            rdatas = [
                dns.rdata.from_text(rdtype=key[1], rdclass=key[2], tok=rr)
                for rr in item["answer"]
            ]
            if item["answer"]:
                self.data[key] = MockedDnsResponseData.Ans(dns.rdataset.from_rdata_list(0, rdatas=rdatas))
            else:
                self.data[key] = None

    def save(self) -> None:
        """Write the cached responses to the JSON file at DATA_PATH.

        Entries whose value is None are skipped. The records of every other
        entry are written as sorted text.
        """
        # Re-structure as a list with basic data types.
        data = [
            {
                "query": {
                    "name": key[0].to_text(omit_final_dot=True),
                    "type": dns.rdatatype.to_text(key[1]),
                    "class": dns.rdataclass.to_text(key[2]),
                },
                "answer": sorted([
                    rr.to_text()
                    for rr in value
                ])
            }
            for key, value in self.data.items()
            if value is not None
        ]
        # NOTE: indent=True is kept as-is so the layout of the generated
        # file does not change.
        with open(self.DATA_PATH, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=True)

    def get(self, key: dns.resolver.CacheKey) -> Optional[Ans]:
        """Look up the answer for a (name, rdtype, rdclass) cache key.

        Returns:
            The recorded answer, or None (a cache miss) for every key while
            BUILD_MOCKED_DNS_RESPONSE_DATA is true.

        Raises:
            dns.exception.Timeout: the name is "timeout.com.".
            dns.resolver.NoAnswer: the key was recorded with an empty answer.
            dns.resolver.NXDOMAIN: the key is unknown, but an ANY query for
                the same name and class was recorded with an empty answer.
            ValueError: the saved data has nothing usable for the key.
        """
        # Special-case a domain to create a timeout.
        if key[0].to_text() == "timeout.com.":
            raise dns.exception.Timeout()  # type: ignore [no-untyped-call]

        # When building the DNS response database, return
        # a cache miss.
        if BUILD_MOCKED_DNS_RESPONSE_DATA:
            return None

        # Query the data for a matching record.
        if key in self.data:
            answer = self.data[key]
            if not answer:
                raise dns.resolver.NoAnswer()  # type: ignore [no-untyped-call]
            return answer

        # Query the data for a response to an ANY query.
        ANY = dns.rdatatype.from_text("ANY")
        any_key = (key[0], ANY, key[2])
        if any_key in self.data and self.data[any_key] is None:
            raise dns.resolver.NXDOMAIN()  # type: ignore [no-untyped-call]

        raise ValueError(f"Saved DNS data did not contain query: {key}")

    def put(self, key: dns.resolver.CacheKey, value: Ans) -> None:
        """Store a live query response under its cache key.

        Raises:
            ValueError: BUILD_MOCKED_DNS_RESPONSE_DATA is false, in which
                case nothing should ever be stored in the cache.
        """
        # Build the DNS data by saving the live query response.
        if not BUILD_MOCKED_DNS_RESPONSE_DATA:
            raise ValueError("Should not get here.")
        self.data[key] = value


@pytest.fixture(scope="session", autouse=True)
def MockedDnsResponseDataCleanup(request: pytest.FixtureRequest) -> None:
    """Register a session finalizer that saves the recorded DNS answers.

    The finalizer does nothing unless BUILD_MOCKED_DNS_RESPONSE_DATA is true
    and the MockedDnsResponseData singleton has actually been created.
    """
    def save_recorded_answers() -> None:
        # Nothing is recorded when the tests run against the saved data.
        if not BUILD_MOCKED_DNS_RESPONSE_DATA:
            return

        # No resolver was ever created, so there is nothing to write out.
        cache = MockedDnsResponseData.INSTANCE
        if cache is None:
            return

        cache.save()

    request.addfinalizer(save_recorded_answers)