File: test_backups.py

package info (click to toggle)
freedombox 26.3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 83,092 kB
  • sloc: python: 48,542; javascript: 1,730; xml: 481; makefile: 290; sh: 137; php: 32
file content (191 lines) | stat: -rw-r--r-- 6,272 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Test the backups action script.
"""

import os
import pathlib
import subprocess
import uuid

import pytest

from plinth.modules import backups
from plinth.modules.backups import privileged
from plinth.modules.backups.repository import BorgRepository, SshBorgRepository
from plinth.tests import config as test_config

# Module-wide marker: every test in this file requests these fixtures.
pytestmark = pytest.mark.usefixtures('needs_root', 'needs_borg', 'load_cfg',
                                     'mock_privileged')

# NOTE(review): presumably consumed by the 'mock_privileged' fixture to select
# which privileged modules get mocked -- confirm against the fixture.
privileged_modules_to_mock = ['plinth.modules.backups.privileged']

# Deliberately invalid credentials, used when accessing a non-existent URL and
# when accessing a URL that exists but does not grant access.
_dummy_credentials = {'ssh_password': 'invalid_password'}
# Passphrase placed into the credentials when an encryption passphrase is
# requested from _get_credentials().
_repokey_encryption_passphrase = '12345'


@pytest.fixture(name='needs_borg')
def fixture_needs_borg():
    """Skip the requesting test when the borg executable cannot be found."""
    version_command = ['borg', '--version']
    # Output is irrelevant here; only whether the program can be run matters.
    silenced = {'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}
    try:
        # check=True: a non-zero exit status is not handled here and
        # propagates as subprocess.CalledProcessError.
        subprocess.run(version_command, check=True, **silenced)
    except FileNotFoundError:
        pytest.skip('Needs borg installed')


@pytest.fixture(name='needs_ssh_config')
def fixture_needs_ssh_config():
    """Skip test if the SSH path is not available in test configuration."""
    if not test_config.backups_ssh_path:
        # The skip reason names the setting that is actually checked here,
        # so the test report points at the right configuration value.
        pytest.skip('Needs SSH path provided')


@pytest.fixture(name='data_directory')
def fixture_data_directory():
    """Return the path of the 'backup_data' directory next to this file."""
    tests_directory = pathlib.Path(__file__).parent
    return tests_directory.joinpath('backup_data')


@pytest.fixture(name='backup_directory')
def fixture_backup_directory(tmp_path):
    """Return a temporary directory in which tests place repositories."""
    # Creation and removal are left entirely to pytest's tmp_path fixture.
    backup_directory = tmp_path
    return backup_directory


def test_nonexisting_repository(backup_directory):
    """Test that a missing directory is reported as a missing repository."""
    missing_path = str(backup_directory / 'does_not_exist')
    repository = BorgRepository(missing_path)
    expected_error = backups.errors.BorgRepositoryDoesNotExistError
    with pytest.raises(expected_error):
        repository.get_info()


def test_empty_dir(backup_directory):
    """Test that an existing but empty directory is not a repository."""
    repo_dir = backup_directory / 'empty_dir'
    repo_dir.mkdir()
    repository = BorgRepository(str(repo_dir))
    expected_error = backups.errors.BorgRepositoryDoesNotExistError
    with pytest.raises(expected_error):
        repository.get_info()


def test_create_unencrypted_repository(backup_directory):
    """Test that a freshly initialized repository reports encryption info."""
    repository = BorgRepository(str(backup_directory / 'borgbackup'))
    repository.initialize()
    assert 'encryption' in repository.get_info()


def test_create_export_delete_archive(data_directory, backup_directory):
    """
    Walk one archive through its life cycle in a fresh local repository.

    - Initialize a repository
    - Create an archive with a comment
    - Check the name and comment of the first listed archive
    - Delete the archive and check that no archives remain listed
    """
    archive_name = 'first_archive'
    archive_comment = 'test_archive_comment'
    repo_path = backup_directory / 'test_create_and_delete'

    repository = BorgRepository(str(repo_path))
    repository.initialize()

    # The archive is addressed as <repository path>::<archive name>.
    archive_location = str(repo_path) + '::' + archive_name
    paths_to_backup = [str(data_directory)]
    privileged.create_archive(archive_location, paths_to_backup,
                              archive_comment)

    first_archive = repository.list_archives()[0]
    assert first_archive['name'] == archive_name
    assert first_archive['comment'] == archive_comment

    repository.delete_archive(archive_name)
    assert not repository.list_archives()

@pytest.mark.usefixtures('needs_ssh_config')
def test_remote_backup_actions():
    """
    Initialize an encrypted remote repository and read back its info.

    This relies on borgbackups being installed on the remote machine.
    """
    borg_arguments = _get_borg_arguments(
        _get_credentials(add_encryption_passphrase=True))
    # A fresh UUID gives every run its own location below the SSH path.
    unique_name = str(uuid.uuid1())
    repo_path = os.path.join(test_config.backups_ssh_path, unique_name)

    privileged.init(repo_path, 'repokey', **borg_arguments)

    repo_info = privileged.info(repo_path, **borg_arguments)
    assert repo_info['encryption']['mode'] == 'repokey'


def _get_borg_arguments(credentials):
    """Map a credentials dict to keyword arguments for privileged actions.

    Entries missing from ``credentials`` are passed on as ``None``.
    """
    passphrase = credentials.get('encryption_passphrase')
    ssh_keyfile = credentials.get('ssh_keyfile')
    return dict(passphrase=passphrase, ssh_keyfile=ssh_keyfile)


@pytest.mark.usefixtures('needs_ssh_config')
def test_sshfs_mount_password():
    """Test (un)mounting if password for a remote location is given.

    Skipped when no SSH password is set in the test configuration, so this
    test never runs with key file credentials or with no credentials at all.
    """
    ssh_password = test_config.backups_ssh_password
    if not ssh_password:
        pytest.skip('Needs SSH password provided')

    # Build the credentials explicitly instead of via _get_credentials(),
    # which falls back to the key file when no password is configured.
    credentials = {'ssh_password': ssh_password}
    ssh_path = test_config.backups_ssh_path

    repository = SshBorgRepository(ssh_path, credentials)
    repository.mount()
    assert repository.is_mounted
    repository.umount()
    assert not repository.is_mounted


@pytest.mark.usefixtures('needs_ssh_config')
def test_sshfs_mount_keyfile():
    """Test (un)mounting if keyfile for a remote location is given.

    Skipped when no SSH key file is set in the test configuration.
    """
    ssh_keyfile = test_config.backups_ssh_keyfile
    if not ssh_keyfile:
        pytest.skip('Needs SSH keyfile provided')

    # Build the credentials explicitly instead of via _get_credentials(),
    # which prefers the password whenever one is configured and would then
    # leave the key file path untested.
    credentials = {'ssh_keyfile': ssh_keyfile}
    ssh_path = test_config.backups_ssh_path

    repository = SshBorgRepository(ssh_path, credentials)
    repository.mount()
    assert repository.is_mounted
    repository.umount()
    assert not repository.is_mounted


def test_access_nonexisting_url():
    """Test that a repository on a made-up host is reported as missing."""
    # A fresh UUID as the host label makes the host name unique per run.
    host_label = str(uuid.uuid1())
    repository = SshBorgRepository("user@%s.com.au:~/repo" % host_label,
                                   _dummy_credentials)
    expected_error = backups.errors.BorgRepositoryDoesNotExistError
    with pytest.raises(expected_error):
        repository.get_info()


def test_inaccessible_repo_url():
    """Test that wrong credentials for an existing host raise a borg error."""
    repository = SshBorgRepository('user@heise.de:~/repo',
                                   _dummy_credentials)
    expected_error = backups.errors.BorgError
    with pytest.raises(expected_error):
        repository.get_info()


def _get_credentials(add_encryption_passphrase=False):
    """
    Collect access parameters for the remote location from the test config.

    The SSH password takes precedence; the SSH key file is only used when no
    password is configured. When add_encryption_passphrase is true, the
    module's repokey passphrase is included as well.

    Return an empty dict if nothing is configured and no passphrase is asked
    for.
    """
    credentials = {}
    ssh_password = test_config.backups_ssh_password
    if ssh_password:
        credentials['ssh_password'] = ssh_password
    else:
        # The key file setting is only looked at when there is no password.
        ssh_keyfile = test_config.backups_ssh_keyfile
        if ssh_keyfile:
            credentials['ssh_keyfile'] = ssh_keyfile

    if not add_encryption_passphrase:
        return credentials

    credentials['encryption_passphrase'] = _repokey_encryption_passphrase
    return credentials