1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
|
import os
import pytest
try:
import attic.repository
import attic.key
import attic.helpers
except ImportError:
attic = None
from ..upgrader import AtticRepositoryUpgrader, AtticKeyfileKey
from ..helpers import get_keys_dir
from ..key import KeyfileKey
from ..archiver import UMASK_DEFAULT
from ..repository import Repository
def repo_valid(path):
"""
utility function to check if borg can open a repository
:param path: the path to the repository
:returns: if borg can check the repository
"""
with Repository(str(path), exclusive=True, create=False) as repository:
# can't check raises() because check() handles the error
return repository.check()
def key_valid(path):
"""
check that the new keyfile is alright
:param path: the path to the key file
:returns: if the file starts with the borg magic string
"""
keyfile = os.path.join(get_keys_dir(),
os.path.basename(path))
with open(keyfile, 'r') as f:
return f.read().startswith(KeyfileKey.FILE_ID)
@pytest.fixture()
def attic_repo(tmpdir):
"""
create an attic repo with some stuff in it
:param tmpdir: path to the repository to be created
:returns: a attic.repository.Repository object
"""
attic_repo = attic.repository.Repository(str(tmpdir), create=True)
# throw some stuff in that repo, copied from `RepositoryTestCase.test1`
for x in range(100):
attic_repo.put(('%-32d' % x).encode('ascii'), b'SOMEDATA')
attic_repo.commit()
attic_repo.close()
return attic_repo
@pytest.fixture(params=[True, False])
def inplace(request):
return request.param
@pytest.mark.skipif(attic is None, reason='cannot find an attic install')
def test_convert_segments(tmpdir, attic_repo, inplace):
"""test segment conversion
this will load the given attic repository, list all the segments
then convert them one at a time. we need to close the repo before
conversion otherwise we have errors from borg
:param tmpdir: a temporary directory to run the test in (builtin
fixture)
:param attic_repo: a populated attic repository (fixture)
"""
# check should fail because of magic number
assert not repo_valid(tmpdir)
repository = AtticRepositoryUpgrader(str(tmpdir), create=False)
with repository:
segments = [filename for i, filename in repository.io.segment_iterator()]
repository.convert_segments(segments, dryrun=False, inplace=inplace)
repository.convert_cache(dryrun=False)
assert repo_valid(tmpdir)
class MockArgs:
"""
mock attic location
this is used to simulate a key location with a properly loaded
repository object to create a key file
"""
def __init__(self, path):
self.repository = attic.helpers.Location(path)
@pytest.fixture()
def attic_key_file(attic_repo, tmpdir, monkeypatch):
"""
create an attic key file from the given repo, in the keys
subdirectory of the given tmpdir
:param attic_repo: an attic.repository.Repository object (fixture
define above)
:param tmpdir: a temporary directory (a builtin fixture)
:returns: the KeyfileKey object as returned by
attic.key.KeyfileKey.create()
"""
keys_dir = str(tmpdir.mkdir('keys'))
# we use the repo dir for the created keyfile, because we do
# not want to clutter existing keyfiles
monkeypatch.setenv('ATTIC_KEYS_DIR', keys_dir)
# we use the same directory for the converted files, which
# will clutter the previously created one, which we don't care
# about anyways. in real runs, the original key will be retained.
monkeypatch.setenv('BORG_KEYS_DIR', keys_dir)
monkeypatch.setenv('ATTIC_PASSPHRASE', 'test')
return attic.key.KeyfileKey.create(attic_repo,
MockArgs(keys_dir))
@pytest.mark.skipif(attic is None, reason='cannot find an attic install')
def test_keys(tmpdir, attic_repo, attic_key_file):
"""test key conversion
test that we can convert the given key to a properly formatted
borg key. assumes that the ATTIC_KEYS_DIR and BORG_KEYS_DIR have
been properly populated by the attic_key_file fixture.
:param tmpdir: a temporary directory (a builtin fixture)
:param attic_repo: an attic.repository.Repository object (fixture
define above)
:param attic_key_file: an attic.key.KeyfileKey (fixture created above)
"""
with AtticRepositoryUpgrader(str(tmpdir), create=False) as repository:
keyfile = AtticKeyfileKey.find_key_file(repository)
AtticRepositoryUpgrader.convert_keyfiles(keyfile, dryrun=False)
assert key_valid(attic_key_file.path)
@pytest.mark.skipif(attic is None, reason='cannot find an attic install')
def test_convert_all(tmpdir, attic_repo, attic_key_file, inplace):
"""test all conversion steps
this runs everything. mostly redundant test, since everything is
done above. yet we expect a NotImplementedError because we do not
convert caches yet.
:param tmpdir: a temporary directory (a builtin fixture)
:param attic_repo: an attic.repository.Repository object (fixture
define above)
:param attic_key_file: an attic.key.KeyfileKey (fixture created above)
"""
# check should fail because of magic number
assert not repo_valid(tmpdir)
def stat_segment(path):
return os.stat(os.path.join(path, 'data', '0', '0'))
def first_inode(path):
return stat_segment(path).st_ino
orig_inode = first_inode(attic_repo.path)
with AtticRepositoryUpgrader(str(tmpdir), create=False) as repository:
# replicate command dispatch, partly
os.umask(UMASK_DEFAULT)
backup = repository.upgrade(dryrun=False, inplace=inplace)
if inplace:
assert backup is None
assert first_inode(repository.path) == orig_inode
else:
assert backup
assert first_inode(repository.path) != first_inode(backup)
# i have seen cases where the copied tree has world-readable
# permissions, which is wrong
assert stat_segment(backup).st_mode & UMASK_DEFAULT == 0
assert key_valid(attic_key_file.path)
assert repo_valid(tmpdir)
def test_hardlink(tmpdir, inplace):
"""test that we handle hard links properly
that is, if we are in "inplace" mode, hardlinks should *not*
change (ie. we write to the file directly, so we do not rewrite the
whole file, and we do not re-create the file).
if we are *not* in inplace mode, then the inode should change, as
we are supposed to leave the original inode alone."""
a = str(tmpdir.join('a'))
with open(a, 'wb') as tmp:
tmp.write(b'aXXX')
b = str(tmpdir.join('b'))
os.link(a, b)
AtticRepositoryUpgrader.header_replace(b, b'a', b'b', inplace=inplace)
if not inplace:
assert os.stat(a).st_ino != os.stat(b).st_ino
else:
assert os.stat(a).st_ino == os.stat(b).st_ino
with open(b, 'rb') as tmp:
assert tmp.read() == b'bXXX'
|