File: location_lock_test.py

package info (click to toggle)
rdiff-backup 2.2.6-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,640 kB
  • sloc: python: 24,129; javascript: 9,512; sh: 1,230; ansic: 580; makefile: 36
file content (94 lines) | stat: -rw-r--r-- 3,542 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""
Test the handling of locking with api version >= 201
"""
import os
import shutil
import unittest

from rdiff_backup import Globals, rpath
from rdiffbackup.locations import _repo_shadow

import commontest as comtst


class LocationLockTest(unittest.TestCase):
    """
    Test that rdiff-backup properly handles locking
    """

    def setUp(self):
        self.base_dir = os.path.join(comtst.abs_test_dir, b"location_lock")
        self.base_rp = rpath.RPath(Globals.local_connection, self.base_dir)
        self.lockfile = self.base_rp.append("lock")
        comtst.re_init_rpath_dir(self.base_rp)
        self.success = False

    def test_location_lock_exclusive(self):
        """
        verify that exclusive locking mechanisms do work properly
        """
        self.assertFalse(
            _repo_shadow.RepoShadow.is_locked(self.lockfile, exclusive=True))
        self.assertTrue(
            _repo_shadow.RepoShadow.lock(self.lockfile, exclusive=True))
        # we tweak another process
        fd1 = _repo_shadow.RepoShadow._lockfd
        _repo_shadow.RepoShadow._lockfd = None
        self.assertTrue(
            _repo_shadow.RepoShadow.is_locked(self.lockfile, exclusive=False))
        self.assertTrue(
            _repo_shadow.RepoShadow.is_locked(self.lockfile, exclusive=True))
        self.assertFalse(
            _repo_shadow.RepoShadow.lock(self.lockfile, exclusive=False))
        self.assertIsNone(_repo_shadow.RepoShadow._lockfd)
        _repo_shadow.RepoShadow._lockfd = fd1
        self.assertIsNone(
            _repo_shadow.RepoShadow.unlock(self.lockfile, exclusive=True))
        self.assertIsNone(_repo_shadow.RepoShadow._lockfd)
        self.assertFalse(
            _repo_shadow.RepoShadow.is_locked(self.lockfile, exclusive=False))
        self.success = True

    def test_location_lock_shared(self):
        """
        verify that shared locking (read-only) is possible
        """
        # make sure the lockfile doesn't exist
        self.lockfile.setdata()
        if self.lockfile.lstat():
            self.lockfile.delete()
        self.assertFalse(
            _repo_shadow.RepoShadow.is_locked(self.lockfile, exclusive=False))
        self.lockfile.touch()
        self.assertTrue(
            _repo_shadow.RepoShadow.lock(self.lockfile, exclusive=False))
        # we tweak another process
        fd1 = _repo_shadow.RepoShadow._lockfd
        _repo_shadow.RepoShadow._lockfd = None
        self.assertFalse(
            _repo_shadow.RepoShadow.is_locked(self.lockfile, exclusive=False))
        self.assertTrue(
            _repo_shadow.RepoShadow.is_locked(self.lockfile, exclusive=True))
        self.assertFalse(
            _repo_shadow.RepoShadow.lock(self.lockfile, exclusive=True))
        self.assertIsNone(_repo_shadow.RepoShadow._lockfd)
        self.assertTrue(
            _repo_shadow.RepoShadow.lock(self.lockfile, exclusive=False))
        self.assertIsNone(
            _repo_shadow.RepoShadow.unlock(self.lockfile, exclusive=False))
        _repo_shadow.RepoShadow._lockfd = fd1
        self.assertIsNone(
            _repo_shadow.RepoShadow.unlock(self.lockfile, exclusive=False))
        self.assertIsNone(_repo_shadow.RepoShadow._lockfd)
        self.assertFalse(
            _repo_shadow.RepoShadow.is_locked(self.lockfile, exclusive=False))
        self.success = True

    def tearDown(self):
        # we clean-up only if the test was successful
        if self.success:
            shutil.rmtree(self.base_dir)


if __name__ == "__main__":
    unittest.main()