File: dns_standalone.py

package info (click to toggle)
python-certbot-dns-standalone 1.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 180 kB
  • sloc: python: 207; makefile: 14
file content (115 lines) | stat: -rw-r--r-- 3,836 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""Standalone DNS Authenticator."""
import logging

import copy

from dnslib import RR
from dnslib.server import DNSServer,DNSHandler,BaseResolver,DNSLogger,UDPServer,TCPServer

from socket import AF_INET6,SOCK_DGRAM

from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common

# Module-level logger for this plugin.
logger = logging.getLogger(__name__)
# Logger object handed to every DNSServer this plugin starts. Per the dnslib
# DNSLogger documentation, a hook list without +/- prefixes replaces the default
# hooks, so only the "truncated" and "error" hooks are enabled; the second
# argument (False) disables the log line prefix.
dnsLogger = DNSLogger("truncated,error",False)


class Authenticator(dns_common.DNSAuthenticator):
    """Standalone DNS Authenticator

    This Authenticator uses a standalone DNS server to fulfill a dns-01 challenge.

    The resolver and the DNS servers are created lazily by the first
    ``_perform`` call and torn down again by ``_cleanup``, so one instance can
    go through several perform/cleanup cycles.
    """

    description = ('Obtain certificates using an integrated DNS server')

    def __init__(self, *args, **kwargs):
        super(Authenticator, self).__init__(*args, **kwargs)
        # _AcmeResolver shared by all servers; None until the first _perform().
        self.resolver = None
        # List of running DNSServer objects; None while no server is running.
        self.servers = None

    @classmethod
    def add_parser_arguments(cls, add):  # pylint: disable=arguments-differ
        """Register the plugin options: bind addresses (IPv4/IPv6) and port.

        The default propagation delay is 0 seconds.
        """
        super(Authenticator, cls).add_parser_arguments(add, default_propagation_seconds=0)
        add('address', help='IP address to bind to.', default='0.0.0.0')
        add('ipv6-address', help='IPv6 address to bind to.', default='::')
        add('port', help='Port to bind to.', default='53')

    def _setup_credentials(self):
        """Do nothing: this plugin does not use any credentials."""
        return

    def more_info(self):  # pylint: disable=missing-docstring,no-self-use
        return 'This plugin uses a standalone DNS server to respond to a dns-01 challenge.'

    def _perform(self, domain, validation_name, validation):
        """Publish ``validation`` and make sure the DNS servers are running.

        The token is added to the shared resolver. On the first call the
        TCP/UDP servers for IPv6 and IPv4 are started; later calls reuse them.

        :raises errors.PluginError: if no UDP server could be started. Any
            server that did start is stopped again and ``self.servers`` stays
            ``None`` so that a later call retries the startup.
        """
        if self.resolver is None:
            self.resolver = _AcmeResolver()

        self.resolver.addToken(validation)

        if self.servers is not None:
            # Servers are already up and share self.resolver.
            return

        # Collect into a local list and publish it only on success, so a failed
        # startup never leaves self.servers looking like "already started".
        started = []
        active_udp_server = False
        error = None

        for Server in [TCP6Server, TCPServer, UDP6Server, UDPServer]:
            # Try IPv6 version first since it may listen on IPv4 as well.
            try:
                if Server.address_family == AF_INET6:
                    address = self.conf('ipv6-address')
                else:
                    address = self.conf('address')
                if address is not None:
                    server = DNSServer(self.resolver, port=int(self.conf('port')), address=address,
                                       server=Server, logger=dnsLogger)
                    server.start_thread()
                    started.append(server)
                    if Server.socket_type == SOCK_DGRAM:
                        active_udp_server = True
            except Exception as e:  # pylint: disable=broad-except
                # Best effort: a single failing listener is tolerated (e.g. the
                # IPv4 bind when the IPv6 socket already covers it), but leave
                # a trace instead of swallowing the error silently.
                logger.debug('Unable to start %s: %s', Server.__name__, e)
                error = e

        if not active_udp_server:
            # Do not leave TCP-only listeners behind when the setup is unusable.
            for server in started:
                server.stop()
            # Re-raise the exception when no UDP server was started successfully.
            raise errors.PluginError('Error starting DNS server: {0}'.format(error))

        self.servers = started

    def _cleanup(self, domain, validation_name, validation):
        """Stop all running DNS servers and forget the published tokens.

        The hook takes a single challenge's data, so it may be called several
        times in a row (presumably once per challenge by the certbot base
        class). The state is reset on the first call, which makes the
        remaining calls no-ops and lets a later ``_perform`` start fresh
        servers.
        """
        servers, self.servers = self.servers, None
        # The resolver is dropped together with the servers that reference it.
        self.resolver = None
        for server in servers or ():
            server.stop()


class _AcmeResolver(BaseResolver):
    """Resolver that answers every TXT query with all registered tokens.

    The queried name is not inspected: TXT queries (qtype 16) receive one TXT
    record per stored token and A queries (qtype 1) receive 127.0.0.1. Any
    other query type gets a reply without answers.
    """

    def __init__(self):
        # Validation strings, in the order they were registered.
        self.tokens = []

    def addToken(self,validation):
        """Register ``validation`` so it is returned for TXT queries."""
        self.tokens.append(validation)

    def resolve(self,request,handler):
        """Build the reply for ``request``; ``handler`` is not used."""
        reply = request.reply()
        qname = request.q.qname

        if request.q.qtype == 16:
            # One zone line per token; an empty token list yields an empty zone.
            zone = "".join(". 60 TXT %s\n" % token for token in self.tokens)
        else:
            zone = ". 60 A 127.0.0.1" # for dig
        records = RR.fromZone(zone)

        if request.q.qtype in (1, 16):
            for record in records:
                # Copy so the parsed record can take the queried name.
                answer = copy.copy(record)
                answer.rname = qname
                reply.add_answer(answer)

        return reply

class UDP6Server(UDPServer):
    """UDPServer subclass whose socket uses the IPv6 address family."""
    address_family = AF_INET6

class TCP6Server(TCPServer):
    """TCPServer subclass whose socket uses the IPv6 address family."""
    address_family = AF_INET6