1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
#!/usr/bin/env python
# The purpose of this module is to provide a battery of tests for a database
# The idea being that you would then implement the necessary database-specific bits in separate modules
from jToolkit.data import database
from jToolkit import errors
import os
import glob
import threading
import time
class BaseTestDatabase:
def create_database(cls, config):
# config is enough of a configuration to create the required database class
cls.db = database.dbwrapper(config, errors.ConsoleErrorHandler())
create_database = classmethod(create_database)
def setup_method(self, method):
self.methodid = "%s_%s" % (self.__class__.__name__, method.__name__)
self.tablename = method.__name__
self.sqlfile = "%s.sql" % (self.methodid)
if os.path.exists(self.sqlfile):
os.remove(self.sqlfile)
for resultfile in self.getresultfiles():
os.remove(resultfile)
self.teardownsql = None
def teardown_method(self, method):
if self.teardownsql:
self.db.execute(self.teardownsql)
if os.path.exists(self.sqlfile):
os.remove(self.sqlfile)
for resultfile in self.getresultfiles():
os.remove(resultfile)
def getresultfiles(self):
"""lists available result files"""
return glob.glob("%s*.txt" % self.methodid)
def test_table_create(self, tablename=None):
"""Test creating a table"""
if not tablename:
tablename = self.tablename
createsql = "create table %s(x %s, y %s)" % (tablename, self.db.dbtypename("string"), self.db.dbtypename("string"))
self.teardownsql = "drop table %s" % tablename
self.db.execute(createsql)
def test_insert(self, tablename=None):
"""Test inserting into a table - doesn't test the validity of what's inserted (see select test for that)"""
if not tablename:
tablename = self.tablename
self.test_table_create(tablename)
insertsql = "insert into %s(x, y) values('test', 'me')" % tablename
self.db.execute(insertsql)
self.db.execute(insertsql)
def test_select(self, tablename=None):
"""Test selecting from a table"""
if not tablename:
tablename = self.tablename
self.test_insert(tablename)
selectsql = "select * from %s" % tablename
rows = self.db.allrowdicts(selectsql)
assert len(rows) == 2
for row in rows:
assert len(row) == 2
assert row["x"] == "test"
assert row["y"] == "me"
def test_threaded(self, threadClass=None):
"""Test multi-threaded insert/select"""
NUM_THREADS = 30
if not threadClass:
threadClass = threading.Thread
def test_threaded_func(obj, num):
print "In thread %d" % num
obj.test_select("test_threaded_%d" % num)
self.db.execute("drop table test_threaded_%d" % num)
print "Done thread %d" % num
threads = [threadClass(target=test_threaded_func, args=[self, num]) for num in range(NUM_THREADS)]
for thr in threads:
thr.start()
threadsEnded = False
for i in range(6000): # Give ourselves 10 minutes
if not (True in [thr.isAlive() for thr in threads]):
threadsEnded = True
break
time.sleep(1)
assert(threadsEnded)
self.teardownsql = None
def test_pylucene_threaded(self):
"""Test threaded with PyLucene threads"""
try:
import PyLucene
except ImportError:
print "PyLucene has to be installed for this test"
assert(False)
self.test_threaded(PyLucene.PythonThread)
def test_intensive_select_count(self):
"""Test select count(*) intensively"""
NUM_ROWS = 20
self.test_table_create()
insertsql = "insert into %s(x, y) values ('test', 'me')" % self.tablename
countsql = "select count(*) from %s" % self.tablename
for i in range(NUM_ROWS):
self.db.execute(insertsql)
count = self.db.singlevalue(countsql)
assert(count is not None)
assert(int(count) == (i+1))
def test_list_fields(self):
"""Test listing the fields in a database table"""
self.test_select()
columnnames = self.db.listfieldnames(self.tablename)
assert(str(columnnames[0]) == 'x')
assert(str(columnnames[1]) == 'y')
def test_list_fields_and_select_count(self):
"""Test a condition which seems to be failing on Nick's computer with MyOleDB"""
self.test_list_fields()
countsql = "select count(*) from %s" % self.tablename
count = self.db.singlevalue(countsql)
assert(count is not None)
assert(int(count) == 2)
columnnames = self.db.listfieldnames(self.tablename)
assert(str(columnnames[0]) == 'x')
assert(str(columnnames[1]) == 'y')
count = self.db.singlevalue(countsql)
assert(count is not None)
assert(int(count) == 2)
|