File: utils.py

package info (click to toggle)
freedombox 26.2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 82,976 kB
  • sloc: python: 48,504; javascript: 1,736; xml: 481; makefile: 290; sh: 167; php: 32
file content (284 lines) | stat: -rw-r--r-- 9,951 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Utilities for managing WireGuard."""

import datetime
import logging
import subprocess
import time

from plinth import app as app_module
from plinth import network
from plinth.utils import import_from_gi

from . import privileged

nm = import_from_gi('NM', '1.0')

IP_TEMPLATE = '10.84.0.{}'

logger = logging.getLogger(__name__)


def get_nm_info():
    """Get information from network manager."""
    setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
    client = network.get_nm_client()

    connections = {}
    for connection in client.get_connections():
        if connection.get_connection_type() != setting_name:
            continue

        settings = connection.get_setting_by_name(setting_name)
        secrets = connection.get_secrets(setting_name)
        connection.update_secrets(setting_name, secrets)

        info = {}
        info['interface'] = connection.get_interface_name()
        info['private_key'] = settings.get_private_key()
        info['public_key'] = None
        info['listen_port'] = settings.get_listen_port()
        info['fwmark'] = settings.get_fwmark()
        info['mtu'] = settings.get_mtu()
        info['default_route'] = settings.get_ip4_auto_default_route()
        info['peers'] = {}
        for peer_index in range(settings.get_peers_len()):
            peer = settings.get_peer(peer_index)
            peer_info = {
                'endpoint': peer.get_endpoint(),
                'public_key': peer.get_public_key(),
                'preshared_key': peer.get_preshared_key(),
                'persistent_keepalive': peer.get_persistent_keepalive(),
                'allowed_ips': []
            }
            for index in range(peer.get_allowed_ips_len()):
                allowed_ip = peer.get_allowed_ip(index, None)
                peer_info['allowed_ips'].append(allowed_ip)

            info['peers'][peer_info['public_key']] = peer_info

        settings_ipv4 = connection.get_setting_ip4_config()
        if settings_ipv4 and settings_ipv4.get_num_addresses():
            info['ip_address'] = settings_ipv4.get_address(0).get_address()

        connections[info['interface']] = info

    return connections


def get_info():
    """Return server and clients info."""
    status = privileged.get_info()

    nm_info = get_nm_info()

    my_server_info = None
    my_client_servers = {}
    for interface, info in nm_info.items():
        if interface == 'wg0':
            my_server_info = info
        else:
            my_client_servers[interface] = info

        # If the NM connection is not active but the device link is up, 'wg
        # show' will not show any public key configured on the interface.
        if interface not in status or (interface in status and
                                       not status[interface]['public_key']):
            info['public_key'] = _get_public_key_from_private_key(
                info['private_key'])
            continue

        info['public_key'] = status[interface]['public_key']
        for status_peer in status[interface]['peers']:
            if status_peer['latest_handshake']:
                status_peer['latest_handshake'] = \
                    datetime.datetime.fromtimestamp(
                        status_peer['latest_handshake'])
            public_key = status_peer['public_key']
            info_peer = info['peers'].setdefault(public_key, {})
            info_peer['status'] = status_peer

    return {
        'my_server': my_server_info,
        'my_client': {
            'servers': my_client_servers,
        },
    }


def enable_connections(enable):
    """Activate all connections and set them to auto-connect."""
    setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
    client = network.get_nm_client()
    for connection in client.get_connections():
        if connection.get_connection_type() != setting_name:
            continue

        network.edit_connection(connection,
                                {'common': {
                                    'autoconnect': enable
                                }})
        if enable:
            network.activate_connection(connection.get_uuid())
        else:
            try:
                network.deactivate_connection(connection.get_uuid())
            except network.ConnectionNotFound:
                pass  # Connection is already inactive


def _get_public_key_from_private_key(private_key):
    process = subprocess.run(['wg', 'pubkey'], check=True, capture_output=True,
                             input=private_key.encode())
    return process.stdout.decode()


def _generate_private_key():
    """Return a private key generated by 'wg' command."""
    process = subprocess.run(['wg', 'genkey'], check=True, capture_output=True)
    return process.stdout.decode().strip()


def _find_next_interface():
    """Find next unused wireguard interface name."""
    output = subprocess.check_output(['wg', 'show',
                                      'interfaces']).decode().strip()
    interfaces = output.split()
    interface_num = 1
    new_interface_name = 'wg1'
    while new_interface_name in interfaces:
        interface_num += 1
        new_interface_name = 'wg' + str(interface_num)

    return new_interface_name


def add_server(settings):
    """Add information for connecting to a server."""
    app = app_module.App.get('wireguard')
    interface_name = _find_next_interface()
    settings['common']['name'] = 'WireGuard-Client-' + interface_name
    settings['common']['interface'] = interface_name
    settings['common']['autoconnect'] = app.is_enabled()
    if not settings['wireguard']['private_key']:
        settings['wireguard']['private_key'] = _generate_private_key()

    network.add_connection(settings)


def edit_server(interface, settings):
    """Edit information for connecting to a server."""
    settings['common']['interface'] = interface
    settings['common']['name'] = 'WireGuard-Client-' + interface
    if not settings['wireguard']['private_key']:
        settings['wireguard']['private_key'] = _generate_private_key()

    connection = network.get_connection_by_interface_name(interface)
    network.edit_connection(connection, settings)
    network.reactivate_connection(connection.get_uuid())


def setup_server():
    """Setup a server connection that clients can connect to."""
    app = app_module.App.get('wireguard')
    setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
    private_key = _generate_private_key()
    settings = {
        'common': {
            'name': 'WireGuard-Server-wg0',
            'type': setting_name,
            'zone': 'internal',
            'interface': 'wg0',
            'autoconnect': app.is_enabled(),
        },
        'ipv4': {
            'method': 'manual',
            'address': IP_TEMPLATE.format(1),
            'netmask': '255.255.255.0',
            'gateway': '',
            'dns': '',
            'second_dns': '',
        },
        'wireguard': {
            'private_key': private_key,
            'listen_port': 51820,
        }
    }
    network.add_connection(settings)
    logger.info('Created new WireGuard server connection')


def _get_next_available_ip_address(settings):
    """Get the next available IP address to allocate to a client."""
    allocated_ips = set()
    for peer_index in range(settings.get_peers_len()):
        peer = settings.get_peer(peer_index)
        for ip_index in range(peer.get_allowed_ips_len()):
            allowed_ip = peer.get_allowed_ip(ip_index)
            # We assume these are simple IP addresses but they can be subnets.
            allocated_ips.add(allowed_ip)

    for index in range(2, 254):
        ip_address = IP_TEMPLATE.format(index)
        if ip_address not in allocated_ips:
            return ip_address

    raise IndexError('Reached client limit')


def _server_connection():
    """Return a server connection. Create one if necessary."""
    setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
    connection = network.get_connection_by_interface_name('wg0')
    if not connection:
        setup_server()

    for _ in range(10):
        # XXX: Improve this waiting by doing a synchronous D-Bus operation to
        # add network manager connection instead.
        time.sleep(1)
        connection = network.get_connection_by_interface_name('wg0')
        if connection:
            break

    if not connection:
        raise RuntimeError('Unable to create a server connection.')

    # Retrieve secrets so that when the connection is changed, secrets are
    # preserved properly.
    secrets = connection.get_secrets(setting_name)
    connection.update_secrets(setting_name, secrets)

    return connection


def add_client(public_key):
    """Add a permission for a client to connect our server."""
    setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
    connection = _server_connection()
    settings = connection.get_setting_by_name(setting_name)
    peer, _ = settings.get_peer_by_public_key(public_key)
    if peer:
        raise ValueError('Peer with public key already exists')

    peer = nm.WireGuardPeer.new()
    peer.set_public_key(public_key, False)
    peer.set_persistent_keepalive(25)  # To keep NAT 'connections' alive
    peer.append_allowed_ip(_get_next_available_ip_address(settings), False)
    settings.append_peer(peer)
    connection.commit_changes(True)
    network.reactivate_connection(connection.get_uuid())


def remove_client(public_key):
    """Remove permission for a client to connect our server."""
    setting_name = nm.SETTING_WIREGUARD_SETTING_NAME
    connection = _server_connection()
    settings = connection.get_setting_by_name(setting_name)
    peer, peer_index = settings.get_peer_by_public_key(public_key)
    if not peer:
        raise KeyError('Client not found')

    settings.remove_peer(peer_index)
    connection.commit_changes(True)
    network.reactivate_connection(connection.get_uuid())