1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
|
import os
import random
import time
import sys
import weakref
import test_base
from beaker.container import *
from beaker.synchronization import Synchronizer
# container test -
# tests the container's get_value() function mostly, to insure
# that items are recreated when expired, and that create function
# is called exactly once per expiration
try:
import thread
except:
raise "this test requires a thread-enabled python"
#import logging
#logging.basicConfig()
#logging.getLogger('beaker.container').setLevel(logging.DEBUG)
class Item(object):
def __init__(self, id):
self.id = id
def __str__(self):
return "item id %d" % self.id
def test_item(self):
return True
# keep running indicator
running = False
starttime = time.time()
# creation func entrance detector to detect non-synchronized access
# to the create function
baton = None
context = ContainerContext()
def create(id, delay = 0):
global baton
if baton is not None:
raise "baton is not none , ident " + repr(baton) + " this thread " + repr(thread.get_ident())
baton = thread.get_ident()
try:
i = Item(id)
time.sleep(delay)
global totalcreates
totalcreates += 1
return i
finally:
baton = None
def threadtest(cclass, id, statusdict, expiretime, delay, threadlocal):
print "create thread %d starting" % id
statusdict[id] = True
try:
if threadlocal:
container = cclass(context = context, namespace = 'test', key = 'test', createfunc = lambda: create(id, delay), expiretime = expiretime, data_dir='./cache', starttime = starttime)
else:
container = global_container
global running
global totalgets
try:
while running:
item = container.get_value()
if not item.test_item():
raise "item did not test"
item = None
totalgets += 1
time.sleep(random.random() * .00001)
except:
e = sys.exc_info()[0]
running = False
print e
raise
finally:
print "create thread %d exiting" % id
statusdict[id] = False
def runtest(cclass, totaltime, expiretime, delay, threadlocal):
statusdict = {}
global totalcreates
totalcreates = 0
global totalgets
totalgets = 0
global global_container
global_container = cclass(context = context, namespace = 'test', key = 'test', createfunc = lambda: create(id, delay), expiretime = expiretime, data_dir='./cache', starttime = starttime)
global_container.clear_value()
global running
running = True
for t in range(1, 20):
thread.start_new_thread(threadtest, (cclass, t, statusdict, expiretime, delay, threadlocal))
time.sleep(totaltime)
failed = not running
running = False
pause = True
while pause:
time.sleep(1)
pause = False
for v in statusdict.values():
if v:
pause = True
break
if failed:
raise "test failed"
print "total object creates %d" % totalcreates
print "total object gets %d" % totalgets
class ContainerTest(test_base.MyghtyTest):
def _runtest(self, cclass, totaltime, expiretime, delay):
print "\ntesting %s for %d secs with expiretime %s delay %d" % (
cclass, totaltime, expiretime, delay)
runtest(cclass, totaltime, expiretime, delay, threadlocal=False)
if expiretime is None:
self.assert_(totalcreates == 1)
else:
self.assert_(abs(totaltime / expiretime - totalcreates) <= 2)
def testMemoryContainer(self, totaltime=10, expiretime=None, delay=0):
self._runtest(container_registry('memory'),
totaltime, expiretime, delay)
def testMemoryContainer2(self):
self.testMemoryContainer(expiretime=2)
def testMemoryContainer3(self):
self.testMemoryContainer(expiretime=5, delay=2)
def testDbmContainer(self, totaltime=10, expiretime=None, delay=0):
self._runtest(container_registry('dbm'),
totaltime, expiretime, delay)
def testDbmContainer2(self):
self.testDbmContainer(expiretime=2)
def testDbmContainer3(self):
self.testDbmContainer(expiretime=5, delay=2)
def test_file_open_bug(self):
# 1. create container
container = container_registry('file')(context=context, namespace='reentrant_test', key='test', data_dir='./cache')
# 2. ensure its file doesnt exist.
try:
os.remove(container.namespacemanager.file)
except OSError:
pass
# 3. set a value.
container.set_value("x")
# 4. open the file and corrupt its pickled data
f = open(container.namespacemanager.file, 'w')
f.write("BLAH BLAH BLAH")
f.close()
# 5. set another value. namespace.acquire_lock() opens the file, the pickle raises an exception, the lock stays open (that was the bug)
# comment out line 147 of container.py, self.do_release_write_lock() inside of acquire_write_lock(), to illustrate
try:
container.set_value("y")
assert False
except:
pass
# 6. clear file synchronziers, rebuild the namespace / context etc. so a new Synchchronizer gets built (alternatively, just
# access the same container from a different thread)
Synchronizer.conditions.clear()
context.clear()
container = container_registry('file')(context=context, namespace='reentrant_test', key='test', data_dir='./cache')
# 7. acquire write lock hangs !
try:
container.set_value("z")
assert False
except:
pass
|