
|
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Module to interact with systemd-resolved over DBus."""
import ipaddress
import json
import subprocess
from django.utils.translation import gettext_lazy as _
from plinth.utils import import_from_gi
gio = import_from_gi('Gio', '2.0')
RESOLVE_NAME = 'org.freedesktop.resolve1'
RESOLVE_PATH = '/org/freedesktop/resolve1'
MANAGER_INTERFACE = 'org.freedesktop.resolve1.Manager'
LINK_INTERFACE = 'org.freedesktop.resolve1.Link'
class DNSServer:
"""Representation of a DNS server in systemd-resolved state."""
def __init__(self, link_index: int, address_class: int,
address_ints: list[int], port: int = 0,
domain_name: str | None = None):
self.link_index = link_index
self.address_class = address_class
self.address = ipaddress.ip_address(bytes(address_ints))
self.port = port
self.domain_name = domain_name
def __str__(self):
if self.port:
if self.address.version == 4: # IPv4
address_str = f'{self.address.compressed}:{self.port}'
else: # IPv6
address_str = f'[{self.address.compressed}]:{self.port}'
else:
address_str = self.address.compressed
if self.domain_name:
return f'{address_str} ({self.domain_name})'
return address_str
class Link:
"""systemd-resolved state for a particular link or global context."""
def __init__(self, connection, object_path, link_index: int = 0,
interface_name: str | None = None):
"""Fetch all the relevant properties for a link over DBus."""
if not link_index: # Global
interface = MANAGER_INTERFACE
else:
interface = LINK_INTERFACE
self.proxy = gio.DBusProxy.new_sync(connection,
gio.DBusProxyFlags.NONE, None,
RESOLVE_NAME, object_path,
interface)
self.link_index = link_index
self.interface_name = interface_name
self.dns_over_tls = self.proxy.get_cached_property(
'DNSOverTLS').unpack()
self.dnssec = self.proxy.get_cached_property('DNSSEC').unpack()
self.dnssec_supported = self.proxy.get_cached_property(
'DNSSECSupported')
self.dns_servers = self._new_dns_servers(
self.proxy.get_cached_property('DNSEx'))
self.fallback_dns_servers = None
if not link_index:
self.fallback_dns_servers = self._new_dns_servers(
self.proxy.get_cached_property('FallbackDNSEx'))
self.current_dns_server = self._new_dns_server(
self.proxy.get_cached_property('CurrentDNSServerEx'))
def get_link(self, link_index):
"""Return a string path to a link's DBus object."""
return self.proxy.GetLink('(i)', link_index)
@property
def dns_over_tls_string(self):
"""Return a string representation for DNS-over-TLS status."""
value_map = {
'yes': _('yes'),
'opportunistic': _('opportunistic'),
'no': _('no')
}
return value_map.get(self.dns_over_tls, self.dns_over_tls)
@property
def dnssec_string(self):
"""Return a string representation for DNSSEC status."""
value_map = {
'yes': _('yes'),
'allow-downgrade': _('allow-downgrade'),
'no': _('no')
}
return value_map.get(self.dnssec, self.dnssec)
@property
def dnssec_supported_string(self):
"""Return a string representation for whether DNSSEC is supported."""
return _('supported') if self.dnssec_supported else _('unsupported')
def _new_dns_servers(self, dns_tuples):
"""Return list of DNS Server objects from variant tuple.
Global DNS servers list also contains individual link DNS servers.
Ignore those.
"""
return [
self._new_dns_server(dns_tuple) for dns_tuple in dns_tuples
if self.link_index != 0 or dns_tuple[0] == 0
]
def _new_dns_server(self, dns_tuple):
"""Return a DNS Server object from variant tuple.
Tuple can be prefixed by link index in case of DNS server for global
context. Handle both cases. Entire tuple may be empty. Return None in
that case.
"""
if self.link_index: # Not global
dns_tuple = (self.link_index, ) + tuple(dns_tuple)
if not dns_tuple[2]: # Empty address
return None
return DNSServer(*dns_tuple)
def __str__(self):
dnssec_supported = ('supported'
if self.dnssec_supported else 'unspported')
value = ''
if not self.link_index:
value += 'Global\n'
else:
value = f'Link {self.link_index} ({self.interface_name})\n'
if self.current_dns_server:
value += f' Current DNS Server: {str(self.current_dns_server)}\n'
if self.dns_servers:
value += ' DNS Servers:\n'
for server in self.dns_servers:
value += f' {server}\n'
if self.fallback_dns_servers:
value += ' Fallback DNS Servers: \n'
for server in self.fallback_dns_servers:
value += f' {server}\n'
value += f' DNS-over-TLS: {self.dns_over_tls}\n'
value += f' DNSSEC: {self.dnssec}/{dnssec_supported}\n'
return value
def get_links():
"""Return the list of network interfaces and their indices."""
process = subprocess.run(['ip', '--json', 'link'], stdout=subprocess.PIPE,
check=True)
output = json.loads(process.stdout.decode())
links = {} # Maintain link index order
for entry in output:
links[entry['ifindex']] = entry['ifname']
return links
def get_status():
"""Return the current status of systemd-resolved daemon."""
link_status = []
connection = gio.bus_get_sync(gio.BusType.SYSTEM)
global_link = Link(connection, RESOLVE_PATH)
link_status.append(global_link)
links = get_links()
for link_index, interface_name in links.items():
if interface_name == 'lo':
continue
link_path = global_link.get_link(link_index)
link_status.append(
Link(connection, link_path, link_index, interface_name))
return link_status
|