File: test_syncdict.py

package info (click to toggle)
beaker 0.9.5-1
  • links: PTS, VCS
  • area: main
  • in suites: lenny
  • size: 384 kB
  • ctags: 548
  • sloc: python: 3,016; makefile: 40
file content (156 lines) | stat: -rw-r--r-- 3,602 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from beaker.util import SyncDict
import gc, random, sys, time, weakref

# this script tests SyncDict for its thread safety, 
# ability to always return a value even for a dictionary 
# that loses data randomly,  and
# insures that when used as a registry, only one instance
# of a particular key/value exists at any one time.

try:
    import thread
except:
    raise "this test requires a thread-enabled python"
    
jython = sys.platform.startswith('java')

def collect():
    """The tests assume CPython GC behavior in regard to weakrefs, but
    we can coerce Jython into passing by triggering GC when we expect
    it to have happened on CPython
    """
    if jython:
        gc.collect()


class item:
    
    def __init__(self, id):
        self.id = id
        
    def __str__(self):
        return "item id %d" % self.id

# keep running indicator
running = False

# one item is referenced at a time (to insure singleton pattern)
theitem = weakref.ref(item(0))

# creation func entrance detector to detect non-synchronized access
# to the create function
baton = None


def create(id):
    global baton
    if baton is not None:
        raise "baton is not none !"

    baton = True
    try:    
        global theitem
        collect()
        
        if theitem() is not None:
            raise "create %d old item is still referenced" % id
            
        i = item(id)
        theitem = weakref.ref(i)

        global totalcreates
        totalcreates += 1

        return i
    finally:
        baton = None
        
def threadtest(s, id, statusdict):
    print "create thread %d starting" % id
    statusdict[id] = True

    try:
        global running
        global totalgets
        try:
            while running:
                s.get('test', lambda: create(id))
                totalgets += 1
                time.sleep(random.random() * .00001)
        except:
            e = sys.exc_info()[0]
            running = False
            print e
    finally:
        print "create thread %d exiting" % id
        statusdict[id] = False
    


def runtest(s):

    statusdict = {}
    global totalcreates
    totalcreates = 0

    global totalremoves
    totalremoves = 0

    global totalgets
    totalgets = 0

    global running
    running = True    
    for t in range(1, 10):
        thread.start_new_thread(threadtest, (s, t, statusdict))
        
    time.sleep(1)
    
    for x in range (0,10):
        if not running:
            break
        print "Removing item"
        try: 
            del s['test']
            totalremoves += 1
        except KeyError:
            pass
        sleeptime = random.random() * .89
        time.sleep(sleeptime)

    failed = not running

    running = False

    pause = True
    while pause:
        time.sleep(1)    
        pause = False
        for v in statusdict.values():
            if v:
                pause = True
                break

    if failed:
        raise "test failed"

    print "total object creates %d" % totalcreates
    print "total object gets %d" % totalgets
    print "total object removes %d" % totalremoves

    
def test_dict():
    # normal dictionary test, where we will remove the value
    # periodically. the number of creates should be equal to
    # the number of removes plus one.    
    collect()
    print "\ntesting with normal dict"
    runtest(SyncDict(thread.allocate_lock(), {}))

    assert(totalremoves + 1 == totalcreates)


def test_weakdict():
    collect()
    print "\ntesting with weak dict"
    runtest(SyncDict(thread.allocate_lock(), weakref.WeakValueDictionary()))