1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
|
import os
import time
import tempfile
import threading
import unittest
import shutil
import web
class SessionTest(unittest.TestCase):
def setUp(self):
app = web.auto_application()
session = self.make_session(app)
class count(app.page):
def GET(self):
session.count += 1
return str(session.count)
class reset(app.page):
def GET(self):
session.kill()
return ""
class redirect(app.page):
def GET(self):
session.request_token = "123"
raise web.redirect("/count")
class get_session(app.page):
path = "/session/(.*)"
def GET(self, name):
return session[name]
self.app = app
self.session = session
def make_session(self, app):
dir = tempfile.mkdtemp()
store = web.session.DiskStore(dir)
return web.session.Session(app, store, {"count": 0})
def testSession(self):
b = self.app.browser()
self.assertEqual(b.open("/count").read(), b"1")
self.assertEqual(b.open("/count").read(), b"2")
self.assertEqual(b.open("/count").read(), b"3")
b.open("/reset")
self.assertEqual(b.open("/count").read(), b"1")
def testParallelSessions(self):
b1 = self.app.browser()
b2 = self.app.browser()
b1.open("/count")
for i in range(1, 10):
self.assertEqual(b1.open("/count").read(), str(i + 1).encode("utf8"))
self.assertEqual(b2.open("/count").read(), str(i).encode("utf8"))
def testBadSessionId(self):
b = self.app.browser()
self.assertEqual(b.open("/count").read(), b"1")
self.assertEqual(b.open("/count").read(), b"2")
cookie = b.cookiejar._cookies["0.0.0.0"]["/"]["webpy_session_id"]
cookie.value = "/etc/password"
self.assertEqual(b.open("/count").read(), b"1")
def testSlowCookies(self):
b = self.app.browser()
self.assertEqual(b.open("/count").read(), b"1")
self.assertEqual(b.open("/count").read(), b"2")
cookie = b.cookiejar._cookies["0.0.0.0"]["/"]["webpy_session_id"]
cookie.value = '"/etc/password"'
self.assertEqual(b.open("/count").read(), b"1")
def testRedirect(self):
b = self.app.browser()
b.open("/redirect")
b.open("/session/request_token")
self.assertEqual(b.data, b"123")
class DiskStoreTest(unittest.TestCase):
def testRemovedSessionDir(self):
# make sure `cleanup()` correctly returns when session directory was
# removed.
dir = tempfile.mkdtemp()
s = web.session.DiskStore(dir)
s["count"] = 20
self.assertEqual(s["count"], 20)
shutil.rmtree(dir)
time.sleep(2)
s.cleanup(1)
def testStoreConcurrent(self):
dir = tempfile.mkdtemp()
store = web.session.DiskStore(dir)
def set_val():
store["fail"] = "value"
for c in range(10):
m = threading.Thread(target=set_val)
m.start()
try:
value = store["fail"]
except KeyError:
pass
self.assertEqual(value, "value")
class DBSessionTest(SessionTest):
"""Session test with db store."""
def make_session(self, app):
if os.path.exists("webpy.db"):
os.remove("webpy.db")
db = web.database(dbn="sqlite", db="webpy.db")
# db.printing = True
db.query(
""
+ "CREATE TABLE session ("
+ " session_id char(128) unique not null,"
+ " atime timestamp default (datetime('now','utc')),"
+ " data text)"
)
store = web.session.DBStore(db, "session")
return web.session.Session(app, store, {"count": 0})
class MemorySessionTest(SessionTest):
"""Session test with db store."""
def make_session(self, app):
store = web.session.MemoryStore()
return web.session.Session(app, store, {"count": 0})
|