File: test_encryption.py

package info (click to toggle)
zfs-autobackup 3.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 616 kB
  • sloc: python: 5,044; sh: 94; makefile: 3
file content (313 lines) | stat: -rw-r--r-- 23,386 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
from zfs_autobackup.CmdPipe import CmdPipe
from basetest import *
import time

# We have to do a LOT to properly test encryption/decryption/raw transfers
#
# For every scenario we need at least:
# - plain source dataset
# - encrypted source dataset
# - plain target path
# - encrypted target path
# - do a full transfer
# - do an incremental transfer

# Scenarios:
# - Raw transfer
# - Decryption transfer (--decrypt)
# - Encryption transfer (--encrypt)
# - Re-encryption transfer (--decrypt --encrypt)

class TestZfsEncryption(unittest2.TestCase):


    def setUp(self):
        """Prepare the test zpools and skip the test if encryption is unavailable.

        Querying the ``encryption`` property of ``test_source1`` is used as the
        feature probe: when that command fails, the test is skipped.
        """
        prepare_zpools()

        try:
            shelltest("zfs get encryption test_source1")
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt/SystemExit
            # must abort the run instead of being reported as a skipped test.
            self.skipTest("Encryption not supported on this ZFS version.")

    def load_key(self, key, path):
        """Write `key` to /tmp/zfstest.key and run ``zfs load-key`` on `path`."""
        commands = (
            # drop any previous key file first; ";true" keeps the command successful
            "rm /tmp/zfstest.key 2>/dev/null;true",
            "echo {} > /tmp/zfstest.key".format(key),
            "zfs load-key {}".format(path),
        )
        for command in commands:
            shelltest(command)

    def prepare_encrypted_dataset(self, key, path, unload_key=False):
        """Create an encrypted dataset at `path` using passphrase `key`.

        The passphrase is written to /tmp/zfstest.key, which the new dataset
        gets as its keylocation. When `unload_key` is true, the dataset is
        unmounted and its key is unloaded again afterwards.
        """
        commands = [
            # (re)write the key file, then create the encrypted dataset
            "rm /tmp/zfstest.key 2>/dev/null;true",
            "echo {} > /tmp/zfstest.key".format(key),
            "zfs create -o keylocation=file:///tmp/zfstest.key -o keyformat=passphrase -o encryption=on {}".format(path),
        ]

        if unload_key:
            commands.append("zfs unmount {}".format(path))
            commands.append("zfs unload-key {}".format(path))

        for command in commands:
            shelltest(command)

        # r=shelltest("dd if=/dev/zero of=/test_source1/fs1/enc1/data.txt bs=200000 count=1")

    def  test_raw(self):
        """send encrypted data unaltered (standard operation)"""

        self.prepare_encrypted_dataset("11111111", "test_source1/fs1/encryptedsource")
        self.prepare_encrypted_dataset("11111111", "test_source1/fs1/encryptedsourcekeyless", unload_key=True) # raw mode shouldn't need a key
        self.prepare_encrypted_dataset("22222222", "test_target1/encryptedtarget")

        with mocktime("20101111000000"):
            self.assertFalse(ZfsAutobackup("test test_target1 --verbose --no-progress --allow-empty --exclude-received".split(" ")).run())
            self.assertFalse(ZfsAutobackup("test test_target1/encryptedtarget --verbose --no-progress --no-snapshot --exclude-received".split(" ")).run())

        with mocktime("20101111000001"):
            self.assertFalse(ZfsAutobackup("test test_target1 --verbose --no-progress --allow-empty --exclude-received".split(" ")).run())
            self.assertFalse(ZfsAutobackup("test test_target1/encryptedtarget --verbose --no-progress --no-snapshot --exclude-received".split(" ")).run())

        r = shelltest("zfs get -r -t filesystem encryptionroot test_target1")
        self.assertMultiLineEqual(r,"""
NAME                                                                  PROPERTY        VALUE                                                                 SOURCE
test_target1                                                          encryptionroot  -                                                                     -
test_target1/encryptedtarget                                          encryptionroot  test_target1/encryptedtarget                                          -
test_target1/encryptedtarget/test_source1                             encryptionroot  test_target1/encryptedtarget                                          -
test_target1/encryptedtarget/test_source1/fs1                         encryptionroot  -                                                                     -
test_target1/encryptedtarget/test_source1/fs1/encryptedsource         encryptionroot  test_target1/encryptedtarget/test_source1/fs1/encryptedsource         -
test_target1/encryptedtarget/test_source1/fs1/encryptedsourcekeyless  encryptionroot  test_target1/encryptedtarget/test_source1/fs1/encryptedsourcekeyless  -
test_target1/encryptedtarget/test_source1/fs1/sub                     encryptionroot  -                                                                     -
test_target1/encryptedtarget/test_source2                             encryptionroot  test_target1/encryptedtarget                                          -
test_target1/encryptedtarget/test_source2/fs2                         encryptionroot  test_target1/encryptedtarget                                          -
test_target1/encryptedtarget/test_source2/fs2/sub                     encryptionroot  -                                                                     -
test_target1/test_source1                                             encryptionroot  -                                                                     -
test_target1/test_source1/fs1                                         encryptionroot  -                                                                     -
test_target1/test_source1/fs1/encryptedsource                         encryptionroot  test_target1/test_source1/fs1/encryptedsource                         -
test_target1/test_source1/fs1/encryptedsourcekeyless                  encryptionroot  test_target1/test_source1/fs1/encryptedsourcekeyless                  -
test_target1/test_source1/fs1/sub                                     encryptionroot  -                                                                     -
test_target1/test_source2                                             encryptionroot  -                                                                     -
test_target1/test_source2/fs2                                         encryptionroot  -                                                                     -
test_target1/test_source2/fs2/sub                                     encryptionroot  -                                                                     -
""")

    def  test_decrypt(self):
        """decrypt data and store unencrypted (--decrypt)"""

        self.prepare_encrypted_dataset("11111111", "test_source1/fs1/encryptedsource")
        self.prepare_encrypted_dataset("22222222", "test_target1/encryptedtarget")

        with mocktime("20101111000000"):
            self.assertFalse(ZfsAutobackup("test test_target1 --verbose --no-progress --decrypt --allow-empty --exclude-received".split(" ")).run())
            self.assertFalse(ZfsAutobackup("test test_target1/encryptedtarget --verbose --no-progress --decrypt --no-snapshot --exclude-received".split(" ")).run())

        with mocktime("20101111000001"):
            self.assertFalse(ZfsAutobackup("test test_target1 --verbose --no-progress --decrypt --allow-empty --exclude-received".split(" ")).run())
            self.assertFalse(ZfsAutobackup("test test_target1/encryptedtarget --verbose --no-progress --decrypt --no-snapshot --exclude-received".split(" ")).run())

        r = shelltest("zfs get -r -t filesystem encryptionroot test_target1")
        self.assertEqual(r, """
NAME                                                           PROPERTY        VALUE                         SOURCE
test_target1                                                   encryptionroot  -                             -
test_target1/encryptedtarget                                   encryptionroot  test_target1/encryptedtarget  -
test_target1/encryptedtarget/test_source1                      encryptionroot  test_target1/encryptedtarget  -
test_target1/encryptedtarget/test_source1/fs1                  encryptionroot  -                             -
test_target1/encryptedtarget/test_source1/fs1/encryptedsource  encryptionroot  -                             -
test_target1/encryptedtarget/test_source1/fs1/sub              encryptionroot  -                             -
test_target1/encryptedtarget/test_source2                      encryptionroot  test_target1/encryptedtarget  -
test_target1/encryptedtarget/test_source2/fs2                  encryptionroot  test_target1/encryptedtarget  -
test_target1/encryptedtarget/test_source2/fs2/sub              encryptionroot  -                             -
test_target1/test_source1                                      encryptionroot  -                             -
test_target1/test_source1/fs1                                  encryptionroot  -                             -
test_target1/test_source1/fs1/encryptedsource                  encryptionroot  -                             -
test_target1/test_source1/fs1/sub                              encryptionroot  -                             -
test_target1/test_source2                                      encryptionroot  -                             -
test_target1/test_source2/fs2                                  encryptionroot  -                             -
test_target1/test_source2/fs2/sub                              encryptionroot  -                             -
""")

    def  test_encrypt(self):
        """send normal data set and store encrypted on the other side (--encrypt) issue #60 """

        self.prepare_encrypted_dataset("11111111", "test_source1/fs1/encryptedsource")
        self.prepare_encrypted_dataset("22222222", "test_target1/encryptedtarget")

        with mocktime("20101111000000"):
            self.assertFalse(ZfsAutobackup("test test_target1 --verbose --no-progress --encrypt --debug --allow-empty --exclude-received --clear-mountpoint".split(" ")).run())
            self.assertFalse(ZfsAutobackup("test test_target1/encryptedtarget --verbose --no-progress --encrypt --debug --no-snapshot --exclude-received --clear-mountpoint".split(" ")).run())

        with mocktime("20101111000001"):
            self.assertFalse(ZfsAutobackup("test test_target1 --verbose --no-progress --encrypt --debug --allow-empty --exclude-received --clear-mountpoint".split(" ")).run())
            self.assertFalse(ZfsAutobackup("test test_target1/encryptedtarget --verbose --no-progress --encrypt --debug --no-snapshot --exclude-received --clear-mountpoint".split(" ")).run())

        r = shelltest("zfs get -r -t filesystem encryptionroot test_target1")
        self.assertEqual(r, """
NAME                                                           PROPERTY        VALUE                                                          SOURCE
test_target1                                                   encryptionroot  -                                                              -
test_target1/encryptedtarget                                   encryptionroot  test_target1/encryptedtarget                                   -
test_target1/encryptedtarget/test_source1                      encryptionroot  test_target1/encryptedtarget                                   -
test_target1/encryptedtarget/test_source1/fs1                  encryptionroot  test_target1/encryptedtarget                                   -
test_target1/encryptedtarget/test_source1/fs1/encryptedsource  encryptionroot  test_target1/encryptedtarget/test_source1/fs1/encryptedsource  -
test_target1/encryptedtarget/test_source1/fs1/sub              encryptionroot  test_target1/encryptedtarget                                   -
test_target1/encryptedtarget/test_source2                      encryptionroot  test_target1/encryptedtarget                                   -
test_target1/encryptedtarget/test_source2/fs2                  encryptionroot  test_target1/encryptedtarget                                   -
test_target1/encryptedtarget/test_source2/fs2/sub              encryptionroot  test_target1/encryptedtarget                                   -
test_target1/test_source1                                      encryptionroot  -                                                              -
test_target1/test_source1/fs1                                  encryptionroot  -                                                              -
test_target1/test_source1/fs1/encryptedsource                  encryptionroot  test_target1/test_source1/fs1/encryptedsource                  -
test_target1/test_source1/fs1/sub                              encryptionroot  -                                                              -
test_target1/test_source2                                      encryptionroot  -                                                              -
test_target1/test_source2/fs2                                  encryptionroot  -                                                              -
test_target1/test_source2/fs2/sub                              encryptionroot  -                                                              -
""")

    def test_reencrypt(self):
        """reencrypt data (--decrypt --encrypt) """

        self.prepare_encrypted_dataset("11111111", "test_source1/fs1/encryptedsource")
        self.prepare_encrypted_dataset("22222222", "test_target1/encryptedtarget")

        with mocktime("20101111000000"):
            self.assertFalse(ZfsAutobackup(
                "test test_target1 --verbose --no-progress --decrypt --encrypt --debug --allow-empty --exclude-received --clear-mountpoint".split(" ")).run())
            self.assertFalse(ZfsAutobackup(
                "test test_target1/encryptedtarget --verbose --no-progress --decrypt --encrypt --debug --no-snapshot --exclude-received --clear-mountpoint".split(
                    " ")).run())

        with mocktime("20101111000001"):
            self.assertFalse(ZfsAutobackup(
                "test test_target1 --verbose --no-progress --decrypt --encrypt --debug --allow-empty --exclude-received".split(" ")).run())
            self.assertFalse(ZfsAutobackup(
                "test test_target1/encryptedtarget --verbose --no-progress --decrypt --encrypt --debug --no-snapshot --exclude-received".split(
                    " ")).run())

        r = shelltest("zfs get -r -t filesystem encryptionroot test_target1")
        self.assertEqual(r, """
NAME                                                           PROPERTY        VALUE                         SOURCE
test_target1                                                   encryptionroot  -                             -
test_target1/encryptedtarget                                   encryptionroot  test_target1/encryptedtarget  -
test_target1/encryptedtarget/test_source1                      encryptionroot  test_target1/encryptedtarget  -
test_target1/encryptedtarget/test_source1/fs1                  encryptionroot  test_target1/encryptedtarget  -
test_target1/encryptedtarget/test_source1/fs1/encryptedsource  encryptionroot  test_target1/encryptedtarget  -
test_target1/encryptedtarget/test_source1/fs1/sub              encryptionroot  test_target1/encryptedtarget  -
test_target1/encryptedtarget/test_source2                      encryptionroot  test_target1/encryptedtarget  -
test_target1/encryptedtarget/test_source2/fs2                  encryptionroot  test_target1/encryptedtarget  -
test_target1/encryptedtarget/test_source2/fs2/sub              encryptionroot  test_target1/encryptedtarget  -
test_target1/test_source1                                      encryptionroot  -                             -
test_target1/test_source1/fs1                                  encryptionroot  -                             -
test_target1/test_source1/fs1/encryptedsource                  encryptionroot  -                             -
test_target1/test_source1/fs1/sub                              encryptionroot  -                             -
test_target1/test_source2                                      encryptionroot  -                             -
test_target1/test_source2/fs2                                  encryptionroot  -                             -
test_target1/test_source2/fs2/sub                              encryptionroot  -                             -
""")




    def  test_raw_invalid_snapshot(self):
        """in raw mode, its not allowed to have any newer snaphots on target, #219"""

        self.prepare_encrypted_dataset("11111111", "test_source1/fs1/encryptedsource")

        with mocktime("20101111000000"):
            self.assertFalse(ZfsAutobackup("test test_target1 --verbose --no-progress".split(" ")).run())

        #this is invalid in raw mode
        shelltest("zfs snapshot test_target1/test_source1/fs1/encryptedsource@incompatible")

        with mocktime("20101111000001"):
            #should fail because of incompatble snapshot
            self.assertEqual(ZfsAutobackup("test test_target1 --verbose --no-progress --allow-empty".split(" ")).run(),1)
            #should destroy incompatible and continue
            self.assertFalse(ZfsAutobackup("test test_target1 --verbose --no-progress --no-snapshot --destroy-incompatible".split(" ")).run())


        r = shelltest("zfs get -r -t filesystem encryptionroot test_target1")
        self.assertMultiLineEqual(r,"""
NAME                                           PROPERTY        VALUE                                          SOURCE
test_target1                                   encryptionroot  -                                              -
test_target1/test_source1                      encryptionroot  -                                              -
test_target1/test_source1/fs1                  encryptionroot  -                                              -
test_target1/test_source1/fs1/encryptedsource  encryptionroot  test_target1/test_source1/fs1/encryptedsource  -
test_target1/test_source1/fs1/sub              encryptionroot  -                                              -
test_target1/test_source2                      encryptionroot  -                                              -
test_target1/test_source2/fs2                  encryptionroot  -                                              -
test_target1/test_source2/fs2/sub              encryptionroot  -                                              -
""")


    def test_resume_encrypt_with_no_key(self):
        """Test resuming an --encrypt transfer while the target encryption key is not loaded.

        (This led to a kernel crash on FreeBSD with 2.1.x, I think.)

        Sequence: do an initial transfer, make the next transfer fail so that a
        resume token is left behind, unload the target key and check that the
        resume fails with exit code 3, then reload the key and check that the
        resume completes with exit code 0.
        """

        self.prepare_encrypted_dataset("11111111", "test_source1/fs1/encryptedsource")
        self.prepare_encrypted_dataset("22222222", "test_target1/encryptedtarget")

        with mocktime("20101111000000"):
            self.assertFalse(ZfsAutobackup(
                "test test_target1/encryptedtarget --verbose --no-progress --encrypt --allow-empty --exclude-received --clear-mountpoint".split(
                    " ")).run())

        shelltest("zfs set compress=off test_source1 test_target1")

        # big change on source
        shelltest("dd if=/dev/zero of=/test_source1/fs1/data bs=250M count=1")

        # waste space on target
        shelltest("dd if=/dev/zero of=/test_target1/waste bs=250M count=1")

        # should fail and leave resume token
        with mocktime("20101111000001"):
            self.assertTrue(ZfsAutobackup(
                "test test_target1/encryptedtarget --verbose --no-progress --encrypt --exclude-received --allow-empty --clear-mountpoint".split(
                    " ")).run())

        # free up space
        shelltest("rm /test_target1/waste")

        # sync
        shelltest("zfs umount test_target1")
        shelltest("zfs mount test_target1")

        # unload key
        shelltest("zfs unload-key test_target1/encryptedtarget")

        # resume should fail
        with mocktime("20101111000001"):
            self.assertEqual(ZfsAutobackup(
                "test test_target1/encryptedtarget --verbose --no-progress --encrypt --exclude-received --allow-empty --no-snapshot --clear-mountpoint".split(
                    " ")).run(), 3)

        # NOTE: On some versions this leaves 2 weird sub-datasets that shouldn't be there
        # (it's probably a ZFS bug?): test_source1/fs1/sub/sub and test_source2/fs2/sub/sub
        # under test_target1/encryptedtarget, each with a @test-20101111000001 snapshot and
        # no encryptionroot. So the dataset listing is not asserted here; we just make sure
        # the backup resumes correctly after reloading the key.

        # reload key and resume correctly.
        self.load_key("22222222", "test_target1/encryptedtarget")

        # resume should complete
        with mocktime("20101111000001"):
            self.assertEqual(ZfsAutobackup(
                "test test_target1/encryptedtarget --verbose --no-progress --encrypt --exclude-received --allow-empty --no-snapshot --clear-mountpoint".split(
                    " ")).run(), 0)