File: upgrader.py

package info (click to toggle)
borgbackup 1.0.9-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 3,572 kB
  • ctags: 5,885
  • sloc: python: 11,127; ansic: 628; makefile: 129; sh: 70
file content (206 lines) | stat: -rw-r--r-- 7,100 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import os

import pytest

try:
    import attic.repository
    import attic.key
    import attic.helpers
except ImportError:
    attic = None

from ..upgrader import AtticRepositoryUpgrader, AtticKeyfileKey
from ..helpers import get_keys_dir
from ..key import KeyfileKey
from ..archiver import UMASK_DEFAULT
from ..repository import Repository


def repo_valid(path):
    """
    utility function to check if borg can open a repository

    :param path: the path to the repository
    :returns: if borg can check the repository
    """
    with Repository(str(path), exclusive=True, create=False) as repository:
        # can't check raises() because check() handles the error
        return repository.check()


def key_valid(path):
    """
    check that the new keyfile is alright

    :param path: the path to the key file
    :returns: if the file starts with the borg magic string
    """
    keyfile = os.path.join(get_keys_dir(),
                           os.path.basename(path))
    with open(keyfile, 'r') as f:
        return f.read().startswith(KeyfileKey.FILE_ID)


@pytest.fixture()
def attic_repo(tmpdir):
    """
    create an attic repo with some stuff in it

    :param tmpdir: path to the repository to be created
    :returns: a attic.repository.Repository object
    """
    attic_repo = attic.repository.Repository(str(tmpdir), create=True)
    # throw some stuff in that repo, copied from `RepositoryTestCase.test1`
    for x in range(100):
        attic_repo.put(('%-32d' % x).encode('ascii'), b'SOMEDATA')
    attic_repo.commit()
    attic_repo.close()
    return attic_repo


@pytest.fixture(params=[True, False])
def inplace(request):
    return request.param


@pytest.mark.skipif(attic is None, reason='cannot find an attic install')
def test_convert_segments(tmpdir, attic_repo, inplace):
    """test segment conversion

    this will load the given attic repository, list all the segments
    then convert them one at a time. we need to close the repo before
    conversion otherwise we have errors from borg

    :param tmpdir: a temporary directory to run the test in (builtin
    fixture)
    :param attic_repo: a populated attic repository (fixture)
    """
    # check should fail because of magic number
    assert not repo_valid(tmpdir)
    repository = AtticRepositoryUpgrader(str(tmpdir), create=False)
    with repository:
        segments = [filename for i, filename in repository.io.segment_iterator()]
    repository.convert_segments(segments, dryrun=False, inplace=inplace)
    repository.convert_cache(dryrun=False)
    assert repo_valid(tmpdir)


class MockArgs:
    """
    mock attic location

    this is used to simulate a key location with a properly loaded
    repository object to create a key file
    """
    def __init__(self, path):
        self.repository = attic.helpers.Location(path)


@pytest.fixture()
def attic_key_file(attic_repo, tmpdir, monkeypatch):
    """
    create an attic key file from the given repo, in the keys
    subdirectory of the given tmpdir

    :param attic_repo: an attic.repository.Repository object (fixture
    define above)
    :param tmpdir: a temporary directory (a builtin fixture)
    :returns: the KeyfileKey object as returned by
    attic.key.KeyfileKey.create()
    """
    keys_dir = str(tmpdir.mkdir('keys'))

    # we use the repo dir for the created keyfile, because we do
    # not want to clutter existing keyfiles
    monkeypatch.setenv('ATTIC_KEYS_DIR', keys_dir)

    # we use the same directory for the converted files, which
    # will clutter the previously created one, which we don't care
    # about anyways. in real runs, the original key will be retained.
    monkeypatch.setenv('BORG_KEYS_DIR', keys_dir)
    monkeypatch.setenv('ATTIC_PASSPHRASE', 'test')
    return attic.key.KeyfileKey.create(attic_repo,
                                       MockArgs(keys_dir))


@pytest.mark.skipif(attic is None, reason='cannot find an attic install')
def test_keys(tmpdir, attic_repo, attic_key_file):
    """test key conversion

    test that we can convert the given key to a properly formatted
    borg key. assumes that the ATTIC_KEYS_DIR and BORG_KEYS_DIR have
    been properly populated by the attic_key_file fixture.

    :param tmpdir: a temporary directory (a builtin fixture)
    :param attic_repo: an attic.repository.Repository object (fixture
    define above)
    :param attic_key_file: an attic.key.KeyfileKey (fixture created above)
    """
    with AtticRepositoryUpgrader(str(tmpdir), create=False) as repository:
        keyfile = AtticKeyfileKey.find_key_file(repository)
        AtticRepositoryUpgrader.convert_keyfiles(keyfile, dryrun=False)
    assert key_valid(attic_key_file.path)


@pytest.mark.skipif(attic is None, reason='cannot find an attic install')
def test_convert_all(tmpdir, attic_repo, attic_key_file, inplace):
    """test all conversion steps

    this runs everything. mostly redundant test, since everything is
    done above. yet we expect a NotImplementedError because we do not
    convert caches yet.

    :param tmpdir: a temporary directory (a builtin fixture)
    :param attic_repo: an attic.repository.Repository object (fixture
    define above)
    :param attic_key_file: an attic.key.KeyfileKey (fixture created above)
    """
    # check should fail because of magic number
    assert not repo_valid(tmpdir)

    def stat_segment(path):
        return os.stat(os.path.join(path, 'data', '0', '0'))

    def first_inode(path):
        return stat_segment(path).st_ino

    orig_inode = first_inode(attic_repo.path)
    with AtticRepositoryUpgrader(str(tmpdir), create=False) as repository:
        # replicate command dispatch, partly
        os.umask(UMASK_DEFAULT)
        backup = repository.upgrade(dryrun=False, inplace=inplace)
        if inplace:
            assert backup is None
            assert first_inode(repository.path) == orig_inode
        else:
            assert backup
            assert first_inode(repository.path) != first_inode(backup)
            # i have seen cases where the copied tree has world-readable
            # permissions, which is wrong
            assert stat_segment(backup).st_mode & UMASK_DEFAULT == 0

    assert key_valid(attic_key_file.path)
    assert repo_valid(tmpdir)


def test_hardlink(tmpdir, inplace):
    """test that we handle hard links properly

    that is, if we are in "inplace" mode, hardlinks should *not*
    change (ie. we write to the file directly, so we do not rewrite the
    whole file, and we do not re-create the file).

    if we are *not* in inplace mode, then the inode should change, as
    we are supposed to leave the original inode alone."""
    a = str(tmpdir.join('a'))
    with open(a, 'wb') as tmp:
        tmp.write(b'aXXX')
    b = str(tmpdir.join('b'))
    os.link(a, b)
    AtticRepositoryUpgrader.header_replace(b, b'a', b'b', inplace=inplace)
    if not inplace:
        assert os.stat(a).st_ino != os.stat(b).st_ino
    else:
        assert os.stat(a).st_ino == os.stat(b).st_ino
    with open(b, 'rb') as tmp:
        assert tmp.read() == b'bXXX'