1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
|
from myghtyutils.container import *
import myghtyutils.buffer as buffer
import random, time, weakref, sys, re
import testbase
import unittest, sys
# container test -
# mostly tests the container's get_value() function, to ensure
# that items are recreated when they expire, and that the create function
# is called exactly once per expiration
# The stress test below starts its workers with thread.start_new_thread(),
# so the low-level ``thread`` module is mandatory.
try:
    import thread
except ImportError:
    # Raise a real exception object: a string exception is rejected with a
    # TypeError on Python 2.6+, which would hide this message.
    raise ImportError("this test requires a thread-enabled python")
class item:
    """Trivial payload object; create() builds one for the container to hand out."""

    def __init__(self, id):
        # Identifier supplied by whoever constructs the item.
        self.id = id

    def test_item(self):
        """Sanity probe called by the worker threads on every fetched item; always True."""
        return True

    def __str__(self):
        return "item id %d" % self.id
class context(ContainerContext):
    """ContainerContext subclass used by this test; it adds no behaviour of its own."""
    # Disabled constructor, kept for reference. It would hand ContainerContext
    # a log formatter writing to stdout:
    #
    #def __init__(self):
    #    ContainerContext.__init__(self, buffer.LogFormatter(buffer.LinePrinter(sys.stdout), "test", id_threads = True))
# ---- state shared between runtest() and the worker threads ----

# Identity of the thread currently inside create(), or None when no thread
# is. create() uses it to detect non-synchronized entry.
baton = None

# Worker loop flag: workers keep fetching for as long as this is True.
running = False

# Timestamp taken when the module loads; passed to every container as its
# ``starttime`` argument.
starttime = time.time()

# The one context instance handed to every container. Note that this rebinds
# the name ``context`` from the class to its instance.
context = context()
def create(id, delay = 0):
    """Build a new ``item``; used as the containers' create function.

    The module-level ``baton`` holds the ident of the thread currently inside
    this function. Finding it already set on entry means two threads entered
    the create function at the same time, which is the condition this test
    is designed to catch.

    id    -- identifier stored on the new item
    delay -- seconds to sleep while holding the baton (simulates a slow
             creation)

    Returns the new item and increments the global ``totalcreates`` counter.
    Raises RuntimeError if another thread is already inside this function.
    """
    global baton, totalcreates
    if baton is not None:
        # Must be a real exception object: a string exception is rejected
        # with a TypeError on Python 2.6+, which would lose this diagnostic.
        raise RuntimeError("baton is not none , ident " + repr(baton) + " this thread " + repr(thread.get_ident()))
    baton = thread.get_ident()
    try:
        new_item = item(id)
        time.sleep(delay)
        totalcreates += 1
        return new_item
    finally:
        # Always release the baton, even if creation failed.
        baton = None
def test(cclass, id, statusdict, expiretime, delay, params):
    """Worker thread body: fetch the shared value over and over until told to stop.

    cclass     -- container class to instantiate
    id         -- worker number; used as this worker's key in ``statusdict``
                  and as the id of any item this worker ends up creating
    statusdict -- dict shared with runtest(); statusdict[id] is True while
                  this worker is alive and False once it has exited
    expiretime -- expiration time passed through to the container
    delay      -- seconds create() sleeps during a creation
    params     -- dict of extra keyword arguments for the container constructor

    Each successful fetch increments the global ``totalgets``. Any exception
    clears the global ``running`` flag, so the other workers stop and
    runtest() can report the failure, and is then re-raised.
    """
    global running, totalgets
    print("create thread %d starting" % id)
    statusdict[id] = True
    try:
        container = cclass(context = context, namespace = 'test', key = 'test', createfunc = lambda: create(id, delay), expiretime = expiretime, data_dir='./cache', starttime = starttime, **params)
        try:
            while running:
                value = container.get_value()
                if not value.test_item():
                    # A real exception object; string exceptions are rejected
                    # with a TypeError on Python 2.6+.
                    raise RuntimeError("item did not test")
                # Drop our reference to the fetched item before sleeping.
                value = None
                totalgets += 1
                time.sleep(random.random() * .00001)
        except:
            # Deliberately catches everything: whatever went wrong, the whole
            # run must be flagged as failed before the error propagates.
            e = sys.exc_info()[0]
            running = False
            print(e)
            raise
    finally:
        print("create thread %d exiting" % id)
        statusdict[id] = False
def runtest(cclass, totaltime, expiretime, delay, **params):
    """Exercise one container class from 19 concurrent threads for ``totaltime`` seconds.

    cclass     -- container class under test
    totaltime  -- how long, in seconds, the workers are left running
    expiretime -- expiration time passed to every container (None is passed
                  through unchanged)
    delay      -- seconds create() sleeps during each creation
    params     -- extra keyword arguments forwarded to the container
                  constructor

    Resets the global ``totalcreates`` / ``totalgets`` counters, runs the
    workers, waits for all of them to exit and prints both counters.
    Raises RuntimeError("test failed") if any worker stopped the run early.
    """
    global running, totalcreates, totalgets
    statusdict = {}
    totalcreates = 0
    totalgets = 0

    # This container exists only to clear any value left under the shared
    # namespace/key. Worker ids start at 1, so id 0 marks an item created
    # here. The original lambda referenced ``id`` with no local of that name,
    # which silently captured the ``id`` builtin.
    container = cclass(context = context, namespace = 'test', key = 'test', createfunc = lambda: create(0, delay), expiretime = expiretime, data_dir='./cache', starttime = starttime, **params)
    container.clear_value()

    running = True
    for t in range(1, 20):
        thread.start_new_thread(test, (cclass, t, statusdict, expiretime, delay, params))
    time.sleep(totaltime)

    # A worker that hit an error has already cleared ``running``.
    failed = not running
    running = False

    # Poll once a second until every worker has marked itself as exited.
    while True:
        time.sleep(1)
        if not [alive for alive in statusdict.values() if alive]:
            break

    if failed:
        # A real exception object; string exceptions are rejected with a
        # TypeError on Python 2.6+.
        raise RuntimeError("test failed")
    print("total object creates %d" % totalcreates)
    print("total object gets %d" % totalgets)
class ContainerTest(testbase.MyghtyTest):
    """Runs the threaded stress test against the 'memory' and 'dbm' containers."""

    def _runtest(self, cclass, totaltime, expiretime, delay, **params):
        """Run the stress test once and check the number of creations.

        With no expiretime exactly one creation is expected; otherwise the
        count must be within 2 of totaltime / expiretime.
        """
        banner = "\ntesting %s for %d secs with expiretime %s delay %d" % (
            cclass, totaltime, expiretime, delay)
        print(banner)
        runtest(cclass, totaltime, expiretime, delay, **params)
        if expiretime is None:
            creates_ok = totalcreates == 1
        else:
            creates_ok = abs(totaltime / expiretime - totalcreates) <= 2
        self.assert_(creates_ok)

    # -- memory container ------------------------------------------------

    def testMemoryContainer(self, totaltime=10, expiretime=None, delay=0):
        cclass = container_registry('memory', 'Container')
        self._runtest(cclass, totaltime, expiretime, delay)

    def testMemoryContainer2(self):
        # Value expires every 2 seconds.
        self.testMemoryContainer(expiretime=2)

    def testMemoryContainer3(self):
        # Value expires every 5 seconds and takes 2 seconds to create.
        self.testMemoryContainer(expiretime=5, delay=2)

    # -- dbm container ---------------------------------------------------

    def testDbmContainer(self, totaltime=10, expiretime=None, delay=0):
        cclass = container_registry('dbm', 'Container')
        self._runtest(cclass, totaltime, expiretime, delay)

    def testDbmContainer2(self):
        # Value expires every 2 seconds.
        self.testDbmContainer(expiretime=2)

    def testDbmContainer3(self):
        # Value expires every 5 seconds and takes 2 seconds to create.
        self.testDbmContainer(expiretime=5, delay=2)
if __name__ == "__main__":
    # Collect every TestCase defined in this script and pass the resulting
    # suite to the project's test runner.
    main_module = __import__('__main__')
    suite = unittest.findTestCases(main_module)
    testbase.runTests(suite)
|