File: resolved.py

package info (click to toggle)
freedombox 26.3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 83,092 kB
  • sloc: python: 48,542; javascript: 1,730; xml: 481; makefile: 290; sh: 137; php: 32
file content (194 lines) | stat: -rw-r--r-- 6,541 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Module to interact with systemd-resolved over DBus."""

import ipaddress
import json
import subprocess

from django.utils.translation import gettext_lazy as _

from plinth.utils import import_from_gi

gio = import_from_gi('Gio', '2.0')

RESOLVE_NAME = 'org.freedesktop.resolve1'
RESOLVE_PATH = '/org/freedesktop/resolve1'
MANAGER_INTERFACE = 'org.freedesktop.resolve1.Manager'
LINK_INTERFACE = 'org.freedesktop.resolve1.Link'


class DNSServer:
    """Representation of a DNS server in systemd-resolved state."""

    def __init__(self, link_index: int, address_class: int,
                 address_ints: list[int], port: int = 0,
                 domain_name: str | None = None):
        self.link_index = link_index
        self.address_class = address_class
        self.address = ipaddress.ip_address(bytes(address_ints))
        self.port = port
        self.domain_name = domain_name

    def __str__(self):
        if self.port:
            if self.address.version == 4:  # IPv4
                address_str = f'{self.address.compressed}:{self.port}'
            else:  # IPv6
                address_str = f'[{self.address.compressed}]:{self.port}'
        else:
            address_str = self.address.compressed

        if self.domain_name:
            return f'{address_str} ({self.domain_name})'

        return address_str


class Link:
    """systemd-resolved state for a particular link or global context."""

    def __init__(self, connection, object_path, link_index: int = 0,
                 interface_name: str | None = None):
        """Fetch all the relevant properties for a link over DBus."""
        if not link_index:  # Global
            interface = MANAGER_INTERFACE
        else:
            interface = LINK_INTERFACE

        self.proxy = gio.DBusProxy.new_sync(connection,
                                            gio.DBusProxyFlags.NONE, None,
                                            RESOLVE_NAME, object_path,
                                            interface)

        self.link_index = link_index
        self.interface_name = interface_name

        self.dns_over_tls = self.proxy.get_cached_property(
            'DNSOverTLS').unpack()
        self.dnssec = self.proxy.get_cached_property('DNSSEC').unpack()
        self.dnssec_supported = self.proxy.get_cached_property(
            'DNSSECSupported')

        self.dns_servers = self._new_dns_servers(
            self.proxy.get_cached_property('DNSEx'))

        self.fallback_dns_servers = None
        if not link_index:
            self.fallback_dns_servers = self._new_dns_servers(
                self.proxy.get_cached_property('FallbackDNSEx'))

        self.current_dns_server = self._new_dns_server(
            self.proxy.get_cached_property('CurrentDNSServerEx'))

    def get_link(self, link_index):
        """Return a string path to a link's DBus object."""
        return self.proxy.GetLink('(i)', link_index)

    @property
    def dns_over_tls_string(self):
        """Return a string representation for DNS-over-TLS status."""
        value_map = {
            'yes': _('yes'),
            'opportunistic': _('opportunistic'),
            'no': _('no')
        }
        return value_map.get(self.dns_over_tls, self.dns_over_tls)

    @property
    def dnssec_string(self):
        """Return a string representation for DNSSEC status."""
        value_map = {
            'yes': _('yes'),
            'allow-downgrade': _('allow-downgrade'),
            'no': _('no')
        }
        return value_map.get(self.dnssec, self.dnssec)

    @property
    def dnssec_supported_string(self):
        """Return a string representation for whether DNSSEC is supported."""
        return _('supported') if self.dnssec_supported else _('unsupported')

    def _new_dns_servers(self, dns_tuples):
        """Return list of DNS Server objects from variant tuple.

        Global DNS servers list also contains individual link DNS servers.
        Ignore those.
        """
        return [
            self._new_dns_server(dns_tuple) for dns_tuple in dns_tuples
            if self.link_index != 0 or dns_tuple[0] == 0
        ]

    def _new_dns_server(self, dns_tuple):
        """Return a DNS Server object from variant tuple.

        Tuple can be prefixed by link index in case of DNS server for global
        context. Handle both cases. Entire tuple may be empty. Return None in
        that case.
        """
        if self.link_index:  # Not global
            dns_tuple = (self.link_index, ) + tuple(dns_tuple)

        if not dns_tuple[2]:  # Empty address
            return None

        return DNSServer(*dns_tuple)

    def __str__(self):
        dnssec_supported = ('supported'
                            if self.dnssec_supported else 'unspported')
        value = ''
        if not self.link_index:
            value += 'Global\n'
        else:
            value = f'Link {self.link_index} ({self.interface_name})\n'

        if self.current_dns_server:
            value += f'  Current DNS Server: {str(self.current_dns_server)}\n'

        if self.dns_servers:
            value += '  DNS Servers:\n'
            for server in self.dns_servers:
                value += f'    {server}\n'

        if self.fallback_dns_servers:
            value += '  Fallback DNS Servers: \n'
            for server in self.fallback_dns_servers:
                value += f'    {server}\n'

        value += f'  DNS-over-TLS: {self.dns_over_tls}\n'
        value += f'  DNSSEC: {self.dnssec}/{dnssec_supported}\n'
        return value


def get_links():
    """Return the list of network interfaces and their indices."""
    process = subprocess.run(['ip', '--json', 'link'], stdout=subprocess.PIPE,
                             check=True)
    output = json.loads(process.stdout.decode())

    links = {}  # Maintain link index order
    for entry in output:
        links[entry['ifindex']] = entry['ifname']

    return links


def get_status():
    """Return the current status of systemd-resolved daemon."""
    link_status = []
    connection = gio.bus_get_sync(gio.BusType.SYSTEM)
    global_link = Link(connection, RESOLVE_PATH)
    link_status.append(global_link)

    links = get_links()
    for link_index, interface_name in links.items():
        if interface_name == 'lo':
            continue

        link_path = global_link.get_link(link_index)
        link_status.append(
            Link(connection, link_path, link_index, interface_name))

    return link_status