1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
|
"""
This module defines a certbot plugin to automate the process of completing a
``dns-01`` challenge (`~acme.challenges.DNS01`) by creating, and subsequently
removing, TXT records using the netcup CCP API.
"""
# Keep metadata before any imports (for setup.py)!
__version__ = '2.0.0'
__url__ = 'https://github.com/coldfix/certbot-dns-netcup'
__all__ = ['Authenticator']
import json
import logging
import requests
from certbot.plugins import dns_common
from certbot.errors import PluginError
API_URL = 'https://www.netcup-wiki.de/wiki/CCP_API'
API_ENDPOINT = "https://ccp.netcup.net/run/webservice/servers/endpoint.php?JSON"
LOGGER = logging.getLogger(__name__)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for netcup
This Authenticator uses the netcup API to fulfill a dns-01 challenge.
"""
description = ('Obtain certificates using a DNS TXT record (if you are '
'using netcup for DNS).')
def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
self.credentials = None
self.api_session_id = None
# DNSAuthenticator overrides
@classmethod
def add_parser_arguments(cls, add):
super().add_parser_arguments(add, default_propagation_seconds=900)
add('credentials', help='netcup credentials INI file.')
add('login-retries', default=3, type=int,
help="login retry attempts in case of session timeout")
add('zone-name', help=(
"zone name to operate on (often TLD). "
"Will be determined using brute-force requests if not specified."))
def more_info(self):
return ('This plugin configures a DNS TXT record to respond to a '
'dns-01 challenge using the netcup API.')
def _setup_credentials(self):
self.credentials = self._configure_credentials(
'credentials',
'netcup credentials INI file',
{
'customer-id': 'Customer ID associated with netcup account',
'api-key': 'Key for CCP API, see {}'.format(API_URL),
'api-password': 'Password for CCP API, see {}'.format(API_URL),
}
)
def _perform(self, domain, validation_name, validation):
LOGGER.debug("add_dns_record (%s, %s)", domain, validation_name)
client = self._get_netcup_client()
self._determine_zone(domain, lambda domain: (
client.add_dns_record(domain, validation_name, validation)))
def _cleanup(self, domain, validation_name, validation):
LOGGER.debug("delete_dns_record(%s, %s)", domain, validation_name)
client = self._get_netcup_client()
self._determine_zone(domain, lambda domain: (
client.delete_dns_record(domain, validation_name, validation)))
def _determine_zone(self, domain, func):
"""Find the domain_id for a given domain."""
zone_name = self.conf('zone-name')
if zone_name:
return func(zone_name)
domain_name_guesses = dns_common.base_domain_name_guesses(domain)
for domain_name in domain_name_guesses:
try:
return func(domain_name)
except NetcupZoneError:
pass
raise PluginError(
'Unable to determine zone identifier for {0} using zone names: {1}'
.format(domain, domain_name_guesses))
def _get_netcup_client(self):
credentials = self.credentials.conf
return APIClient(
credentials('customer-id'),
credentials('api-key'),
credentials('api-password'),
self.conf('login-retries'))
class APIClient:
def __init__(self, customer_id, api_key, api_password, login_retries):
self._customer_id = customer_id
self._api_key = api_key
self._api_password = api_password
self._api_session_id = None
self._login_retries = login_retries
if login_retries < 0:
raise PluginError((
"Invalid value for --dns-netcup-login-retries: {}. "
"Must be >= 0."
).format(login_retries))
# public interface
def add_dns_record(self, domain, name, content):
"""Create record."""
record = _make_record(domain, name, content)
self._update_records(domain, [record])
def delete_dns_record(self, domain, name, content):
"""Delete an existing record. If record does not exist, do nothing."""
# Note that id/type/hostname/destination are all mandatory when
# deleting. We must query first to retrieve the record id.
record = _make_record(domain, name, content)
queried_records = self._query_records(domain, record)
self._update_records(domain, [
dict(record_with_id, deleterecord=True)
for record_with_id in queried_records
])
# internal helpers
def _query_records(self, domain, record):
"""Query list of record using netcup API."""
responsedata = self._authenticate_and_call(
"infoDnsRecords",
domainname=domain)
dnsrecords = responsedata.get("dnsrecords", [])
return [
retrieved_record
for retrieved_record in dnsrecords
if all(retrieved_record[k] == record[k] for k in record)
]
def _update_records(self, domain, records):
"""Insert or update a list of DNS records.
The fields ``hostname``, ``type``, and ``destination`` are mandatory.
The field ``id`` is mandatory when ``deleterecord=True``
"""
return self._authenticate_and_call(
"updateDnsRecords",
domainname=domain,
dnsrecordset={"dnsrecords": records})
def _authenticate(self, force=False):
"""Authenticate with netcup server. Must be called first."""
if force or not self._api_session_id:
responsedata = _apicall("login", {
"customernumber": self._customer_id,
"apikey": self._api_key,
"apipassword": self._api_password,
})
self._api_session_id = responsedata.get("apisessionid")
if not self._api_session_id:
raise PluginError("Login failed due to unknown reason.")
return {
"customernumber": self._customer_id,
"apikey": self._api_key,
"apisessionid": self._api_session_id,
}
def _authenticate_and_call(self, action, domainname, **param):
"""Authenticate and then perform API call.
Auto retry login if session timed out."""
param = dict(param, domainname=domainname)
session_auth = self._authenticate(False)
for i in range(self._login_retries):
try:
return _apicall(action, session_auth, **param)
except NetcupSessionTimeoutError:
LOGGER.warning(
"Login session timed out during call %s for domain %s. "
"Retrying login (attempt %d)",
action, domainname, i + 1)
session_auth = self._authenticate(True)
return _apicall(action, session_auth, **param)
def _make_record(domain, name, content):
name = name.removesuffix('.' + domain)
return {
"type": "TXT",
"hostname": name,
"destination": content,
}
def _apicall(action, credentials, **param):
"""Call an API method and return response data. For more info, see:
https://ccp.netcup.net/run/webservice/servers/endpoint"""
LOGGER.debug("_apicall: %s(%s)", action, param.get('domainname', ''))
data = {
"action": action,
"param": dict(param, **credentials),
}
response = requests.post(
API_ENDPOINT,
data=json.dumps(data),
headers={
"Accept": "application/json",
"Content-Type": "application/json",
})
response.raise_for_status()
data = response.json()
status = data.get("status")
statuscode = data.get("statuscode")
shortmessage = data.get("shortmessage")
longmessage = data.get("longmessage")
if status == "success": # statuscode == 2000
return data.get("responsedata", {})
# This happens when the domain is not chosen properly. Full message is:
# "Value in field domainname does not match requirements of type: domainname"
if 'Value in field domainname does not match requirements' in longmessage:
raise NetcupZoneError(
action, statuscode, shortmessage, longmessage)
# The session times out after roughly 30s and fails with this error:
if 'The session id is not in a valid format.' in longmessage:
raise NetcupSessionTimeoutError(
action, statuscode, shortmessage, longmessage)
# When something with the JSON format was incorrect, there is also:
# "Api key missing. JSON decode failed while validating request."
raise NetcupError(
action, statuscode, shortmessage, longmessage)
class NetcupError(PluginError):
def __init__(self, action, statuscode, shortmessage, longmessage):
super().__init__(self._format(
action, statuscode, shortmessage, longmessage))
self.action = action
self.statuscode = statuscode
self.shortmessage = shortmessage
self.longmessage = longmessage
@classmethod
def _format(cls, action, statuscode, shortmessage, longmessage):
return (f'{cls.__name__} during {action}: '
f'{shortmessage} ({statuscode}) {longmessage}')
class NetcupZoneError(NetcupError):
pass
class NetcupSessionTimeoutError(NetcupError):
pass
|