File: t4_adm.py

package info (click to toggle)
s3ql 2.21%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 2,776 kB
  • ctags: 2,427
  • sloc: python: 14,869; makefile: 128; sh: 33; ansic: 22
file content (97 lines) | stat: -rwxr-xr-x 3,398 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/env python3
'''
t4_adm.py - this file is part of S3QL.

Copyright © 2008 Nikolaus Rath <Nikolaus@rath.org>

This work can be distributed under the terms of the GNU GPLv3.
'''

if __name__ == '__main__':
    # Run directly as a script: hand this file, plus any extra command
    # line arguments, to pytest and exit with whatever it returns.
    import sys
    import pytest
    pytest_args = [__file__] + sys.argv[1:]
    exit_status = pytest.main(pytest_args)
    sys.exit(exit_status)

from s3ql.backends import local
from s3ql.backends.comprenc import ComprencBackend
import shutil
import tempfile
import unittest
import subprocess
import pytest
import os

@pytest.mark.usefixtures('s3ql_cmd_argv', 'pass_reg_output')
class AdmTests(unittest.TestCase):
    '''
    Run S3QL command line tools against a throw-away local backend.

    `self.s3ql_cmd_argv` and `self.reg_output` are not defined in this
    class; presumably they are attached by the pytest fixtures requested
    in the decorator above (confirm against the test suite's conftest).
    '''

    def setUp(self):
        '''Create fresh cache and backend directories and derive the storage URL.'''
        self.cache_dir = tempfile.mkdtemp(prefix='s3ql-cache-')
        self.backend_dir = tempfile.mkdtemp(prefix='s3ql-backend-')

        self.storage_url = 'local://' + self.backend_dir
        self.passphrase = 'oeut3d'

    def tearDown(self):
        '''Remove the directories created by `setUp`.'''
        shutil.rmtree(self.cache_dir)
        shutil.rmtree(self.backend_dir)

    def _run_s3ql_cmd(self, argv, stdin_lines=()):
        '''
        Run *argv* as a child process and return its exit status.

        Every entry of *stdin_lines* is written to the child's standard
        input followed by a newline. Standard input is always a pipe and
        is closed before waiting, so the child sees end-of-file once the
        given lines have been consumed.
        '''
        proc = subprocess.Popen(argv, stdin=subprocess.PIPE,
                                universal_newlines=True)
        for line in stdin_lines:
            print(line, file=proc.stdin)
        proc.stdin.close()
        return proc.wait()

    def mkfs(self):
        '''
        Create a file system at `self.storage_url` with mkfs.s3ql.

        The passphrase is sent on standard input twice (presumably once
        for the prompt and once for its confirmation). The command must
        exit with status 0.
        '''
        status = self._run_s3ql_cmd(
            self.s3ql_cmd_argv('mkfs.s3ql') +
            ['-L', 'test fs', '--max-obj-size', '500',
             '--authfile', '/dev/null', '--cachedir', self.cache_dir,
             '--quiet', self.storage_url ],
            [self.passphrase, self.passphrase])

        self.assertEqual(status, 0)

        # Both fragments need the r prefix: in a plain string literal
        # '\.' is an invalid escape sequence and triggers a warning on
        # recent Python versions. The resulting pattern is unchanged.
        # Presumably this registers the warning as expected output
        # (exactly once) -- reg_output is provided outside this class.
        self.reg_output(r'^WARNING: Maximum object sizes less than '
                        r'1 MiB will degrade performance\.$', count=1)

    @unittest.skipUnless(os.path.exists('/proc/mounts'), '/proc/mounts not available')
    def test_passphrase(self):
        '''
        Change the passphrase with `s3qladm passphrase` and verify it.

        The command is fed the current passphrase followed by the new
        one twice. Afterwards the backend is opened with the new
        passphrase and the 's3ql_passphrase' object is fetched.
        '''
        self.mkfs()

        passphrase_new = 'sd982jhd'

        status = self._run_s3ql_cmd(
            self.s3ql_cmd_argv('s3qladm') +
            [ '--quiet', '--log', 'none', '--authfile',
              '/dev/null', 'passphrase', self.storage_url ],
            [self.passphrase, passphrase_new, passphrase_new])

        self.assertEqual(status, 0)

        plain_backend = local.Backend(self.storage_url, None, None)
        backend = ComprencBackend(passphrase_new.encode(), ('zlib', 6), plain_backend)

        backend.fetch('s3ql_passphrase') # will fail with wrong pw


    def test_authinfo(self):
        '''
        Run fsck.s3ql with the passphrase taken from an authfile.

        The authfile has two sections: the first uses the bare
        'local://' URL with a wrong passphrase, the second uses the
        exact storage URL with the correct one. Nothing is written to
        standard input, and fsck.s3ql must exit with status 0.
        '''
        self.mkfs()

        with tempfile.NamedTemporaryFile('wt') as fh:
            print('[entry1]',
                  'storage-url: local://',
                  'fs-passphrase: clearly wrong',
                  '',
                  '[entry2]',
                  'storage-url: %s' % self.storage_url,
                  'fs-passphrase: %s' % self.passphrase,
                  file=fh, sep='\n')
            # Make sure the content is on disk before the child reads it.
            fh.flush()

            status = self._run_s3ql_cmd(
                self.s3ql_cmd_argv('fsck.s3ql') +
                [ '--quiet', '--authfile', fh.name,
                  '--cachedir', self.cache_dir, '--log', 'none', self.storage_url ])

            self.assertEqual(status, 0)