File: base_test_database.py

package info (click to toggle)
python-jtoolkit 0.7.8-2
  • links: PTS
  • area: main
  • in suites: etch, etch-m68k
  • size: 1,436 kB
  • ctags: 2,536
  • sloc: python: 15,143; makefile: 20
file content (136 lines) | stat: -rw-r--r-- 5,196 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python

# The purpose of this module is to provide a battery of tests for a database
# The idea being that you would then implement the necessary database-specific bits in separate modules

from jToolkit.data import database
from jToolkit import errors
import os
import glob
import threading
import time

class BaseTestDatabase:
    def create_database(cls, config):
        # config is enough of a configuration to create the required database class
        cls.db = database.dbwrapper(config, errors.ConsoleErrorHandler())
    create_database = classmethod(create_database)
        
    def setup_method(self, method):
        self.methodid = "%s_%s" % (self.__class__.__name__, method.__name__)
        self.tablename = method.__name__
        self.sqlfile = "%s.sql" % (self.methodid)
        if os.path.exists(self.sqlfile):
            os.remove(self.sqlfile)
        for resultfile in self.getresultfiles():
            os.remove(resultfile)
        self.teardownsql = None

    def teardown_method(self, method):
        if self.teardownsql:
            self.db.execute(self.teardownsql)
        if os.path.exists(self.sqlfile):
            os.remove(self.sqlfile)
        for resultfile in self.getresultfiles():
            os.remove(resultfile)
            
    def getresultfiles(self):
        """lists available result files"""
        return glob.glob("%s*.txt" % self.methodid)

    def test_table_create(self, tablename=None):
        """Test creating a table"""
        if not tablename:
            tablename = self.tablename
        createsql = "create table %s(x %s, y %s)" % (tablename, self.db.dbtypename("string"), self.db.dbtypename("string"))
        self.teardownsql = "drop table %s" % tablename
        self.db.execute(createsql)

    def test_insert(self, tablename=None):
        """Test inserting into a table - doesn't test the validity of what's inserted (see select test for that)"""
        if not tablename:
            tablename = self.tablename
        self.test_table_create(tablename)
        insertsql = "insert into %s(x, y) values('test', 'me')" % tablename
        self.db.execute(insertsql)
        self.db.execute(insertsql)

    def test_select(self, tablename=None):
        """Test selecting from a table"""
        if not tablename:
            tablename = self.tablename
        self.test_insert(tablename)
        selectsql = "select * from %s" % tablename
        rows = self.db.allrowdicts(selectsql)
        assert len(rows) == 2
        for row in rows:
            assert len(row) == 2
            assert row["x"] == "test"
            assert row["y"] == "me"

    def test_threaded(self, threadClass=None):
        """Test multi-threaded insert/select"""
        NUM_THREADS = 30
        if not threadClass:
            threadClass = threading.Thread
        def test_threaded_func(obj, num):
            print "In thread %d" % num
            obj.test_select("test_threaded_%d" % num)
            self.db.execute("drop table test_threaded_%d" % num)
            print "Done thread %d" % num
        threads = [threadClass(target=test_threaded_func, args=[self, num]) for num in range(NUM_THREADS)]
        for thr in threads:
            thr.start()
        
        threadsEnded = False
        for i in range(6000):       # Give ourselves 10 minutes
            if not (True in [thr.isAlive() for thr in threads]):
                threadsEnded = True
                break
            time.sleep(1)
        assert(threadsEnded)
        self.teardownsql = None

    def test_pylucene_threaded(self):
        """Test threaded with PyLucene threads"""
        try:
            import PyLucene
        except ImportError:
            print "PyLucene has to be installed for this test"
            assert(False)
        self.test_threaded(PyLucene.PythonThread)

    def test_intensive_select_count(self):
        """Test select count(*) intensively"""
        NUM_ROWS = 20
        self.test_table_create()
        insertsql = "insert into %s(x, y) values ('test', 'me')" % self.tablename
        countsql = "select count(*) from %s" % self.tablename
        for i in range(NUM_ROWS):
            self.db.execute(insertsql)
            count = self.db.singlevalue(countsql)
            assert(count is not None)
            assert(int(count) == (i+1))

    def test_list_fields(self):
        """Test listing the fields in a database table"""
        self.test_select()
        columnnames = self.db.listfieldnames(self.tablename)
        assert(str(columnnames[0]) == 'x')
        assert(str(columnnames[1]) == 'y')

    def test_list_fields_and_select_count(self):
        """Test a condition which seems to be failing on Nick's computer with MyOleDB"""
        self.test_list_fields()
        countsql = "select count(*) from %s" % self.tablename
        count = self.db.singlevalue(countsql)
        assert(count is not None)
        assert(int(count) == 2)
        columnnames = self.db.listfieldnames(self.tablename)
        assert(str(columnnames[0]) == 'x')
        assert(str(columnnames[1]) == 'y')
        count = self.db.singlevalue(countsql)
        assert(count is not None)
        assert(int(count) == 2)