1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
from beaker.util import SyncDict
import gc, random, sys, time, weakref
# This script tests SyncDict for its thread safety and its
# ability to always return a value even for a dictionary
# that loses data randomly, and
# ensures that when used as a registry, only one instance
# of a particular key/value exists at any one time.
# The stress test drives SyncDict from raw threads, so a thread-enabled
# interpreter is mandatory.  Python 2 calls the low-level module ``thread``;
# Python 3 renamed it to ``_thread`` with the same functions.
try:
    import thread
except ImportError:
    try:
        import _thread as thread
    except ImportError:
        # String exceptions are no longer valid; raise a real exception so
        # the explanatory message actually reaches the user.
        raise ImportError("this test requires a thread-enabled python")

# True when sys.platform reports a Java platform (i.e. running on Jython).
jython = sys.platform.startswith('java')
def collect():
    """Run a garbage-collection pass when executing on Jython.

    The tests rely on CPython's behavior with respect to weak references.
    On Jython an explicit collection is triggered at the points where
    CPython is expected to have already released the object.  On any
    other platform this function does nothing.
    """
    if not jython:
        return
    gc.collect()
class item:
    """Minimal payload object stored in the dictionary under test."""

    def __init__(self, id):
        # numeric identifier; __str__ formats it with %d
        self.id = id

    def __str__(self):
        description = "item id %d" % self.id
        return description
# keep-running indicator: worker threads loop for as long as this is True
running = False

# run statistics.  runtest() resets them at the start of every run; they
# are also defined here so that create() and threadtest() do not hit a
# NameError if they are exercised before runtest() has been called.
totalcreates = 0
totalgets = 0
totalremoves = 0

# one item is referenced at a time (to ensure the singleton pattern);
# a weak reference lets create() check whether the previous item is
# still alive without keeping it alive itself
theitem = weakref.ref(item(0))

# creation-function entrance detector, used to detect non-synchronized
# access to the create function
baton = None
def create(id):
    """Build a new ``item``; used as the creation function for ``s.get``.

    While running, the function checks two invariants:

    * ``baton`` must be ``None`` on entry, i.e. no other call to
      ``create`` is currently in progress (detects unsynchronized access);
    * the previously created item, tracked through the ``theitem`` weak
      reference, must no longer be alive.

    On success the new item is recorded in ``theitem`` and
    ``totalcreates`` is incremented.

    Args:
        id: identifier passed to ``item`` and used in the error message.

    Returns:
        The newly created ``item`` instance.

    Raises:
        AssertionError: if either invariant above is violated.
    """
    global baton, theitem, totalcreates

    if baton is not None:
        # Raised before the try block on purpose: the baton belongs to the
        # call that is already in progress and must not be cleared here.
        raise AssertionError("baton is not none !")

    baton = True
    try:
        collect()
        if theitem() is not None:
            raise AssertionError(
                "create %d old item is still referenced" % id)

        new_item = item(id)
        theitem = weakref.ref(new_item)
        totalcreates += 1
        return new_item
    finally:
        # always hand the baton back, even when an invariant check failed
        baton = None
def threadtest(s, id, statusdict):
    """Worker loop: repeatedly fetch the 'test' key from ``s``.

    While the module-level ``running`` flag is true, the worker calls
    ``s.get('test', ...)`` with a creation function that delegates to
    ``create(id)``, increments ``totalgets`` and sleeps for a tiny random
    interval.  Any exception stops the whole test by clearing ``running``
    and is reported on stdout.

    Args:
        s: object exposing ``get(key, createfunc)`` (the dictionary
            wrapper under test).
        id: identifier of this worker, used in messages, as the key in
            ``statusdict`` and as the argument to ``create``.
        statusdict: shared mapping; ``statusdict[id]`` is True while the
            worker is alive and False once it has finished.
    """
    global running, totalgets

    print("create thread %d starting" % id)
    statusdict[id] = True
    try:
        try:
            while running:
                s.get('test', lambda: create(id))
                totalgets += 1
                time.sleep(random.random() * .00001)
        except:
            # Deliberately catch everything: whatever goes wrong in a
            # worker must stop the run so that runtest() can report it.
            exc_type, exc_value = sys.exc_info()[:2]
            running = False
            # Report the message as well as the type; the type alone says
            # nothing about which invariant was violated.
            print("%s: %s" % (getattr(exc_type, "__name__", exc_type),
                              exc_value))
    finally:
        print("create thread %d exiting" % id)
        statusdict[id] = False
def runtest(s):
    """Run the multi-threaded stress test against the mapping ``s``.

    Resets the module-level counters, starts nine worker threads running
    ``threadtest``, then removes the 'test' key up to ten times at random
    intervals while the workers keep fetching it.  Afterwards the workers
    are told to stop and the function waits until every one of them has
    marked itself finished in the shared status dictionary.

    Args:
        s: object under test; must support ``get(key, createfunc)`` (used
            by the workers) and ``del s[key]``.

    Raises:
        AssertionError: if any worker cleared ``running`` because it hit
            an exception.
    """
    global running, totalcreates, totalgets, totalremoves

    statusdict = {}
    totalcreates = 0
    totalremoves = 0
    totalgets = 0
    running = True

    for t in range(1, 10):
        thread.start_new_thread(threadtest, (s, t, statusdict))

    # give the workers time to start up and populate the key
    time.sleep(1)

    for _ in range(0, 10):
        if not running:
            # a worker failed and stopped the run
            break
        print("Removing item")
        try:
            del s['test']
        except KeyError:
            # the key may already be gone; that is not counted as a remove
            pass
        else:
            totalremoves += 1
        time.sleep(random.random() * .89)

    # a worker clears ``running`` only on failure, so check before stopping
    failed = not running
    running = False

    # poll once per second until every worker has reported that it exited
    pause = True
    while pause:
        time.sleep(1)
        pause = any(statusdict.values())

    if failed:
        raise AssertionError("test failed")

    print("total object creates %d" % totalcreates)
    print("total object gets %d" % totalgets)
    print("total object removes %d" % totalremoves)
def test_dict():
    # Plain-dictionary scenario: the value is removed periodically by
    # runtest(), so the number of creates should be equal to the number
    # of removes plus one.
    collect()
    print("\ntesting with normal dict")
    lock = thread.allocate_lock()
    # the SyncDict is built inline so that no local name keeps it (and
    # the item it stores) alive after runtest() returns
    runtest(SyncDict(lock, {}))
    assert totalremoves + 1 == totalcreates
def test_weakdict():
    # Same stress run as test_dict, but backed by a WeakValueDictionary.
    collect()
    print("\ntesting with weak dict")
    lock = thread.allocate_lock()
    weak_store = weakref.WeakValueDictionary()
    runtest(SyncDict(lock, weak_store))
|