File: certbot_dns_netcup.py

package info (click to toggle)
python-certbot-dns-netcup 2.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 124 kB
  • sloc: python: 198; makefile: 3
file content (278 lines) | stat: -rw-r--r-- 9,803 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
"""
This module defines a certbot plugin to automate the process of completing a
``dns-01`` challenge (`~acme.challenges.DNS01`) by creating, and subsequently
removing, TXT records using the netcup CCP API.
"""

# Keep metadata before any imports (for setup.py)!
__version__ = '2.0.0'
__url__     = 'https://github.com/coldfix/certbot-dns-netcup'
__all__     = ['Authenticator']

import json
import logging
import requests

from certbot.plugins import dns_common
from certbot.errors import PluginError


API_URL = 'https://www.netcup-wiki.de/wiki/CCP_API'
API_ENDPOINT = "https://ccp.netcup.net/run/webservice/servers/endpoint.php?JSON"

LOGGER = logging.getLogger(__name__)


class Authenticator(dns_common.DNSAuthenticator):
    """DNS Authenticator for netcup

    This Authenticator uses the netcup API to fulfill a dns-01 challenge.
    """

    description = ('Obtain certificates using a DNS TXT record (if you are '
                   'using netcup for DNS).')

    def __init__(self, *args, **kwargs):
        super(Authenticator, self).__init__(*args, **kwargs)
        self.credentials = None
        self.api_session_id = None

    # DNSAuthenticator overrides

    @classmethod
    def add_parser_arguments(cls, add):
        super().add_parser_arguments(add, default_propagation_seconds=900)
        add('credentials', help='netcup credentials INI file.')
        add('login-retries', default=3, type=int,
            help="login retry attempts in case of session timeout")
        add('zone-name', help=(
            "zone name to operate on (often TLD). "
            "Will be determined using brute-force requests if not specified."))

    def more_info(self):
        return ('This plugin configures a DNS TXT record to respond to a '
                'dns-01 challenge using the netcup API.')

    def _setup_credentials(self):
        self.credentials = self._configure_credentials(
            'credentials',
            'netcup credentials INI file',
            {
                'customer-id':  'Customer ID associated with netcup account',
                'api-key':      'Key for CCP API, see {}'.format(API_URL),
                'api-password': 'Password for CCP API, see {}'.format(API_URL),
            }
        )

    def _perform(self, domain, validation_name, validation):
        LOGGER.debug("add_dns_record (%s, %s)", domain, validation_name)
        client = self._get_netcup_client()
        self._determine_zone(domain, lambda domain: (
            client.add_dns_record(domain, validation_name, validation)))

    def _cleanup(self, domain, validation_name, validation):
        LOGGER.debug("delete_dns_record(%s, %s)", domain, validation_name)
        client = self._get_netcup_client()
        self._determine_zone(domain, lambda domain: (
            client.delete_dns_record(domain, validation_name, validation)))

    def _determine_zone(self, domain, func):
        """Find the domain_id for a given domain."""
        zone_name = self.conf('zone-name')
        if zone_name:
            return func(zone_name)

        domain_name_guesses = dns_common.base_domain_name_guesses(domain)

        for domain_name in domain_name_guesses:
            try:
                return func(domain_name)
            except NetcupZoneError:
                pass

        raise PluginError(
            'Unable to determine zone identifier for {0} using zone names: {1}'
            .format(domain, domain_name_guesses))

    def _get_netcup_client(self):
        credentials = self.credentials.conf
        return APIClient(
            credentials('customer-id'),
            credentials('api-key'),
            credentials('api-password'),
            self.conf('login-retries'))


class APIClient:

    def __init__(self, customer_id, api_key, api_password, login_retries):
        self._customer_id = customer_id
        self._api_key = api_key
        self._api_password = api_password
        self._api_session_id = None
        self._login_retries = login_retries
        if login_retries < 0:
            raise PluginError((
                "Invalid value for --dns-netcup-login-retries: {}. "
                "Must be >= 0."
            ).format(login_retries))

    # public interface

    def add_dns_record(self, domain, name, content):
        """Create record."""
        record = _make_record(domain, name, content)
        self._update_records(domain, [record])

    def delete_dns_record(self, domain, name, content):
        """Delete an existing record. If record does not exist, do nothing."""
        # Note that id/type/hostname/destination are all mandatory when
        # deleting. We must query first to retrieve the record id.
        record = _make_record(domain, name, content)
        queried_records = self._query_records(domain, record)
        self._update_records(domain, [
            dict(record_with_id, deleterecord=True)
            for record_with_id in queried_records
        ])

    # internal helpers

    def _query_records(self, domain, record):
        """Query list of record using netcup API."""
        responsedata = self._authenticate_and_call(
            "infoDnsRecords",
            domainname=domain)
        dnsrecords = responsedata.get("dnsrecords", [])
        return [
            retrieved_record
            for retrieved_record in dnsrecords
            if all(retrieved_record[k] == record[k] for k in record)
        ]

    def _update_records(self, domain, records):
        """Insert or update a list of DNS records.

        The fields ``hostname``, ``type``, and ``destination`` are mandatory.
        The field ``id`` is mandatory when ``deleterecord=True``
        """
        return self._authenticate_and_call(
            "updateDnsRecords",
            domainname=domain,
            dnsrecordset={"dnsrecords": records})

    def _authenticate(self, force=False):
        """Authenticate with netcup server. Must be called first."""

        if force or not self._api_session_id:
            responsedata = _apicall("login", {
                "customernumber": self._customer_id,
                "apikey": self._api_key,
                "apipassword": self._api_password,
            })

            self._api_session_id = responsedata.get("apisessionid")
            if not self._api_session_id:
                raise PluginError("Login failed due to unknown reason.")

        return {
            "customernumber": self._customer_id,
            "apikey": self._api_key,
            "apisessionid": self._api_session_id,
        }

    def _authenticate_and_call(self, action, domainname, **param):
        """Authenticate and then perform API call.
        Auto retry login if session timed out."""
        param = dict(param, domainname=domainname)
        session_auth = self._authenticate(False)

        for i in range(self._login_retries):
            try:
                return _apicall(action, session_auth, **param)
            except NetcupSessionTimeoutError:
                LOGGER.warning(
                    "Login session timed out during call %s for domain %s. "
                    "Retrying login (attempt %d)",
                    action, domainname, i + 1)
                session_auth = self._authenticate(True)

        return _apicall(action, session_auth, **param)


def _make_record(domain, name, content):
    name = name.removesuffix('.' + domain)
    return {
        "type": "TXT",
        "hostname": name,
        "destination": content,
    }


def _apicall(action, credentials, **param):
    """Call an API method and return response data. For more info, see:
    https://ccp.netcup.net/run/webservice/servers/endpoint"""

    LOGGER.debug("_apicall: %s(%s)", action, param.get('domainname', ''))

    data = {
        "action": action,
        "param": dict(param, **credentials),
    }
    response = requests.post(
        API_ENDPOINT,
        data=json.dumps(data),
        headers={
            "Accept": "application/json",
            "Content-Type": "application/json",
        })
    response.raise_for_status()
    data = response.json()

    status = data.get("status")
    statuscode = data.get("statuscode")
    shortmessage = data.get("shortmessage")
    longmessage = data.get("longmessage")

    if status == "success":     # statuscode == 2000
        return data.get("responsedata", {})

    # This happens when the domain is not chosen properly. Full message is:
    # "Value in field domainname does not match requirements of type: domainname"
    if 'Value in field domainname does not match requirements' in longmessage:
        raise NetcupZoneError(
            action, statuscode, shortmessage, longmessage)

    # The session times out after roughly 30s and fails with this error:
    if 'The session id is not in a valid format.' in longmessage:
        raise NetcupSessionTimeoutError(
            action, statuscode, shortmessage, longmessage)

    # When something with the JSON format was incorrect, there is also:
    # "Api key missing. JSON decode failed while validating request."

    raise NetcupError(
        action, statuscode, shortmessage, longmessage)


class NetcupError(PluginError):

    def __init__(self, action, statuscode, shortmessage, longmessage):
        super().__init__(self._format(
            action, statuscode, shortmessage, longmessage))
        self.action = action
        self.statuscode = statuscode
        self.shortmessage = shortmessage
        self.longmessage = longmessage

    @classmethod
    def _format(cls, action, statuscode, shortmessage, longmessage):
        return (f'{cls.__name__} during {action}: '
                f'{shortmessage} ({statuscode}) {longmessage}')


class NetcupZoneError(NetcupError):
    pass


class NetcupSessionTimeoutError(NetcupError):
    pass