File: Container.py

package info (click to toggle)
myghtyutils 0.52-4
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd, squeeze, wheezy
  • size: 180 kB
  • ctags: 394
  • sloc: python: 1,761; makefile: 47
file content (167 lines) | stat: -rw-r--r-- 4,564 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
from myghtyutils.container import *
import myghtyutils.buffer as buffer
import random, time, weakref, sys, re
import testbase
import unittest, sys


# container test -
# tests the container's get_value() function mostly, to insure
# that items are recreated when expired, and that create function
# is called exactly once per expiration

try:
    import thread
except:
    raise "this test requires a thread-enabled python"
    

class item:
    
    def __init__(self, id):
        self.id = id
        
    def __str__(self):
        return "item id %d" % self.id

    def test_item(self):
        return True

class context(ContainerContext):
    pass
    #def __init__(self):
    #    ContainerContext.__init__(self, buffer.LogFormatter(buffer.LinePrinter(sys.stdout), "test", id_threads = True))

# keep running indicator
running = False

starttime = time.time()

# creation func entrance detector to detect non-synchronized access
# to the create function
baton = None

context = context()

def create(id, delay = 0):
    global baton
    if baton is not None:
        raise "baton is not none , ident " + repr(baton) + " this thread " + repr(thread.get_ident())

    baton = thread.get_ident()
    try:    
        
        i = item(id)

        time.sleep(delay)
        global totalcreates
        totalcreates += 1

        return i
    finally:
        baton = None
        
def test(cclass, id, statusdict, expiretime, delay, params):
    print "create thread %d starting" % id
    statusdict[id] = True


    try:
        container = cclass(context = context, namespace = 'test', key = 'test', createfunc = lambda: create(id, delay), expiretime = expiretime, data_dir='./cache', starttime = starttime, **params)

        global running
        global totalgets
        try:
            while running:
                item = container.get_value()
                if not item.test_item():
                    raise "item did not test"
                item = None
                totalgets += 1
                time.sleep(random.random() * .00001)
        except:
            
            e = sys.exc_info()[0]
            running = False
            print e
            raise
    finally:
        print "create thread %d exiting" % id
        statusdict[id] = False
    

def runtest(cclass, totaltime, expiretime, delay, **params):

    statusdict = {}
    global totalcreates
    totalcreates = 0

    global totalgets
    totalgets = 0

    container = cclass(context = context, namespace = 'test', key = 'test', createfunc = lambda: create(id, delay), expiretime = expiretime, data_dir='./cache', starttime = starttime, **params)
    container.clear_value()

    global running
    running = True    
    for t in range(1, 20):
        thread.start_new_thread(test, (cclass, t, statusdict, expiretime, delay, params))
        
    time.sleep(totaltime)
    
    failed = not running

    running = False

    pause = True
    while pause:
        time.sleep(1)    
        pause = False
        for v in statusdict.values():
            if v:
                pause = True
                break

    if failed:
        raise "test failed"

    print "total object creates %d" % totalcreates
    print "total object gets %d" % totalgets

class ContainerTest(testbase.MyghtyTest):
    def _runtest(self, cclass, totaltime, expiretime, delay, **params):
        print "\ntesting %s for %d secs with expiretime %s delay %d" % (
            cclass, totaltime, expiretime, delay)
        
        runtest(cclass, totaltime, expiretime, delay, **params)

        if expiretime is None:
            self.assert_(totalcreates == 1)
        else:
            self.assert_(abs(totaltime / expiretime - totalcreates) <= 2)

    def testMemoryContainer(self, totaltime=10, expiretime=None, delay=0):
        self._runtest(container_registry('memory', 'Container'),
                      totaltime, expiretime, delay)

    def testMemoryContainer2(self):
        self.testMemoryContainer(expiretime=2)

    def testMemoryContainer3(self):
        self.testMemoryContainer(expiretime=5, delay=2)

    def testDbmContainer(self, totaltime=10, expiretime=None, delay=0):
        self._runtest(container_registry('dbm', 'Container'),
                      totaltime, expiretime, delay)
        
    def testDbmContainer2(self):
        self.testDbmContainer(expiretime=2)

    def testDbmContainer3(self):
        self.testDbmContainer(expiretime=5, delay=2)

if __name__ == "__main__":
    testbase.runTests(unittest.findTestCases(__import__('__main__')))