1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
"""
TestCases for testing the locking sub-system.
"""
import sys, os, string
import tempfile
import time
from pprint import pprint
try:
from threading import Thread, currentThread
have_threads = 1
except ImportError:
have_threads = 0
import unittest
from test_all import verbose
try:
# For Pythons w/distutils pybsddb
from bsddb3 import db
except ImportError:
# For Python 2.3
from bsddb import db
#----------------------------------------------------------------------
class LockingTestCase(unittest.TestCase):
def setUp(self):
homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
self.homeDir = homeDir
try: os.mkdir(homeDir)
except os.error: pass
self.env = db.DBEnv()
self.env.open(homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
db.DB_INIT_LOCK | db.DB_CREATE)
def tearDown(self):
self.env.close()
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
def test01_simple(self):
if verbose:
print '\n', '-=' * 30
print "Running %s.test01_simple..." % self.__class__.__name__
anID = self.env.lock_id()
if verbose:
print "locker ID: %s" % anID
lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
if verbose:
print "Aquired lock: %s" % lock
time.sleep(1)
self.env.lock_put(lock)
if verbose:
print "Released lock: %s" % lock
def test02_threaded(self):
if verbose:
print '\n', '-=' * 30
print "Running %s.test02_threaded..." % self.__class__.__name__
threads = []
threads.append(Thread(target = self.theThread,
args=(5, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_WRITE)))
for t in threads:
t.start()
for t in threads:
t.join()
def test03_set_timeout(self):
# test that the set_timeout call works
if hasattr(self.env, 'set_timeout'):
self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
def theThread(self, sleepTime, lockType):
name = currentThread().getName()
if lockType == db.DB_LOCK_WRITE:
lt = "write"
else:
lt = "read"
anID = self.env.lock_id()
if verbose:
print "%s: locker ID: %s" % (name, anID)
lock = self.env.lock_get(anID, "some locked thing", lockType)
if verbose:
print "%s: Aquired %s lock: %s" % (name, lt, lock)
time.sleep(sleepTime)
self.env.lock_put(lock)
if verbose:
print "%s: Released %s lock: %s" % (name, lt, lock)
#----------------------------------------------------------------------
def test_suite():
suite = unittest.TestSuite()
if have_threads:
suite.addTest(unittest.makeSuite(LockingTestCase))
else:
suite.addTest(unittest.makeSuite(LockingTestCase, 'test01'))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
|