File: utils.py

package info (click to toggle)
freedombox 26.2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 82,976 kB
  • sloc: python: 48,504; javascript: 1,736; xml: 481; makefile: 290; sh: 167; php: 32
file content (199 lines) | stat: -rw-r--r-- 5,543 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Miscellaneous utility methods.
"""

import gzip
import importlib
import os
import random
import re
import string

import markupsafe
import ruamel.yaml
from django.utils.functional import lazy

from plinth.version import Version  # noqa


def import_from_gi(library, version):
    """Import and return a GObject introspection library."""
    try:
        import gi as package
        package_name = 'gi'
    except ImportError:
        import pgi as package
        package_name = 'pgi'

    package.require_version(library, version)

    return importlib.import_module(package_name + '.repository.' + library)


def _format_lazy(string, *args, **kwargs):
    """Lazily format a lazy string."""
    allow_markup = kwargs.pop('allow_markup', False)
    string = str(string)
    string = string.format(*args, **kwargs)
    if allow_markup:
        string = markupsafe.Markup(string)

    return string


format_lazy = lazy(_format_lazy, str)


def non_admin_view(func):
    """Decorator to mark a view as accessible by non-admin users."""
    setattr(func, 'IS_NON_ADMIN', True)
    return func


def user_group_view(func, group_name):
    """Decorator to mark a view as accessible by admin or group users."""
    setattr(func, 'GROUP_NAME', group_name)
    return func


def is_valid_user_name(user_name):
    """Check if the given username is valid.

    Note: Debian is VERY flexible with user names.
    """
    if 32 < len(user_name):
        return False

    if user_name.startswith('-'):
        return False

    for forbidden in (' \n\t/\\:'):
        if forbidden in user_name:
            return False

    return True


def is_user_admin(request, cached=False):
    """Return whether user is an administrator."""
    if not request.user.is_authenticated:
        return False

    if 'cache_user_is_admin' in request.session and cached:
        return request.session['cache_user_is_admin']

    user_is_admin = request.user.groups.filter(name='admin').exists()
    request.session['cache_user_is_admin'] = user_is_admin
    return user_is_admin


class YAMLFile:
    """A context management class for updating YAML files"""

    def __init__(self, yaml_file):
        """Return a context object for the YAML file.

        Parameters:
        yaml_file - the YAML file to update.
        updating the YAML file.
        """
        self.yaml_file = yaml_file
        self.conf = None

        self.yaml = ruamel.yaml.YAML()
        self.yaml.preserve_quotes = True

    def __enter__(self):
        with open(self.yaml_file, 'r', encoding='utf-8') as intro_conf:
            if not self.is_file_empty():
                self.conf = self.yaml.load(intro_conf)
            else:
                self.conf = {}

            return self.conf

    def __exit__(self, type_, value, traceback):
        if not traceback:
            with open(self.yaml_file, 'w', encoding='utf-8') as intro_conf:
                self.yaml.dump(self.conf, intro_conf)

    def is_file_empty(self):
        return os.stat(self.yaml_file).st_size == 0


def random_string(size=8):
    """Generate a random alphanumeric string."""
    chars = (random.SystemRandom().choice(string.ascii_letters)
             for _ in range(size))
    return ''.join(chars)


def generate_password(size=32):
    """Generate a random password using ascii alphabet and digits."""
    chars = (random.SystemRandom().choice(string.ascii_letters + string.digits)
             for _ in range(size))
    return ''.join(chars)


def grep(pattern, file_name):
    """Return lines of a file matching a pattern."""
    return [
        line.rstrip() for line in open(file_name, encoding='utf-8')
        if re.search(pattern, line)
    ]


def gunzip(gzip_file, output_file):
    """Utility to unzip a given gzip file and write it to an output file

    gzip_file: string path to a gzip file
    output_file: string path to the output file
    mode: an octal number to specify file permissions
    """
    output_dir = os.path.dirname(output_file)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir, mode=0o755)

    with gzip.open(gzip_file, 'rb') as file_handle:
        contents = file_handle.read()

    def opener(path, flags):
        return os.open(path, flags, mode=0o644)

    with open(output_file, 'wb', opener=opener) as file_handle:
        file_handle.write(contents)


def is_non_empty_file(file_path):
    return os.path.isfile(file_path) and os.path.getsize(file_path) > 0


def is_authenticated_user(username, password):
    """Return true if the user authentication succeeds."""
    import pam  # Minimize dependencies for running tests
    pam_authenticator = pam.pam()
    return bool(pam_authenticator.authenticate(username, password))


class SafeFormatter(string.Formatter):
    """A string.format() handler to deal with missing arguments."""

    def get_value(self, key, args, kwargs):
        """Retrieve a given field's value: 0 or foo."""
        try:
            return super().get_value(key, args, kwargs)
        except (IndexError, KeyError):
            return f'?{key}?'

    def get_field(self, field_name, args, kwargs):
        """Retrieve a given field's value: 0[foo] or foo.bar."""
        try:
            return super().get_field(field_name, args, kwargs)
        except (AttributeError, TypeError):
            return (f'?{field_name}?', '')


def gettext_noop(message: str) -> str:
    """Do nothing. Using this method marks the string for translation."""
    return message