1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
|
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Configure Tor Proxy service."""
import logging
import os
import shutil
from typing import Any
import augeas
from plinth import action_utils
from plinth.actions import privileged
from plinth.modules.torproxy.utils import (APT_TOR_PREFIX, get_augeas,
iter_apt_uris)
logger = logging.getLogger(__name__)
INSTANCE_NAME = 'fbxproxy'
SERVICE_FILE = '/etc/firewalld/services/tor-{0}.xml'
SERVICE_NAME = f'tor@{INSTANCE_NAME}'
TORPROXY_CONFIG = f'/etc/tor/instances/{INSTANCE_NAME}/torrc'
TORPROXY_CONFIG_AUG = f'/files/{TORPROXY_CONFIG}'
@privileged
def setup():
"""Setup Tor configuration."""
# Disable default Tor service.
action_utils.service_disable('tor@default')
# Mask the service to prevent re-enabling it by the Tor master service.
action_utils.service_mask('tor@default')
action_utils.run(['tor-instance-create', INSTANCE_NAME], check=True)
# Remove line starting with +SocksPort, since our augeas lens
# doesn't handle it correctly.
with open(TORPROXY_CONFIG, 'r', encoding='utf-8') as torrc:
torrc_lines = torrc.readlines()
with open(TORPROXY_CONFIG, 'w', encoding='utf-8') as torrc:
for line in torrc_lines:
if not line.startswith('+'):
torrc.write(line)
aug = augeas_load()
aug.set(TORPROXY_CONFIG_AUG + '/SocksPort[1]', '[::]:9050')
aug.set(TORPROXY_CONFIG_AUG + '/SocksPort[2]', '0.0.0.0:9050')
aug.set(TORPROXY_CONFIG_AUG + '/VirtualAddrNetworkIPv4', '10.192.0.0/10')
aug.set(TORPROXY_CONFIG_AUG + '/AutomapHostsOnResolve', '1')
aug.set(TORPROXY_CONFIG_AUG + '/TransPort[1]', '127.0.0.1:9040')
aug.set(TORPROXY_CONFIG_AUG + '/TransPort[2]', '[::1]:9040')
aug.set(TORPROXY_CONFIG_AUG + '/DNSPort[1]', '127.0.0.1:9053')
aug.set(TORPROXY_CONFIG_AUG + '/DNSPort[2]', '[::1]:9053')
aug.save()
if action_utils.service_is_running(SERVICE_NAME):
action_utils.service_restart(SERVICE_NAME)
@privileged
def configure(use_upstream_bridges: bool | None = None,
upstream_bridges: str | None = None,
apt_transport_tor: bool | None = None):
"""Configure Tor."""
aug = augeas_load()
_use_upstream_bridges(use_upstream_bridges, aug=aug)
if upstream_bridges:
_set_upstream_bridges(upstream_bridges, aug=aug)
if apt_transport_tor:
_enable_apt_transport_tor()
elif apt_transport_tor is not None:
_disable_apt_transport_tor()
@privileged
def restart():
"""Restart Tor."""
if (action_utils.service_is_enabled(SERVICE_NAME, strict_check=True)
and action_utils.service_is_running(SERVICE_NAME)):
action_utils.service_restart(SERVICE_NAME)
@privileged
def get_status() -> dict[str, bool | str | dict[str, Any]]:
"""Return dict with Tor Proxy status."""
aug = augeas_load()
return {
'use_upstream_bridges': _are_upstream_bridges_enabled(aug),
'upstream_bridges': _get_upstream_bridges(aug)
}
def _are_upstream_bridges_enabled(aug) -> bool:
"""Return whether upstream bridges are being used."""
use_bridges = aug.get(TORPROXY_CONFIG_AUG + '/UseBridges')
return use_bridges == '1'
def _get_upstream_bridges(aug) -> str:
"""Return upstream bridges separated by newlines."""
matches = aug.match(TORPROXY_CONFIG_AUG + '/Bridge')
bridges = [aug.get(match) for match in matches]
return '\n'.join(bridges)
def _use_upstream_bridges(use_upstream_bridges: bool | None = None, aug=None):
"""Enable use of upstream bridges."""
if use_upstream_bridges is None:
return
if not aug:
aug = augeas_load()
if use_upstream_bridges:
aug.set(TORPROXY_CONFIG_AUG + '/UseBridges', '1')
else:
aug.set(TORPROXY_CONFIG_AUG + '/UseBridges', '0')
aug.save()
def _set_upstream_bridges(upstream_bridges=None, aug=None):
"""Set list of upstream bridges."""
if upstream_bridges is None:
return
if not aug:
aug = augeas_load()
aug.remove(TORPROXY_CONFIG_AUG + '/Bridge')
if upstream_bridges:
bridges = [bridge.strip() for bridge in upstream_bridges.split('\n')]
bridges = [bridge for bridge in bridges if bridge]
for bridge in bridges:
parts = [part for part in bridge.split() if part]
bridge = ' '.join(parts)
aug.set(TORPROXY_CONFIG_AUG + '/Bridge[last() + 1]',
bridge.strip())
aug.set(TORPROXY_CONFIG_AUG + '/ClientTransportPlugin',
'obfs3,scramblesuit,obfs4 exec /usr/bin/obfs4proxy')
aug.save()
def _enable_apt_transport_tor():
"""Enable package download over Tor."""
aug = get_augeas()
for uri_path in iter_apt_uris(aug):
uri = aug.get(uri_path)
if uri.startswith('http://') or uri.startswith('https://'):
aug.set(uri_path, APT_TOR_PREFIX + uri)
aug.save()
def _disable_apt_transport_tor():
"""Disable package download over Tor."""
aug = get_augeas(raise_exception=False)
for uri_path in iter_apt_uris(aug):
uri = aug.get(uri_path)
if uri.startswith(APT_TOR_PREFIX):
aug.set(uri_path, uri[len(APT_TOR_PREFIX):])
aug.save()
def augeas_load():
"""Initialize Augeas."""
aug = augeas.Augeas(flags=augeas.Augeas.NO_LOAD +
augeas.Augeas.NO_MODL_AUTOLOAD)
aug.set('/augeas/load/Tor/lens', 'Tor.lns')
aug.set('/augeas/load/Tor/incl[last() + 1]', TORPROXY_CONFIG)
aug.load()
return aug
@privileged
def uninstall():
"""Remove fbxproxy instance."""
directories = [
f'/etc/tor/instances/{INSTANCE_NAME}/',
f'/var/lib/tor-instances/{INSTANCE_NAME}/',
f'/var/run/tor-instances/{INSTANCE_NAME}/'
]
for directory in directories:
shutil.rmtree(directory, ignore_errors=True)
os.unlink(f'/var/run/tor-instances/{INSTANCE_NAME}.defaults')
action_utils.service_unmask(SERVICE_NAME)
|