# coding: utf-8
from beaker.cache import clsmap, Cache, util
from beaker.exceptions import InvalidCacheBackendError
from beaker.middleware import CacheMiddleware
from nose import SkipTest
# webtest is an optional dependency: when it cannot be imported, TestApp is
# set to None and the tests decorated with util.skip_if(...) check for that.
try:
    from webtest import TestApp
except ImportError:
    TestApp = None

# Probe the 'ext:database' backend at import time; if its dependencies cannot
# be initialised, raise SkipTest instead of letting every test fail.
try:
    clsmap['ext:database']._init_dependencies()
except InvalidCacheBackendError:
    raise SkipTest("an appropriate SQLAlchemy backend is not installed")

# Database URL handed as ``url=`` to every cache created in this module.
db_url = 'sqlite:///test.db'
def simple_app(environ, start_response):
    """WSGI app that increments a counter kept in an ``ext:database`` cache.

    The cache manager is read from ``environ['beaker.cache']`` and asked for
    the ``testcache`` cache, configured with ``db_url`` and ``./cache``.
    When ``environ['beaker.clear']`` is truthy the cache is cleared before
    the counter is read, so counting restarts at 1.

    :param environ: WSGI environ; must contain ``'beaker.cache'``.
    :param start_response: WSGI ``start_response`` callable.
    :returns: a one-element list with the text
        ``'The current value is: <n>'``.
    """
    cache_options = {
        'type': 'ext:database',
        'url': db_url,
        'data_dir': './cache',
    }
    cache = environ['beaker.cache'].get_cache('testcache', **cache_options)

    if environ.get('beaker.clear'):
        cache.clear()

    try:
        value = cache.get_value('value')
    except KeyError:
        # No counter stored yet (first request, or the cache was just
        # cleared).  Only the missing-key case is handled here; any other
        # failure from the backend is allowed to propagate instead of being
        # silently turned into a counter reset.
        value = 0

    cache.set_value('value', value + 1)
    start_response('200 OK', [('Content-type', 'text/plain')])
    return ['The current value is: %s' % cache.get_value('value')]
def cache_manager_app(environ, start_response):
    """WSGI generator app that stores, reports and then clears a cache key.

    Using the cache manager found in ``environ['beaker.cache']`` it writes
    ``'test value'`` under ``'test_key'`` in the ``'test'`` cache, yields a
    line showing that value, clears the cache, and finally yields a line
    saying whether the key is gone.  The cache is looked up through the
    manager again for every access rather than being held in a local.
    """
    manager = environ['beaker.cache']
    manager.get_cache('test')['test_key'] = 'test value'

    start_response('200 OK', [('Content-type', 'text/plain')])
    yield "test_key is: %s\n" % manager.get_cache('test')['test_key']

    manager.get_cache('test').clear()

    # Probe the key once; a KeyError means the clear() took effect.
    try:
        manager.get_cache('test')['test_key']
        was_cleared = False
    except KeyError:
        was_cleared = True

    if was_cleared:
        yield "test_key cleared"
    else:
        yield "test_key wasn't cleared, is: %s\n" % \
            manager.get_cache('test')['test_key']
def test_has_key():
    """Membership checks report a stored key and stop doing so once removed."""
    settings = dict(data_dir='./cache', url=db_url, type='ext:database')
    store = Cache('test', **settings)
    payload = object()
    store.set_value('test', payload)

    # Both spellings of the membership test must see the stored key ...
    assert store.has_key('test')
    assert 'test' in store

    # ... and neither may report a key that was never stored.
    assert not store.has_key('foo')
    assert 'foo' not in store

    store.remove_value('test')
    assert not store.has_key('test')
def test_has_key_multicache():
    """A key stored via one Cache object is visible through a second one."""
    settings = dict(data_dir='./cache', url=db_url, type='ext:database')
    store = Cache('test', **settings)
    payload = object()
    store.set_value('test', payload)
    assert store.has_key('test')
    assert 'test' in store

    # Build a second Cache with the same name and settings; it must see the
    # key written through the first one.
    store = Cache('test', **settings)
    assert store.has_key('test')

    # Leave no entry behind for the other tests.
    store.remove_value('test')
def test_clear():
    """clear() removes a previously stored key from the cache."""
    settings = dict(data_dir='./cache', url=db_url, type='ext:database')
    store = Cache('test', **settings)
    payload = object()

    store.set_value('test', payload)
    assert store.has_key('test')

    store.clear()
    assert not store.has_key('test')
def test_unicode_keys():
    """Non-ASCII unicode keys can be stored, looked up and removed."""
    settings = dict(data_dir='./cache', url=db_url, type='ext:database')
    store = Cache('test', **settings)
    payload = object()
    stored_key = u'hiŏ'
    absent_key = u'hŏa'

    store.set_value(stored_key, payload)
    assert stored_key in store
    assert absent_key not in store

    store.remove_value(stored_key)
    assert stored_key not in store
@util.skip_if(lambda: TestApp is None, "webtest not installed")
def test_increment():
    """The counter served by simple_app goes 1, 2, 3 over three requests."""
    client = TestApp(CacheMiddleware(simple_app))

    # The first request asks the app to clear the cache, so counting starts
    # from a known state.
    first = client.get('/', extra_environ={'beaker.clear': True})
    assert 'current value is: 1' in first

    # The following requests reuse the same cache and keep counting.
    for expected in ('current value is: 2', 'current value is: 3'):
        reply = client.get('/')
        assert expected in reply
@util.skip_if(lambda: TestApp is None, "webtest not installed")
def test_cache_manager():
    """cache_manager_app reports the stored value and then the cleared key."""
    client = TestApp(CacheMiddleware(cache_manager_app))
    reply = client.get('/')
    for fragment in ('test_key is: test value', 'test_key cleared'):
        assert fragment in reply
|