File: test_synchronizer.py

package info (click to toggle)
beaker 1.6.3-1.1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 424 kB
  • sloc: python: 3,678; makefile: 45
file content (27 lines) | stat: -rw-r--r-- 653 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from beaker.synchronization import *

# TODO: spawn threads, test locking.


def teardown():
    import shutil
    shutil.rmtree('./cache', True)

def test_reentrant_file():
    sync1 = file_synchronizer('test', lock_dir='./cache')
    sync2 = file_synchronizer('test', lock_dir='./cache')
    sync1.acquire_write_lock()
    sync2.acquire_write_lock()
    sync2.release_write_lock()
    sync1.release_write_lock()

def test_null():
    sync = null_synchronizer()
    assert sync.acquire_write_lock()
    sync.release_write_lock()

def test_mutex():
    sync = mutex_synchronizer('someident')
    sync.acquire_write_lock()
    sync.release_write_lock()