1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Set and get postfix configuration using postconf.
See: http://www.postfix.org/postconf.1.html
See: http://www.postfix.org/master.5.html
See: http://www.postfix.org/postconf.5.html
"""
import re
import subprocess
from dataclasses import dataclass
from plinth import action_utils
@dataclass
class Service:  # NOQA, pylint: disable=too-many-instance-attributes
    """A single postfix daemon entry together with its ``-o`` overrides.

    ``str(instance)`` renders the entry as one space-separated line: the
    eight plain fields in declaration order, followed by ``-o key=value``
    for every item in ``options``.
    """

    # The fields below are rendered by __str__ in exactly this order;
    # presumably they mirror the columns of a master.cf line (see master(5)).
    service: str
    type_: str
    private: str
    unpriv: str
    chroot: str
    wakeup: str
    maxproc: str
    command: str
    # Extra parameters appended to the command as "-o name=value" pairs.
    options: dict[str, str]

    def __str__(self) -> str:
        """Return the entry as a single space-separated string.

        Raises ValueError if any option name or option value fails
        validation.
        """
        overrides = []
        for name, setting in self.options.items():
            # Validate each pair before it is rendered into the line.
            _validate_key(name)
            _validate_value(setting)
            overrides += ['-o', f'{name}={setting}']

        columns = [
            self.service,
            self.type_,
            self.private,
            self.unpriv,
            self.chroot,
            self.wakeup,
            self.maxproc,
            self.command,
        ]
        return ' '.join(columns + overrides)
def get_config(keys: list) -> dict:
    """Read the given parameters from postfix via the postconf command.

    Every key is validated first, then /sbin/postconf is invoked with the
    keys as arguments and its output is parsed as ``name = value`` lines.

    Returns a dict mapping each parameter name to its value, both stripped
    of surrounding whitespace.

    Raises ValueError if a key is malformed, if a non-empty output line
    contains no '=', or if the set of names in the output differs from the
    set of requested keys.
    """
    for name in keys:
        _validate_key(name)

    output = _run(['/sbin/postconf', *keys])

    parsed = {}
    for line in output.split('\n'):
        if not line:
            # Skip empty lines (e.g. the one after the final newline).
            continue

        name, separator, setting = line.partition('=')
        if not separator:
            raise ValueError('Invalid output detected')

        parsed[name.strip()] = setting.strip()

    # The output must describe exactly the parameters that were asked for.
    if set(parsed) != set(keys):
        raise ValueError('Some keys were missing from the output')

    return parsed
def set_config(config: dict, flag=None):
    """Write parameters to postfix via the postconf command.

    ``config`` maps parameter names to values; an empty or falsy mapping is
    a no-op. ``flag``, when truthy, is passed to postconf as an extra
    argument ahead of the ``name=value`` pairs.

    All names and values are validated before postconf is run, so a
    ValueError from validation means nothing was executed.
    """
    if not config:
        return

    for name, setting in config.items():
        _validate_key(name)
        _validate_value(setting)

    command = ['/sbin/postconf']
    if flag:
        command.append(flag)

    command.extend(f'{name}={setting}' for name, setting in config.items())
    _run(command)
def set_master_config(service: Service):
    """Write one daemon entry, including its options, to postfix master.cf.

    The entry is addressed as ``<service>/<type_>`` and its value is the
    string form of ``service``; both are handed to set_config together with
    the '-M' flag.
    """
    entry_name = '/'.join((service.service, service.type_))
    set_config({entry_name: str(service)}, '-M')
def parse_maps(raw_value):
    """Parse a postfix configuration value that is a list of maps.

    Entries may be separated by commas and/or any whitespace (spaces, tabs,
    newlines). Empty entries are discarded.

    Returns the list of entries in their original order; an empty or
    separator-only value yields an empty list.

    Raises ValueError if the value contains '{' or '}', because the
    brace-quoted list syntax is not supported by this parser.
    """
    if '{' in raw_value or '}' in raw_value:
        raise ValueError('Unsupported map list format')

    # Treat commas as just another separator, then split on runs of
    # whitespace. str.split() with no argument also drops empty fields, so
    # tab- or newline-separated entries are no longer merged into one.
    return raw_value.replace(',', ' ').split()
def _run(args):
    """Run a command and return its standard output decoded as a string.

    The command is started through action_utils.run with check=True.

    Raises RuntimeError if a subprocess.SubprocessError occurs or if the
    captured output cannot be decoded; the original exception is chained
    as the cause.
    """
    try:
        completed = action_utils.run(args, check=True)
        text = completed.stdout.decode()
    except subprocess.SubprocessError as error:
        raise RuntimeError('Subprocess failed') from error
    except UnicodeDecodeError as error:
        raise RuntimeError('Unicode decoding failed') from error

    return text
def _validate_key(key):
"""Validate postconf key format or raise ValueError."""
if not re.match(r'^[a-zA-Z][a-zA-Z0-9_/]*$', key):
raise ValueError('Invalid postconf key format')
def _validate_value(value):
"""Validate postconf value format or raise ValueError."""
for char in value:
if ord(char) < 32:
raise ValueError('Value contains control characters')
|