File: postfix.py

package info (click to toggle)
freedombox 26.3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 83,092 kB
  • sloc: python: 48,542; javascript: 1,730; xml: 481; makefile: 290; sh: 137; php: 32
file content (132 lines) | stat: -rw-r--r-- 3,637 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Set and get postfix configuration using postconf.

See: http://www.postfix.org/postconf.1.html
See: http://www.postfix.org/master.5.html
See: http://www.postfix.org/postconf.5.html
"""

import re
import subprocess
from dataclasses import dataclass

from plinth import action_utils


@dataclass
class Service:  # NOQA, pylint: disable=too-many-instance-attributes
    """A postfix daemon entry together with its per-service options.

    The eight string fields are emitted by ``__str__`` in declaration
    order; ``options`` holds key/value pairs that are rendered as trailing
    ``-o key=value`` arguments.
    """
    service: str
    type_: str
    private: str
    unpriv: str
    chroot: str
    wakeup: str
    maxproc: str
    command: str
    options: dict[str, str]

    def __str__(self) -> str:
        """Return the entry as one space-separated string.

        Every option key and value is validated before being rendered, so a
        ValueError from the validators propagates to the caller.
        """
        columns = (
            self.service,
            self.type_,
            self.private,
            self.unpriv,
            self.chroot,
            self.wakeup,
            self.maxproc,
            self.command,
        )

        overrides = []
        for name, setting in self.options.items():
            _validate_key(name)
            _validate_value(setting)
            overrides += ['-o', f'{name}={setting}']

        return ' '.join([*columns, *overrides])


def get_config(keys: list) -> dict:
    """Get postfix configuration using the postconf command.

    Each key is validated, then all keys are passed to a single
    ``/sbin/postconf`` invocation whose ``key = value`` output lines are
    parsed into a dictionary.

    Returns a dict mapping every requested key to its stripped value. An
    empty ``keys`` yields an empty dict without running postconf.

    Raises ValueError if a key is malformed, if an output line has no ``=``
    separator, or if the keys in the output differ from those requested.
    Errors from running the command propagate from ``_run``.
    """
    if not keys:
        # postconf without parameter names is documented to list every
        # parameter, which would only trip the key-set check below with a
        # misleading error. There is nothing to look up, so answer directly.
        return {}

    for key in keys:
        _validate_key(key)

    output = _run(['/sbin/postconf', *keys])

    result = {}
    # filter(None, ...) skips blank lines such as the one after the final
    # newline of the output.
    for line in filter(None, output.split('\n')):
        key, sep, value = line.partition('=')
        if not sep:
            raise ValueError('Invalid output detected')

        result[key.strip()] = value.strip()

    if set(keys) != set(result):
        raise ValueError('Some keys were missing from the output')

    return result


def set_config(config: dict, flag=None):
    """Write postfix settings through a single postconf invocation.

    Does nothing when ``config`` is empty. Otherwise every key and value is
    validated first (ValueError on failure), and only then is
    ``/sbin/postconf`` run with the optional ``flag`` followed by one
    ``key=value`` argument per entry.
    """
    if not config:
        return

    # Validate everything up front so nothing runs on partially bad input.
    for name, setting in config.items():
        _validate_key(name)
        _validate_value(setting)

    command = ['/sbin/postconf']
    if flag:
        command.append(flag)

    command.extend(f'{name}={setting}' for name, setting in config.items())
    _run(command)


def set_master_config(service: Service):
    """Store a daemon entry and its options in postfix master.cf.

    The entry is keyed as ``<service>/<type_>`` and its value is the string
    form of ``service``; both are handed to ``set_config`` with the ``-M``
    flag.
    """
    master_key = '/'.join((service.service, service.type_))
    set_config({master_key: str(service)}, '-M')


def parse_maps(raw_value):
    """Parse postfix configuration values that are maps.

    ``raw_value`` is a string listing lookup tables separated by commas
    and/or whitespace. Returns the non-empty entries in their original
    order; an empty or separator-only string yields an empty list.

    Raises ValueError if the value contains ``{`` or ``}``, since the
    brace-quoted list syntax is not supported here.
    """
    if '{' in raw_value or '}' in raw_value:
        raise ValueError('Unsupported map list format')

    # Postfix separates map list entries by commas or any whitespace, not
    # just the space character. Turning commas into spaces and using the
    # argument-less split() handles tabs and newlines as well, and discards
    # the empty strings produced by repeated separators.
    return raw_value.replace(',', ' ').split()


def _run(args):
    """Execute a command and return its standard output decoded to text.

    The command is started through ``action_utils.run`` with ``check=True``
    (presumably making a non-zero exit raise, as with ``subprocess.run`` --
    confirm against ``action_utils``).

    Raises RuntimeError, chained to the original exception, when a
    ``subprocess.SubprocessError`` occurs or the output cannot be decoded.
    """
    try:
        completed = action_utils.run(args, check=True)
        output = completed.stdout.decode()
    except subprocess.SubprocessError as error:
        raise RuntimeError('Subprocess failed') from error
    except UnicodeDecodeError as error:
        raise RuntimeError('Unicode decoding failed') from error

    return output


def _validate_key(key):
    """Validate postconf key format or raise ValueError."""
    if not re.match(r'^[a-zA-Z][a-zA-Z0-9_/]*$', key):
        raise ValueError('Invalid postconf key format')


def _validate_value(value):
    """Validate postconf value format or raise ValueError."""
    for char in value:
        if ord(char) < 32:
            raise ValueError('Value contains control characters')