File: dns_desec.py

package info (click to toggle)
python-certbot-dns-desec 1.3.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 156 kB
  • sloc: python: 366; makefile: 7
file content (188 lines) | stat: -rw-r--r-- 7,854 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""DNS Authenticator for deSEC."""
import json
import logging
import time

import dns.resolver
import requests
from certbot import interfaces
try:
    # needed for compatibility with older certbots, see #13
    import zope.interface
    zope_interface_implementer = zope.interface.implementer
    zope_interface_provider = zope.interface.provider
    i_authenticator = interfaces.IAuthenticator
    i_plugin_factory = interfaces.IPluginFactory
except ImportError:
    def get_noop_dec(*args):
        def noop_dec(obj):
            return obj
        return noop_dec
    zope_interface_implementer = zope_interface_provider = get_noop_dec
    i_authenticator = i_plugin_factory = None

from certbot import errors
from certbot.plugins import dns_common

# Module-level logger named after this module. Its level is set to DEBUG on
# the logger itself, so records of every severity pass the logger-level check.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


@zope_interface_implementer(i_authenticator)  # needed for compatibility with older certbots, see #13
@zope_interface_provider(i_plugin_factory)  # needed for compatibility with older certbots, see #13
class Authenticator(dns_common.DNSAuthenticator):
    """dns-01 authenticator that talks to deSEC.

    TXT records for the challenge are added and removed through the deSEC
    REST API.
    """

    DEFAULT_ENDPOINT = "https://desec.io/api/v1"
    description = "Obtain certificates using a DNS TXT record (if you are using deSEC.io for DNS)."

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Assigned by _setup_credentials().
        self.credentials = None

    @classmethod
    def add_parser_arguments(cls, add):  # pylint: disable=arguments-differ
        """Register the base class options plus the ``credentials`` option."""
        # TODO decrease after deSEC fixed their NOTIFY problem
        propagation_default = 80
        super().add_parser_arguments(add, default_propagation_seconds=propagation_default)
        add("credentials", help="deSEC credentials INI file.")

    def more_info(self):  # pylint: disable=missing-docstring,no-self-use
        text = (
            "This plugin configures a DNS TXT record to respond to a dns-01 challenge using "
            "the deSEC Remote REST API."
        )
        return text

    def _setup_credentials(self):
        """Configure ``self.credentials``, declaring ``token`` as a required variable."""
        required = {
            "token": "Access token for deSEC API.",
        }
        self.credentials = self._configure_credentials(
            key="credentials",
            label="deSEC credentials INI file",
            required_variables=required,
        )

    def _desec_work(self, domain, validation_name, validation, set_operator):
        """Combine the challenge value with the existing TXT RRset and store the result.

        :param domain: Domain being validated (not used here).
        :param validation_name: Name at which the challenge record is expected.
        :param validation: Challenge value; it is wrapped in double quotes
            before being combined with the current records.
        :param set_operator: Binary set operation called as
            ``set_operator(current_records, {quoted_validation})``.
        """
        api = self._get_desec_client()

        # Follow up to seven CNAME hops. The listed resolver errors end the
        # chase and the last name obtained is kept.
        hops_left = 7
        try:
            while hops_left:
                cname_answer = dns.resolver.resolve(validation_name, 'CNAME')
                validation_name = cname_answer[0].target
                logger.debug(f"CNAME lookup result: {validation_name}")
                hops_left -= 1
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoResolverConfiguration):
            pass

        # A successful lookup leaves a dns.name.Name; turn it back into text
        # without the trailing dot.
        if isinstance(validation_name, dns.name.Name):
            as_text = validation_name.to_text()
            validation_name = as_text.rstrip('.')

        zone = api.get_authoritative_zone(validation_name)
        # Whatever precedes the last occurrence of the zone name is the subname.
        owner_prefix = validation_name.rsplit(zone['name'], 1)[0]
        subname = owner_prefix.rstrip('.')

        records = api.get_txt_rrset(zone, subname)
        logger.debug(f"Current TXT records: {records}")
        records = set_operator(records, {f'"{validation}"'})
        logger.debug(f"Setting TXT records: {records}")
        api.set_txt_rrset(zone, subname, records)

    def _perform(self, domain, validation_name, validation):
        """Add the challenge value to the TXT RRset (set union)."""
        logger.debug(f"Authenticator._perform: {domain}, {validation_name}, {validation}")
        self._desec_work(domain, validation_name, validation, set_operator=set.union)

    def _cleanup(self, domain, validation_name, validation):
        """Remove the challenge value from the TXT RRset (set difference)."""
        logger.debug(f"Authenticator._cleanup: {domain}, {validation_name}, {validation}")
        self._desec_work(domain, validation_name, validation, set_operator=set.difference)

    def _get_desec_client(self):
        """Build an API client from the credentials, defaulting the endpoint to ``DEFAULT_ENDPOINT``."""
        endpoint = self.credentials.conf("endpoint") or self.DEFAULT_ENDPOINT
        token = self.credentials.conf("token")
        return _DesecConfigClient(endpoint, token)


class _DesecConfigClient(object):
    """
    Encapsulates all communication with the deSEC REST API.

    Every request goes through one ``requests.Session`` that carries the token
    authorization header. API-level failures detected by this class are raised
    as ``errors.PluginError``.
    """

    def __init__(self, endpoint, token):
        """Create a session for ``endpoint`` authenticated with ``token``.

        :param endpoint: Base URL of the API; trailing slashes are removed.
        :param token: API token, sent as ``Authorization: Token <token>``.
        """
        logger.debug("creating _DesecConfigClient")
        self.endpoint = endpoint.rstrip('/')
        self.token = token
        self.session = requests.Session()
        self.session.headers["Authorization"] = f"Token {token}"
        self.session.headers["Content-Type"] = "application/json"

    @staticmethod
    def desec_request(method, **kwargs):
        """Call ``method(**kwargs)``, retrying while the API throttles the request.

        A 429 response with an integer ``Retry-After`` header makes this
        method sleep for that many seconds and try again; at most three
        attempts are made.

        :param method: Callable performing the HTTP request and returning a
            response object.
        :param kwargs: Passed to ``method`` unchanged on every attempt.
        :returns: The first response that is not a retryable 429, or the last
            response received once the attempts are used up.
        """
        for _ in range(3):
            response: requests.Response = method(**kwargs)
            if response.status_code == 429 and 'Retry-After' in response.headers:
                try:
                    cooldown = int(response.headers['Retry-After'])
                except ValueError:
                    # Retry-After is not a plain number of seconds; hand the
                    # 429 back to the caller instead of guessing a delay.
                    return response
                logger.debug(f"deSEC API limit reached. Retrying request after {cooldown}s.")
                time.sleep(cooldown)
            else:
                return response
        return response

    def desec_get(self, **kwargs):
        """Send a GET request through the session, with throttling retries."""
        return self.desec_request(self.session.get, **kwargs)

    def desec_put(self, **kwargs):
        """Send a PUT request through the session, with throttling retries."""
        return self.desec_request(self.session.put, **kwargs)

    def get_authoritative_zone(self, qname):
        """Return the first domain the API reports for ``owns_qname=qname``.

        :param qname: DNS name for which the responsible domain is looked up.
        :returns: The first element of the JSON list returned by the API.
        :raises errors.PluginError: If the request fails or the list is empty.
        """
        response = self.desec_get(url=f"{self.endpoint}/domains/?owns_qname={qname}")
        self._check_response_status(response)
        data = self._response_json(response)
        try:
            return data[0]
        except IndexError:
            raise errors.PluginError(f"Could not find suitable domain in your account (did you create it?): {qname}")

    def get_txt_rrset(self, zone, subname):
        """Fetch the TXT records stored at ``subname`` in ``zone``.

        :param zone: Domain object as returned by ``get_authoritative_zone``;
            its ``name`` entry is used.
        :param subname: Record name relative to the zone (may be empty).
        :returns: Set of record values; empty when the API answers 404.
        :raises errors.PluginError: On any other unsuccessful response.
        """
        domain = zone['name']
        # NOTE(review): the literal "..." after the subname is part of the
        # path; presumably the deSEC URL scheme for RRsets - confirm in API docs.
        response = self.desec_get(
            url=f"{self.endpoint}/domains/{domain}/rrsets/{subname}.../TXT/",
        )

        if response.status_code == 404:
            return set()

        self._check_response_status(response, domain=domain)
        return set(self._response_json(response).get('records', set()))

    def set_txt_rrset(self, zone, subname, records: set):
        """Store ``records`` as the TXT RRset at ``subname`` in ``zone``.

        The TTL sent is the zone's ``minimum_ttl`` entry.

        :raises errors.PluginError: If the API does not answer with a 2xx status.
        """
        domain = zone['name']
        response = self.desec_put(
            url=f"{self.endpoint}/domains/{domain}/rrsets/",
            data=json.dumps([
                {"subname": subname, "type": "TXT", "ttl": zone['minimum_ttl'], "records": list(records)},
            ]),
        )
        return self._check_response_status(response, domain=domain)

    def _check_response_status(self, response, **kwargs):
        """Return ``None`` for a 2xx response and raise for anything else.

        :param response: Response object to inspect.
        :param kwargs: Context that is included in the 404 error message.
        :raises errors.PluginError: For every status outside 200-299.
        """
        if 200 <= response.status_code <= 299:
            return
        elif response.status_code in [401, 403]:
            raise errors.PluginError(f"Could not authenticate against deSEC API: {response.content}")
        elif response.status_code == 404:
            raise errors.PluginError(f"Not found ({kwargs}): {response.content}")
        elif response.status_code == 429:
            raise errors.PluginError(f"deSEC throttled your request even after we waited the prescribed cool-down "
                                     f"time. Did you use the API in parallel? {response.content}")
        elif response.status_code >= 500:
            raise errors.PluginError(f"deSEC API server error (status {response.status_code}): {response.content}")
        else:
            raise errors.PluginError(f"Unknown error when talking to deSEC (status {response.status_code}): "
                                     f"Request was on '{response.request.url}' with payload {response.request.body}. "
                                     f"Response was '{response.content}'.")

    def _response_json(self, response):
        """Decode the response body as JSON.

        :raises errors.PluginError: If the body cannot be decoded.
        """
        try:
            return response.json()
        except ValueError as err:
            # ValueError is the common base of json.JSONDecodeError and of the
            # decode errors other JSON backends used by requests may raise.
            raise errors.PluginError(f"deSEC API sent non-JSON response (status {response.status_code}): "
                                     f"{response.content}") from err