File: test_container.py

package info (click to toggle)
beaker 1.6.3-1.1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 424 kB
  • sloc: python: 3,678; makefile: 45
file content (181 lines) | stat: -rw-r--r-- 5,377 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import os
import random
import time
from beaker.container import *
from beaker.synchronization import _synchronizers
from beaker.cache import clsmap
from threading import Thread

class CachedWidget(object):
    """Object handed to the cache as its create function.

    Every construction bumps the class-wide ``totalcreates`` counter, sleeps
    for ``delay`` seconds and then stamps the instance with the current time
    in ``self.time``.
    """

    # number of instances constructed since the counter was last reset
    totalcreates = 0
    # seconds each construction sleeps before taking its timestamp
    delay = 0

    def __init__(self):
        widget_cls = CachedWidget
        widget_cls.totalcreates = widget_cls.totalcreates + 1
        time.sleep(widget_cls.delay)
        self.time = time.time()

def _run_container_test(cls, totaltime, expiretime, delay, threadlocal):
    """Hit a cached ``Value`` from seven threads and check its expiry.

    The worker threads call ``get_value()`` in a loop for ``totaltime``
    seconds.  When ``expiretime`` is set, every widget they receive must
    satisfy ``created + expiretime + delay + fudge >= now``.  Afterwards the
    number of ``CachedWidget`` constructions is compared with an upper bound.

    :param cls: namespace class, called as ``cls('test', data_dir='./cache')``.
    :param totaltime: seconds the main thread sleeps while the workers run.
    :param expiretime: expire time passed to ``Value``; ``None`` skips the
        per-item age check and caps the expected number of creates at 1.
    :param delay: seconds each ``CachedWidget`` construction sleeps.
    :param threadlocal: when true every thread builds its own ``Value``;
        otherwise all threads share a single one.
    :raises AssertionError: if a worker thread failed or more widgets were
        created than expected.
    """
    # Single-argument parenthesized print behaves the same on Python 2 and 3.
    print("\ntesting %s for %d secs with expiretime %s delay %d" % (
        cls, totaltime, expiretime, delay))

    CachedWidget.totalcreates = 0
    CachedWidget.delay = delay

    # allow for python overhead when checking current time against expire times
    fudge = 1

    starttime = time.time()

    def make_value():
        # Build a Value on a fresh namespace instance and clear what it holds.
        fresh = Value(
                    'test',
                    cls('test', data_dir='./cache'),
                    createfunc=CachedWidget,
                    expiretime=expiretime,
                    starttime=starttime)
        fresh.clear_value()
        return fresh

    if threadlocal:
        shared_value = None
    else:
        shared_value = make_value()

    # One-element list so the worker threads can flip the flag in place.
    running = [True]

    class RunThread(Thread):
        def run(self):
            print("%s starting" % self)

            try:
                # Setup lives inside the try block so that a failure while
                # building the per-thread Value is reported like any other
                # worker failure.
                if threadlocal:
                    localvalue = make_value()
                else:
                    localvalue = shared_value

                while running[0]:
                    item = localvalue.get_value()
                    if expiretime is not None:
                        currenttime = time.time()
                        itemtime = item.time
                        assert itemtime + expiretime + delay + fudge >= currenttime, \
                            "created: %f expire: %f delay: %f currenttime: %f" % \
                            (itemtime, expiretime, delay, currenttime)
                    time.sleep(random.random() * .00001)
            except:
                # Deliberately broad: flag the failure so the other workers
                # stop and the main thread can see it, then re-raise.
                running[0] = False
                raise
            print("%s finishing" % self)

    threads = [RunThread() for _ in range(7)]

    for t in threads:
        t.start()

    time.sleep(totaltime)

    # A worker that hit an error has already cleared the flag.
    failed = not running[0]
    running[0] = False

    for t in threads:
        t.join()

    assert not failed, "One or more threads failed"
    if expiretime is None:
        expected = 1
    else:
        # Floor division keeps the bound integral whatever the division
        # semantics of the running interpreter.
        expected = totaltime // expiretime + 1
    assert CachedWidget.totalcreates <= expected, \
            "Number of creates %d exceeds expected max %d" % (CachedWidget.totalcreates, expected)

def test_memory_container(totaltime=10, expiretime=None, delay=0, threadlocal=False):
    """Run the threaded container test with the ``'memory'`` entry of ``clsmap``."""
    namespace_cls = clsmap['memory']
    _run_container_test(namespace_cls, totaltime, expiretime, delay, threadlocal)

def test_dbm_container(totaltime=10, expiretime=None, delay=0):
    """Run the threaded container test with the ``'dbm'`` entry of ``clsmap``.

    Always uses one ``Value`` shared by all threads (``threadlocal`` is False).
    """
    namespace_cls = clsmap['dbm']
    _run_container_test(namespace_cls, totaltime, expiretime, delay, False)

def test_file_container(totaltime=10, expiretime=None, delay=0, threadlocal=False):
    """Run the threaded container test with the ``'file'`` entry of ``clsmap``."""
    namespace_cls = clsmap['file']
    _run_container_test(namespace_cls, totaltime, expiretime, delay, threadlocal)

def test_memory_container_tlocal():
    """Memory container with thread-local values: expiretime 5, delay 2."""
    test_memory_container(threadlocal=True, delay=2, expiretime=5)

def test_memory_container_2():
    """Memory container with an expire time of 2 and no creation delay."""
    settings = {'expiretime': 2}
    test_memory_container(**settings)

def test_memory_container_3():
    """Memory container with an expire time of 5 and a creation delay of 2."""
    test_memory_container(delay=2, expiretime=5)

def test_dbm_container_2():
    """DBM container with an expire time of 2 and no creation delay."""
    settings = {'expiretime': 2}
    test_dbm_container(**settings)

def test_dbm_container_3():
    """DBM container with an expire time of 5 and a creation delay of 2."""
    test_dbm_container(delay=2, expiretime=5)

def test_file_container_2():
    """File container with an expire time of 2 and no creation delay."""
    settings = {'expiretime': 2}
    test_file_container(**settings)

def test_file_container_3():
    """File container with an expire time of 5 and a creation delay of 2."""
    test_file_container(delay=2, expiretime=5)

def test_file_container_tlocal():
    """File container with thread-local values: expiretime 5, delay 2."""
    test_file_container(threadlocal=True, delay=2, expiretime=5)

def test_file_open_bug():
    """ensure errors raised during reads or writes don't lock the namespace open."""

    def expect_set_value_error(target, data):
        # The failure is raised from ``else`` rather than from inside the
        # ``try`` body: an ``assert False`` placed there would be swallowed by
        # the handler that absorbs the expected error, and the check could
        # never fail.
        try:
            target.set_value(data)
        except Exception:
            pass
        else:
            raise AssertionError(
                "set_value(%r) did not raise after the namespace file "
                "was overwritten" % (data,))

    value = Value('test', clsmap['file']('reentrant_test', data_dir='./cache'))
    if os.path.exists(value.namespace.file):
        os.remove(value.namespace.file)

    value.set_value("x")

    # Replace the stored file's contents with arbitrary text so that the
    # following set_value() calls are expected to fail.
    f = open(value.namespace.file, 'w')
    try:
        f.write("BLAH BLAH BLAH")
    finally:
        f.close()

    expect_set_value_error(value, "y")

    # NOTE(review): presumably makes the Value created below obtain fresh
    # synchronizer objects -- confirm against beaker.synchronization.
    _synchronizers.clear()

    value = Value('test', clsmap['file']('reentrant_test', data_dir='./cache'))

    # A second failure (instead of a hang) shows the first error did not
    # leave the namespace locked open.
    expect_set_value_error(value, "z")


def test_removing_file_refreshes():
    """test that the cache doesn't ignore file removals"""

    # Mutable holder so the nested create function can count its own calls.
    calls = {'count': 0}

    def create():
        calls['count'] = calls['count'] + 1
        return calls['count']

    namespace = clsmap['file']('refresh_test', data_dir='./cache')
    value = Value('test', namespace, createfunc=create, starttime=time.time())

    # Start from a clean slate: drop any file left over from an earlier run.
    if os.path.exists(value.namespace.file):
        os.remove(value.namespace.file)

    # The first fetch creates the value; the second must return the same one.
    first = value.get_value()
    assert first == 1
    second = value.get_value()
    assert second == 1

    # Once the backing file is gone the value has to be created again.
    os.remove(value.namespace.file)
    third = value.get_value()
    assert third == 2


def teardown():
    """Remove the ``./cache`` directory tree, ignoring any errors."""
    from shutil import rmtree
    rmtree('./cache', ignore_errors=True)