File: forms.py

package info (click to toggle)
freedombox 26.2
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 82,976 kB
  • sloc: python: 48,504; javascript: 1,736; xml: 481; makefile: 290; sh: 167; php: 32
file content (306 lines) | stat: -rw-r--r-- 12,215 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Forms for backups module.
"""

import logging
import os
import re
import subprocess

from django import forms
from django.core.exceptions import ValidationError
from django.core.validators import (FileExtensionValidator,
                                    validate_ipv46_address)
from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _

from plinth import cfg
from plinth.modules.storage import get_mounts
from plinth.utils import format_lazy

from . import api, split_path
from .repository import get_repositories

logger = logging.getLogger(__name__)


def _get_app_choices(components):
    """Return a list of check box multiple choices from list of components."""
    choices = []
    for component in components:
        name = str(component.app.info.name)
        if not component.has_data:
            name = gettext('{app} (No data to backup)').format(
                app=component.app.info.name)

        choices.append((component.app_id, name))

    return sorted(choices, key=lambda choice: choice[1].lower())


def _get_repository_choices():
    """Return the list of available repositories."""
    choices = [(repository.uuid, repository.name)
               for repository in get_repositories() if repository.is_usable()]

    return choices


class ScheduleForm(forms.Form):
    """Form to edit backups schedule."""

    enabled = forms.BooleanField(
        label=_('Enable scheduled backups'), required=False,
        help_text=_('If enabled, a backup is taken every day, every week and '
                    'every month. Older backups are removed.'))

    daily_to_keep = forms.IntegerField(
        label=_('Number of daily backups to keep'), required=True, min_value=0,
        help_text=_('This many latest backups are kept and the rest are '
                    'removed. A value of "0" disables backups of this type. '
                    'Triggered at specified hour every day.'))

    weekly_to_keep = forms.IntegerField(
        label=_('Number of weekly backups to keep'), required=True,
        min_value=0,
        help_text=_('This many latest backups are kept and the rest are '
                    'removed. A value of "0" disables backups of this type. '
                    'Triggered at specified hour every Sunday.'))

    monthly_to_keep = forms.IntegerField(
        label=_('Number of monthly backups to keep'), required=True,
        min_value=0,
        help_text=_('This many latest backups are kept and the rest are '
                    'removed. A value of "0" disables backups of this type. '
                    'Triggered at specified hour first day of every month.'))

    run_at_hour = forms.IntegerField(
        label=_('Hour of the day to trigger backup operation'), required=True,
        min_value=0, max_value=23, help_text=_(
            'In 24 hour format. Services may become temporarily unavailable '
            'while running backup operation at this time of the day.'))

    selected_apps = forms.MultipleChoiceField(
        label=_('Included apps'), help_text=_('Apps to include in the backup'),
        widget=forms.CheckboxSelectMultiple(attrs={'class': 'has-select-all'}))

    def __init__(self, *args, **kwargs):
        """Initialize the form with selectable apps."""
        super().__init__(*args, **kwargs)
        components = api.get_all_components_for_backup()
        choices = _get_app_choices(components)
        self.fields['selected_apps'].choices = choices
        self.fields['selected_apps'].initial = [
            choice[0] for choice in choices
            if choice[0] not in self.initial.get('unselected_apps', [])
        ]


class CreateArchiveForm(forms.Form):
    repository = forms.ChoiceField(label=_('Repository'))
    name = forms.RegexField(
        label=_('Name'),
        help_text=_('(Optional) Set a name for this backup archive'),
        regex=r'^[^{}/]*$', required=False, strip=True)
    selected_apps = forms.MultipleChoiceField(
        label=_('Included apps'), help_text=_('Apps to include in the backup'),
        widget=forms.CheckboxSelectMultiple(attrs={'class': 'has-select-all'}))

    def __init__(self, *args, **kwargs):
        """Initialize the form with selectable apps."""
        super().__init__(*args, **kwargs)
        components = api.get_all_components_for_backup()
        choices = _get_app_choices(components)
        self.fields['selected_apps'].choices = choices
        if not self.initial or 'selected_apps' not in self.initial:
            self.fields['selected_apps'].initial = [
                choice[0] for choice in choices
            ]
        self.fields['repository'].choices = _get_repository_choices()


class RestoreForm(forms.Form):
    selected_apps = forms.MultipleChoiceField(
        label=_('Select the apps you want to restore'),
        widget=forms.CheckboxSelectMultiple(attrs={'class': 'has-select-all'}))

    def __init__(self, *args, **kwargs):
        """Initialize the form with selectable apps."""
        components = kwargs.pop('components')
        super().__init__(*args, **kwargs)
        choices = _get_app_choices(components)
        self.fields['selected_apps'].choices = choices
        self.fields['selected_apps'].initial = [
            choice[0] for choice in choices
        ]


class UploadForm(forms.Form):
    file = forms.FileField(
        label=_('Upload File'), required=True, validators=[
            FileExtensionValidator(
                ['gz'], _('Backup files have to be in .tar.gz format'))
        ], help_text=format_lazy(
            _('Select the backup file to upload from the local computer. This '
              'must be a file previously downloaded from the result of a '
              'successful backup on a {box_name}. It must have a .tar.gz '
              'extension.'), box_name=_(cfg.box_name)))


def repository_validator(path):
    """Validate an SSH repository path."""
    if not ('@' in path and ':' in path):
        raise ValidationError(_('Repository path format incorrect.'))

    username, hostname, dir_path = split_path(path)
    hostname = hostname.split('%')[0]

    # Validate username using Unix username regex
    if not re.match(r'[a-z0-9_][a-z0-9_-]*$', username):
        raise ValidationError(_(f'Invalid username: {username}'))

    # The hostname should either be a valid IP address or hostname
    # Follows RFC1123 (hostnames can start with digits) instead of RFC952
    hostname_re = (r'^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*'
                   r'([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$')
    try:
        validate_ipv46_address(hostname)
    except ValidationError:
        if not re.match(hostname_re, hostname):
            raise ValidationError(_(f'Invalid hostname: {hostname}'))

    # Validate directory path
    if not re.match(r'[^\0]*', dir_path):
        raise ValidationError(_(f'Invalid directory path: {dir_path}'))


class EncryptedBackupsMixin(forms.Form):
    """Form to add a new backup repository."""
    encryption = forms.ChoiceField(
        label=_('Encryption'), help_text=format_lazy(
            _('"Key in Repository" means that a '
              'password-protected key is stored with the backup.')),
        choices=[('repokey', _('Key in Repository')), ('none', _('None'))])
    encryption_passphrase = forms.CharField(
        label=_('Passphrase'),
        help_text=_('Passphrase; Only needed when using encryption.'),
        widget=forms.PasswordInput(), required=False)
    confirm_encryption_passphrase = forms.CharField(
        label=_('Confirm Passphrase'), help_text=_('Repeat the passphrase.'),
        widget=forms.PasswordInput(), required=False)

    def clean(self):
        super().clean()
        passphrase = self.cleaned_data.get('encryption_passphrase')
        confirm_passphrase = self.cleaned_data.get(
            'confirm_encryption_passphrase')

        if passphrase != confirm_passphrase:
            raise forms.ValidationError(
                _('The entered encryption passphrases do not match'))

        if self.cleaned_data.get('encryption') != 'none' and not passphrase:
            raise forms.ValidationError(
                _('Passphrase is needed for encryption.'))

        return self.cleaned_data


encryption_fields = [
    'encryption', 'encryption_passphrase', 'confirm_encryption_passphrase'
]


def get_disk_choices():
    """Returns a list of all available partitions except the root partition."""
    repositories = get_repositories()
    existing_paths = [
        repository.path for repository in repositories
        if repository.storage_type == 'disk'
    ]
    choices = []
    for device in get_mounts():
        if device['mount_point'] == '/':
            continue

        path = os.path.join(device['mount_point'], 'FreedomBoxBackups')
        if path in existing_paths:
            continue

        name = device['label'] if device['label'] else device['mount_point']
        choices.append((device['mount_point'], name))

    return choices


class AddRepositoryForm(EncryptedBackupsMixin, forms.Form):
    """Form to create a new backups repository on a disk."""
    disk = forms.ChoiceField(
        label=_('Select Disk or Partition'), help_text=format_lazy(
            _('Backups will be stored in the directory FreedomBoxBackups')),
        choices=get_disk_choices)

    field_order = ['disk'] + encryption_fields


class AddRemoteRepositoryForm(EncryptedBackupsMixin, forms.Form):
    """Form to add new SSH remote repository."""
    repository = forms.CharField(
        label=_('SSH Repository Path'), strip=True,
        help_text=_('Path of a new or existing repository. Example: '
                    '<i>user@host:~/path/to/repo/</i>'),
        validators=[repository_validator])
    ssh_password = forms.CharField(
        label=_('SSH server password'), strip=True,
        help_text=_('Password of the SSH Server.<br />'
                    'SSH key-based authentication is not yet possible.'),
        widget=forms.PasswordInput(), required=False)

    field_order = ['repository', 'ssh_password'] + encryption_fields

    def clean_repository(self):
        """Validate repository form field."""
        path = self.cleaned_data.get('repository')
        # Avoid creation of duplicate ssh remotes
        self._check_if_duplicate_remote(path)
        return path

    @staticmethod
    def _check_if_duplicate_remote(path):
        """Raise validation error if given path is a stored remote."""
        for repository in get_repositories():
            if repository.path == path:
                raise forms.ValidationError(
                    _('Remote backup repository already exists.'))


class VerifySshHostkeyForm(forms.Form):
    """Form to verify the SSH public key for a host."""
    ssh_public_key = forms.ChoiceField(
        label=_('Select verified SSH public key'), widget=forms.RadioSelect)

    def __init__(self, *args, **kwargs):
        """Initialize the form with selectable apps."""
        hostname = kwargs.pop('hostname')
        super().__init__(*args, **kwargs)
        (self.fields['ssh_public_key'].choices,
         self.keyscan_error) = self._get_all_public_keys(hostname)

    @staticmethod
    def _get_all_public_keys(hostname):
        """Use ssh-keyscan to get all the SSH public keys of a host."""
        # Fetch public keys of ssh remote
        keyscan = subprocess.run(['ssh-keyscan', hostname],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE, check=False)
        key_lines = keyscan.stdout.decode().splitlines()
        keys = [line for line in key_lines if not line.startswith('#')]
        error_message = keyscan.stderr.decode() if keyscan.returncode else None
        # Generate user-friendly fingerprints of public keys
        keygen = subprocess.run(['ssh-keygen', '-l', '-f', '-'],
                                input=keyscan.stdout, stdout=subprocess.PIPE,
                                check=False)
        fingerprints = keygen.stdout.decode().splitlines()

        return zip(keys, fingerprints), error_message