1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
|
"""DNS Authenticator for deSEC."""
import json
import logging
import time
import dns.resolver
import requests
from certbot import interfaces
try:
# needed for compatibility with older certbots, see #13
import zope.interface
zope_interface_implementer = zope.interface.implementer
zope_interface_provider = zope.interface.provider
i_authenticator = interfaces.IAuthenticator
i_plugin_factory = interfaces.IPluginFactory
except ImportError:
def get_noop_dec(*args):
def noop_dec(obj):
return obj
return noop_dec
zope_interface_implementer = zope_interface_provider = get_noop_dec
i_authenticator = i_plugin_factory = None
from certbot import errors
from certbot.plugins import dns_common
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Set this logger's own level to DEBUG so that it does not filter out the debug messages emitted
# below; what is finally shown or stored is left to the handlers configured elsewhere.
logger.setLevel(logging.DEBUG)
@zope_interface_implementer(i_authenticator)  # needed for compatibility with older certbots, see #13
@zope_interface_provider(i_plugin_factory)  # needed for compatibility with older certbots, see #13
class Authenticator(dns_common.DNSAuthenticator):
    """DNS Authenticator for deSEC

    Fulfills dns-01 challenges by adding and removing TXT records through the deSEC REST API.
    """

    description = "Obtain certificates using a DNS TXT record (if you are using deSEC.io for DNS)."
    DEFAULT_ENDPOINT = "https://desec.io/api/v1"

    def __init__(self, *args, **kwargs):
        """Initialise the base authenticator; credentials are loaded later by `_setup_credentials`."""
        super().__init__(*args, **kwargs)
        self.credentials = None

    @classmethod
    def add_parser_arguments(cls, add):  # pylint: disable=arguments-differ
        """Register the inherited arguments (propagation default: 80 seconds) plus ``credentials``."""
        # TODO decrease after deSEC fixed their NOTIFY problem
        super().add_parser_arguments(add, default_propagation_seconds=80)
        add("credentials", help="deSEC credentials INI file.")

    def more_info(self):  # pylint: disable=no-self-use
        """Return a short human-readable description of what this plugin does."""
        info = (
            "This plugin configures a DNS TXT record to respond to a dns-01 challenge using "
            "the deSEC Remote REST API."
        )
        return info

    def _setup_credentials(self):
        """Load the credentials INI file; a ``token`` entry is mandatory."""
        required = {
            "token": "Access token for deSEC API.",
        }
        self.credentials = self._configure_credentials(
            key="credentials",
            label="deSEC credentials INI file",
            required_variables=required,
        )

    def _desec_work(self, domain, validation_name, validation, set_operator):
        """Combine the challenge value with the TXT RRset at *validation_name* and store the result.

        :param domain: not used by this method.
        :param validation_name: DNS name at which the challenge is expected; CNAMEs are followed.
        :param validation: challenge value; it is stored wrapped in double quotes.
        :param set_operator: callable ``(current_records, {quoted_value}) -> new_records``, e.g.
            ``set.union`` to add the value or ``set.difference`` to remove it.
        """
        # The client is created before any DNS lookup takes place.
        api = self._get_desec_client()

        # Follow at most 7 CNAME hops. The lookup that fails with one of the exceptions below ends
        # the chase, and the last name that was obtained successfully is kept.
        target = validation_name
        try:
            for _ in range(7):
                answer = dns.resolver.resolve(target, 'CNAME')
                target = answer[0].target
                logger.debug(f"CNAME lookup result: {target}")
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoResolverConfiguration):
            pass

        # After at least one successful hop the name is a dnspython Name object; turn it back into
        # plain text without the trailing root dot.
        if isinstance(target, dns.name.Name):
            target = target.to_text().rstrip('.')

        zone = api.get_authoritative_zone(target)
        # Everything in front of the last occurrence of the zone name, minus the separating dot.
        prefix = target.rsplit(zone['name'], 1)[0]
        subname = prefix.rstrip('.')

        current = api.get_txt_rrset(zone, subname)
        logger.debug(f"Current TXT records: {current}")
        updated = set_operator(current, {f'"{validation}"'})
        logger.debug(f"Setting TXT records: {updated}")
        api.set_txt_rrset(zone, subname, updated)

    def _perform(self, domain, validation_name, validation):
        """Add the challenge value to the TXT records (set union with the existing ones)."""
        logger.debug(f"Authenticator._perform: {domain}, {validation_name}, {validation}")
        self._desec_work(domain, validation_name, validation, set.union)

    def _cleanup(self, domain, validation_name, validation):
        """Remove the challenge value from the TXT records (set difference)."""
        logger.debug(f"Authenticator._cleanup: {domain}, {validation_name}, {validation}")
        self._desec_work(domain, validation_name, validation, set.difference)

    def _get_desec_client(self):
        """Build an API client from the loaded credentials.

        The ``endpoint`` credential is used when it is set to a truthy value; otherwise
        `DEFAULT_ENDPOINT` applies.
        """
        endpoint = self.credentials.conf("endpoint") or self.DEFAULT_ENDPOINT
        token = self.credentials.conf("token")
        return _DesecConfigClient(endpoint, token)
class _DesecConfigClient(object):
    """
    Encapsulates all communication with the deSEC REST API.

    Every request goes through one ``requests.Session`` whose headers carry the API token and
    declare JSON request bodies.
    """

    def __init__(self, endpoint, token):
        """Create a client for the API rooted at *endpoint*, authenticating with *token*.

        Trailing slashes are stripped from *endpoint* so that paths can be appended with "/".
        """
        logger.debug("creating _DesecConfigClient")
        self.endpoint = endpoint.rstrip('/')
        self.token = token
        self.session = requests.Session()
        self.session.headers["Authorization"] = f"Token {token}"
        self.session.headers["Content-Type"] = "application/json"

    @staticmethod
    def desec_request(method, **kwargs):
        """Call ``method(**kwargs)``, waiting and retrying while the API throttles the request.

        Up to three attempts are made. A 429 response with a usable ``Retry-After`` header (a
        non-negative whole number of seconds) causes a sleep of that length, after which the next
        attempt, if any is left, is made. Every other response is returned right away, and so is
        a 429 whose ``Retry-After`` is missing, not an integer, or negative.

        :param method: callable performing the request, e.g. ``session.get``.
        :returns: the last response received; it can still be a 429 if every attempt was throttled.
        """
        for _ in range(3):
            response: requests.Response = method(**kwargs)
            if response.status_code != 429 or 'Retry-After' not in response.headers:
                return response
            try:
                cooldown = int(response.headers['Retry-After'])
            except ValueError:
                # Not a plain number of seconds; hand the 429 to the caller.
                return response
            if cooldown < 0:
                # time.sleep() raises ValueError for negative durations; treat such a header like
                # an unparseable one instead of crashing.
                return response
            logger.debug(f"deSEC API limit reached. Retrying request after {cooldown}s.")
            time.sleep(cooldown)
        return response

    def desec_get(self, **kwargs):
        """Send a GET through the session, with throttling retries (see `desec_request`)."""
        return self.desec_request(self.session.get, **kwargs)

    def desec_put(self, **kwargs):
        """Send a PUT through the session, with throttling retries (see `desec_request`)."""
        return self.desec_request(self.session.put, **kwargs)

    def get_authoritative_zone(self, qname):
        """Return the first domain object the API reports for ``owns_qname=<qname>``.

        :raises errors.PluginError: if the request fails or the API returns an empty list.
        """
        response = self.desec_get(url=f"{self.endpoint}/domains/?owns_qname={qname}")
        self._check_response_status(response)
        data = self._response_json(response)
        try:
            return data[0]
        except IndexError:
            raise errors.PluginError(f"Could not find suitable domain in your account (did you create it?): {qname}")

    def get_txt_rrset(self, zone, subname):
        """Return the TXT records currently stored at *subname* in *zone* as a set of strings.

        A 404 response is taken to mean that no such RRset exists and yields an empty set.

        :param zone: domain object as returned by `get_authoritative_zone`; ``zone['name']`` is read.
        :raises errors.PluginError: for any other non-2xx response or a non-JSON body.
        """
        domain = zone['name']
        # The three dots after the subname are part of the URL path as built here.
        # NOTE(review): presumably deSEC's syntax for allowing an empty subname; confirm in API docs.
        response = self.desec_get(
            url=f"{self.endpoint}/domains/{domain}/rrsets/{subname}.../TXT/",
        )
        if response.status_code == 404:
            return set()
        self._check_response_status(response, domain=domain)
        return set(self._response_json(response).get('records', set()))

    def set_txt_rrset(self, zone, subname, records: set):
        """Store *records* as the TXT RRset at *subname* in *zone*.

        The RRset is sent as a one-element JSON list, using ``zone['minimum_ttl']`` as TTL.

        :raises errors.PluginError: if the API answers with a non-2xx status.
        """
        domain = zone['name']
        response = self.desec_put(
            url=f"{self.endpoint}/domains/{domain}/rrsets/",
            data=json.dumps([
                {"subname": subname, "type": "TXT", "ttl": zone['minimum_ttl'], "records": list(records)},
            ]),
        )
        return self._check_response_status(response, domain=domain)

    def _check_response_status(self, response, **kwargs):
        """Return ``None`` for a 2xx response; raise a descriptive PluginError for anything else.

        :param kwargs: extra context that is only included in the "Not found" message.
        :raises errors.PluginError: for every status code outside 200-299.
        """
        if 200 <= response.status_code <= 299:
            return
        elif response.status_code in [401, 403]:
            raise errors.PluginError(f"Could not authenticate against deSEC API: {response.content}")
        elif response.status_code == 404:
            raise errors.PluginError(f"Not found ({kwargs}): {response.content}")
        elif response.status_code == 429:
            raise errors.PluginError(f"deSEC throttled your request even after we waited the prescribed cool-down "
                                     f"time. Did you use the API in parallel? {response.content}")
        elif response.status_code >= 500:
            raise errors.PluginError(f"deSEC API server error (status {response.status_code}): {response.content}")
        else:
            raise errors.PluginError(f"Unknown error when talking to deSEC (status {response.status_code}): "
                                     f"Request was on '{response.request.url}' with payload {response.request.body}. "
                                     f"Response was '{response.content}'.")

    def _response_json(self, response):
        """Return the decoded JSON body of *response*.

        :raises errors.PluginError: if the body is not valid JSON.
        """
        try:
            return response.json()
        except ValueError:
            # Depending on the requests version and on whether simplejson is installed, the decode
            # error raised by response.json() is not always a json.JSONDecodeError, but every
            # variant is a ValueError.
            raise errors.PluginError(f"deSEC API sent non-JSON response (status {response.status_code}): "
                                     f"{response.content}")
|