1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
|
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Module to interact with systemd-resolved over DBus."""
import ipaddress
import json
import subprocess
from django.utils.translation import gettext_lazy as _
from plinth.utils import import_from_gi
gio = import_from_gi('Gio', '2.0')
RESOLVE_NAME = 'org.freedesktop.resolve1'
RESOLVE_PATH = '/org/freedesktop/resolve1'
MANAGER_INTERFACE = 'org.freedesktop.resolve1.Manager'
LINK_INTERFACE = 'org.freedesktop.resolve1.Link'
class DNSServer:
"""Representation of a DNS server in systemd-resolved state."""
def __init__(self, link_index: int, address_class: int,
address_ints: list[int], port: int = 0,
domain_name: str | None = None):
self.link_index = link_index
self.address_class = address_class
self.address = ipaddress.ip_address(bytes(address_ints))
self.port = port
self.domain_name = domain_name
def __str__(self):
if self.port:
if self.address.version == 4: # IPv4
address_str = f'{self.address.compressed}:{self.port}'
else: # IPv6
address_str = f'[{self.address.compressed}]:{self.port}'
else:
address_str = self.address.compressed
if self.domain_name:
return f'{address_str} ({self.domain_name})'
return address_str
class Link:
"""systemd-resolved state for a particular link or global context."""
def __init__(self, connection, object_path, link_index: int = 0,
interface_name: str | None = None):
"""Fetch all the relevant properties for a link over DBus."""
if not link_index: # Global
interface = MANAGER_INTERFACE
else:
interface = LINK_INTERFACE
self.proxy = gio.DBusProxy.new_sync(connection,
gio.DBusProxyFlags.NONE, None,
RESOLVE_NAME, object_path,
interface)
self.link_index = link_index
self.interface_name = interface_name
self.dns_over_tls = self.proxy.get_cached_property(
'DNSOverTLS').unpack()
self.dnssec = self.proxy.get_cached_property('DNSSEC').unpack()
self.dnssec_supported = self.proxy.get_cached_property(
'DNSSECSupported')
self.dns_servers = self._new_dns_servers(
self.proxy.get_cached_property('DNSEx'))
self.fallback_dns_servers = None
if not link_index:
self.fallback_dns_servers = self._new_dns_servers(
self.proxy.get_cached_property('FallbackDNSEx'))
self.current_dns_server = self._new_dns_server(
self.proxy.get_cached_property('CurrentDNSServerEx'))
def get_link(self, link_index):
"""Return a string path to a link's DBus object."""
return self.proxy.GetLink('(i)', link_index)
@property
def dns_over_tls_string(self):
"""Return a string representation for DNS-over-TLS status."""
value_map = {
'yes': _('yes'),
'opportunistic': _('opportunistic'),
'no': _('no')
}
return value_map.get(self.dns_over_tls, self.dns_over_tls)
@property
def dnssec_string(self):
"""Return a string representation for DNSSEC status."""
value_map = {
'yes': _('yes'),
'allow-downgrade': _('allow-downgrade'),
'no': _('no')
}
return value_map.get(self.dnssec, self.dnssec)
@property
def dnssec_supported_string(self):
"""Return a string representation for whether DNSSEC is supported."""
return _('supported') if self.dnssec_supported else _('unsupported')
def _new_dns_servers(self, dns_tuples):
"""Return list of DNS Server objects from variant tuple.
Global DNS servers list also contains individual link DNS servers.
Ignore those.
"""
return [
self._new_dns_server(dns_tuple) for dns_tuple in dns_tuples
if self.link_index != 0 or dns_tuple[0] == 0
]
def _new_dns_server(self, dns_tuple):
"""Return a DNS Server object from variant tuple.
Tuple can be prefixed by link index in case of DNS server for global
context. Handle both cases. Entire tuple may be empty. Return None in
that case.
"""
if self.link_index: # Not global
dns_tuple = (self.link_index, ) + tuple(dns_tuple)
if not dns_tuple[2]: # Empty address
return None
return DNSServer(*dns_tuple)
def __str__(self):
dnssec_supported = ('supported'
if self.dnssec_supported else 'unspported')
value = ''
if not self.link_index:
value += 'Global\n'
else:
value = f'Link {self.link_index} ({self.interface_name})\n'
if self.current_dns_server:
value += f' Current DNS Server: {str(self.current_dns_server)}\n'
if self.dns_servers:
value += ' DNS Servers:\n'
for server in self.dns_servers:
value += f' {server}\n'
if self.fallback_dns_servers:
value += ' Fallback DNS Servers: \n'
for server in self.fallback_dns_servers:
value += f' {server}\n'
value += f' DNS-over-TLS: {self.dns_over_tls}\n'
value += f' DNSSEC: {self.dnssec}/{dnssec_supported}\n'
return value
def get_links():
"""Return the list of network interfaces and their indices."""
process = subprocess.run(['ip', '--json', 'link'], stdout=subprocess.PIPE,
check=True)
output = json.loads(process.stdout.decode())
links = {} # Maintain link index order
for entry in output:
links[entry['ifindex']] = entry['ifname']
return links
def get_status():
"""Return the current status of systemd-resolved daemon."""
link_status = []
connection = gio.bus_get_sync(gio.BusType.SYSTEM)
global_link = Link(connection, RESOLVE_PATH)
link_status.append(global_link)
links = get_links()
for link_index, interface_name in links.items():
if interface_name == 'lo':
continue
link_path = global_link.get_link(link_index)
link_status.append(
Link(connection, link_path, link_index, interface_name))
return link_status
|