1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
|
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Test the backups action script.
"""
import os
import pathlib
import subprocess
import uuid
import pytest
from plinth.modules import backups
from plinth.modules.backups import privileged
from plinth.modules.backups.repository import BorgRepository, SshBorgRepository
from plinth.tests import config as test_config
pytestmark = pytest.mark.usefixtures('needs_root', 'needs_borg', 'load_cfg',
'mock_privileged')
privileged_modules_to_mock = ['plinth.modules.backups.privileged']
# try to access a non-existing url and a URL that exists but does not
# grant access
_dummy_credentials = {'ssh_password': 'invalid_password'}
_repokey_encryption_passphrase = '12345'
@pytest.fixture(name='needs_borg')
def fixture_needs_borg():
"""Return whether borg is installed on the system."""
try:
subprocess.run(['borg', '--version'], stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, check=True)
except FileNotFoundError:
pytest.skip('Needs borg installed')
@pytest.fixture(name='needs_ssh_config')
def fixture_needs_ssh_config():
"""Skip test if SSH details is not available in test configuration."""
if not test_config.backups_ssh_path:
pytest.skip('Needs SSH password provided')
@pytest.fixture(name='data_directory')
def fixture_data_directory():
"""Return directory where backup data is stored."""
return pathlib.Path(__file__).parent / 'backup_data'
@pytest.fixture(name='backup_directory')
def fixture_backup_directory(tmp_path):
"""Create and cleanup a backup directory."""
return tmp_path
def test_nonexisting_repository(backup_directory):
"""Test that non-existent directory as borg repository throws error."""
nonexisting_dir = backup_directory / 'does_not_exist'
repository = BorgRepository(str(nonexisting_dir))
with pytest.raises(backups.errors.BorgRepositoryDoesNotExistError):
repository.get_info()
def test_empty_dir(backup_directory):
"""Test that empty directory as borg repository throws error."""
empty_dir = backup_directory / 'empty_dir'
empty_dir.mkdir()
repository = BorgRepository(str(empty_dir))
with pytest.raises(backups.errors.BorgRepositoryDoesNotExistError):
repository.get_info()
def test_create_unencrypted_repository(backup_directory):
"""Test creating an unencrypted repository."""
path = backup_directory / 'borgbackup'
repository = BorgRepository(str(path))
repository.initialize()
info = repository.get_info()
assert 'encryption' in info
def test_create_export_delete_archive(data_directory, backup_directory):
"""
- Create a repo
- Create an archive
- Verify archive content
- Delete archive
"""
repo_name = 'test_create_and_delete'
archive_name = 'first_archive'
archive_comment = 'test_archive_comment'
path = backup_directory / repo_name
repository = BorgRepository(str(path))
repository.initialize()
archive_path = "::".join([str(path), archive_name])
privileged.create_archive(archive_path, [str(data_directory)],
archive_comment)
archive = repository.list_archives()[0]
assert archive['name'] == archive_name
assert archive['comment'] == archive_comment
repository.delete_archive(archive_name)
content = repository.list_archives()
assert not content
@pytest.mark.usefixtures('needs_ssh_config')
def test_remote_backup_actions():
"""
Test creating an encrypted remote repository using borg directly.
This relies on borgbackups being installed on the remote machine.
"""
credentials = _get_credentials(add_encryption_passphrase=True)
path = os.path.join(test_config.backups_ssh_path, str(uuid.uuid1()))
privileged.init(path, 'repokey', **_get_borg_arguments(credentials))
info = privileged.info(path, **_get_borg_arguments(credentials))
assert info['encryption']['mode'] == 'repokey'
def _get_borg_arguments(credentials):
"""Get credential arguments for running borg privileged actions."""
return {
'passphrase': credentials.get('encryption_passphrase', None),
'ssh_keyfile': credentials.get('ssh_keyfile')
}
@pytest.mark.usefixtures('needs_ssh_config')
def test_sshfs_mount_password():
"""Test (un)mounting if password for a remote location is given"""
credentials = _get_credentials()
ssh_path = test_config.backups_ssh_path
repository = SshBorgRepository(ssh_path, credentials)
repository.mount()
assert repository.is_mounted
repository.umount()
assert not repository.is_mounted
@pytest.mark.usefixtures('needs_ssh_config')
def test_sshfs_mount_keyfile():
"""Test (un)mounting if keyfile for a remote location is given"""
credentials = _get_credentials()
ssh_path = test_config.backups_ssh_path
repository = SshBorgRepository(ssh_path, credentials)
repository.mount()
assert repository.is_mounted
repository.umount()
assert not repository.is_mounted
def test_access_nonexisting_url():
"""Test accessing a non-existent URL."""
repo_url = "user@%s.com.au:~/repo" % str(uuid.uuid1())
repository = SshBorgRepository(repo_url, _dummy_credentials)
with pytest.raises(backups.errors.BorgRepositoryDoesNotExistError):
repository.get_info()
def test_inaccessible_repo_url():
"""Test accessing an existing URL with wrong credentials."""
repo_url = 'user@heise.de:~/repo'
repository = SshBorgRepository(repo_url, _dummy_credentials)
with pytest.raises(backups.errors.BorgError):
repository.get_info()
def _get_credentials(add_encryption_passphrase=False):
"""
Get access params for a remote location.
Return an empty dict if no valid access params are found.
"""
credentials = {}
if test_config.backups_ssh_password:
credentials['ssh_password'] = test_config.backups_ssh_password
elif test_config.backups_ssh_keyfile:
credentials['ssh_keyfile'] = test_config.backups_ssh_keyfile
if add_encryption_passphrase:
credentials['encryption_passphrase'] = \
_repokey_encryption_passphrase
return credentials
|