1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
import time
import pytest
from ..locking import get_id, TimeoutTimer, ExclusiveLock, Lock, LockRoster, \
ADD, REMOVE, SHARED, EXCLUSIVE, LockTimeout
# Fixed owner ids handed to locks/rosters via ``id=`` in the tests below.
# Each is a 3-tuple, the same arity that get_id() is unpacked into
# (hostname, pid, tid) in test_id.
ID1 = ("foo", 1, 1)
ID2 = ("bar", 2, 2)
def test_id():
    """get_id() yields a (hostname, pid, tid) triple of str, int, int.

    The hostname must be non-empty and the pid strictly positive.
    """
    host, process_id, thread_id = get_id()
    # Type checks first, in the same order as the tuple fields.
    assert isinstance(host, str)
    assert isinstance(process_id, int)
    assert isinstance(thread_id, int)
    # Value checks.
    assert len(host) > 0
    assert process_id > 0
class TestTimeoutTimer:
    """Tests for TimeoutTimer's timed_out() and timed_out_or_sleep()."""

    def test_timeout(self):
        """A started timer is not timed out at first, but is after 1.5x the timeout."""
        limit = 0.5
        timer = TimeoutTimer(limit).start()
        assert not timer.timed_out()
        # Sleep comfortably past the limit before checking again.
        time.sleep(limit * 1.5)
        assert timer.timed_out()

    def test_notimeout_sleep(self):
        """With timeout None, timed_out_or_sleep() stays falsy on every call.

        After the k-th call, the wall clock must be at least k sleep
        intervals past the timer's start_time.
        """
        interval = 0.5
        timer = TimeoutTimer(None, interval).start()
        for calls_so_far in (1, 2):
            assert not timer.timed_out_or_sleep()
            assert time.time() >= timer.start_time + calls_so_far * interval
@pytest.fixture()
def lockpath(tmpdir):
    """Return, as a str, the path of a 'lock' entry inside the test's tmpdir."""
    lock_location = tmpdir.join('lock')
    return str(lock_location)
class TestExclusiveLock:
    """Tests for ExclusiveLock: state checks, breaking, and timeouts."""

    def test_checks(self, lockpath):
        """Inside the with-block the lock reports is_locked() and by_me()."""
        with ExclusiveLock(lockpath, timeout=1) as held:
            assert held.is_locked()
            assert held.by_me()

    def test_acquire_break_reacquire(self, lockpath):
        """After ID1's lock is broken, ID2 can take the lock on the same path."""
        stale = ExclusiveLock(lockpath, id=ID1).acquire()
        stale.break_lock()
        # Entering and leaving the context is the whole check: it must not raise.
        with ExclusiveLock(lockpath, id=ID2):
            pass

    def test_timeout(self, lockpath):
        """While ID1 holds the lock, ID2's acquire with a 0.1 timeout raises LockTimeout."""
        with ExclusiveLock(lockpath, id=ID1):
            with pytest.raises(LockTimeout):
                contender = ExclusiveLock(lockpath, id=ID2, timeout=0.1)
                contender.acquire()
class TestLock:
    """Tests for Lock in shared and exclusive mode, checked via its roster."""

    def test_shared(self, lockpath):
        """Two ids can hold the lock in shared mode at the same time."""
        first = Lock(lockpath, exclusive=False, id=ID1).acquire()
        second = Lock(lockpath, exclusive=False, id=ID2).acquire()
        roster = first._roster
        assert len(roster.get(SHARED)) == 2
        assert len(roster.get(EXCLUSIVE)) == 0
        assert not roster.empty(SHARED, EXCLUSIVE)
        assert roster.empty(EXCLUSIVE)
        first.release()
        second.release()

    def test_exclusive(self, lockpath):
        """An exclusive lock appears only under EXCLUSIVE in the roster."""
        with Lock(lockpath, exclusive=True, id=ID1) as held:
            roster = held._roster
            assert len(roster.get(SHARED)) == 0
            assert len(roster.get(EXCLUSIVE)) == 1
            assert not roster.empty(SHARED, EXCLUSIVE)

    def test_upgrade(self, lockpath):
        """Upgrading a shared lock moves its roster entry to EXCLUSIVE."""
        with Lock(lockpath, exclusive=False) as held:
            held.upgrade()
            held.upgrade()  # NOP: already upgraded by the call above
            roster = held._roster
            assert len(roster.get(SHARED)) == 0
            assert len(roster.get(EXCLUSIVE)) == 1
            assert not roster.empty(SHARED, EXCLUSIVE)

    def test_downgrade(self, lockpath):
        """Downgrading an exclusive lock moves its roster entry to SHARED."""
        with Lock(lockpath, exclusive=True) as held:
            held.downgrade()
            held.downgrade()  # NOP: already downgraded by the call above
            roster = held._roster
            assert len(roster.get(SHARED)) == 1
            assert len(roster.get(EXCLUSIVE)) == 0

    def test_got_exclusive_lock(self, lockpath):
        """got_exclusive_lock() is true only between acquire() and release()."""
        candidate = Lock(lockpath, exclusive=True, id=ID1)
        assert not candidate.got_exclusive_lock()
        candidate.acquire()
        assert candidate.got_exclusive_lock()
        candidate.release()
        assert not candidate.got_exclusive_lock()

    def test_break(self, lockpath):
        """break_lock() empties the roster so another id can lock exclusively."""
        broken = Lock(lockpath, exclusive=True, id=ID1).acquire()
        broken.break_lock()
        roster = broken._roster
        assert len(roster.get(SHARED)) == 0
        assert len(roster.get(EXCLUSIVE)) == 0
        # Entering and leaving the context is the check: it must not raise.
        with Lock(lockpath, exclusive=True, id=ID2):
            pass

    def test_timeout(self, lockpath):
        """Conflicting acquisitions by a second id raise LockTimeout.

        Each scenario is (holder exclusive?, contender exclusive?); ID1 holds
        the lock while ID2 tries to acquire it with a 0.1 timeout.
        """
        scenarios = (
            (False, True),   # shared holder, exclusive contender
            (True, False),   # exclusive holder, shared contender
            (True, True),    # exclusive holder, exclusive contender
        )
        for holder_exclusive, contender_exclusive in scenarios:
            with Lock(lockpath, exclusive=holder_exclusive, id=ID1):
                with pytest.raises(LockTimeout):
                    Lock(lockpath, exclusive=contender_exclusive, id=ID2, timeout=0.1).acquire()
@pytest.fixture()
def rosterpath(tmpdir):
    """Return, as a str, the path of a 'roster' entry inside the test's tmpdir."""
    roster_location = tmpdir.join('roster')
    return str(roster_location)
class TestLockRoster:
    """Tests for LockRoster: load/save round trip and modify()/get()."""

    def test_empty(self, rosterpath):
        """A fresh roster loads as an empty dict, which can be saved back."""
        fresh = LockRoster(rosterpath)
        contents = fresh.load()
        fresh.save(contents)
        assert contents == {}

    def test_modify_get(self, rosterpath):
        """ADD/REMOVE by two ids on the same path is visible through get(SHARED).

        A new LockRoster object is built for each removal step, so the
        entries must be visible to separate instances on the same path.
        """
        owner1 = LockRoster(rosterpath, id=ID1)
        assert owner1.get(SHARED) == set()
        owner1.modify(SHARED, ADD)
        assert owner1.get(SHARED) == {ID1}

        owner2 = LockRoster(rosterpath, id=ID2)
        owner2.modify(SHARED, ADD)
        assert owner2.get(SHARED) == {ID1, ID2}

        # Remove the entries again, each through a newly created instance.
        owner1 = LockRoster(rosterpath, id=ID1)
        owner1.modify(SHARED, REMOVE)
        assert owner1.get(SHARED) == {ID2}

        owner2 = LockRoster(rosterpath, id=ID2)
        owner2.modify(SHARED, REMOVE)
        assert owner2.get(SHARED) == set()
|