# -*- coding: utf-8 -*-
"""
Created on Wed Jun 25 11:08:22 2014

@author: pietro
"""
try:
    from builtins import long
except ImportError:
    # Python 3: ``builtins`` provides no ``long``, so fall back to ``int``.
    long = int
import os
import sqlite3
import tempfile as tmp
from string import ascii_letters, digits
from random import choice
import numpy as np
from grass.gunittest.case import TestCase
from grass.gunittest.main import test
from grass.pygrass.vector.table import Table, get_path
def _random_small_ints(n):
    """Return ``n`` random integers drawn from the range [0, 9)."""
    return np.random.randint(9, size=n)


def _sequential_keys(n):
    """Return the consecutive keys 1..n as an array of ``long`` dtype."""
    return np.arange(1, n + 1, dtype=long)


def _random_reals(n):
    """Return ``n`` random floats drawn from the range [0, 1)."""
    return np.random.rand(n)


def _random_texts(n):
    """Return ``n`` random strings built by ``randstr`` with its defaults."""
    return np.array([randstr() for _ in range(n)])


# Random-data generators keyed by SQL column type: every value is a callable
# that takes the number of rows and returns a numpy array of that length.
COL2VALS = {
    'INT': _random_small_ints,
    'INTEGER': _random_small_ints,
    'INTEGER PRIMARY KEY': _sequential_keys,
    'REAL': _random_reals,
    'TEXT': _random_texts,
}
def randstr(prefix='', suffix='', size=6, chars=ascii_letters + digits):
    """Build a random string, optionally wrapped by a prefix and a suffix.

    :param prefix: text placed before the random part, default: ''
    :type prefix: str
    :param suffix: text placed after the random part, default: ''
    :type suffix: str
    :param size: how many random characters to draw, default: 6
    :type size: int
    :param chars: pool the random characters are drawn from,
                  default: ASCII letters and digits
    :type chars: str
    :returns: ``prefix`` followed by ``size`` characters picked from
              ``chars`` followed by ``suffix``
    :rtype: str
    """
    picked = [choice(chars) for _ in range(size)]
    return prefix + ''.join(picked) + suffix
def get_table_random_values(nrows, columns):
    """Build a structured numpy array of random rows matching ``columns``.

    :param nrows: number of rows of the generated array
    :type nrows: int
    :param columns: (column name, column type) pairs; every column type
                    must be a key of ``COL2VALS``
    :type columns: list of tuple
    :returns: numpy structured array with one field per column
    :raises TypeError: if a column type is not a key of ``COL2VALS``
    """
    generated = []
    fields = []
    for cname, ctype in columns:
        if ctype not in COL2VALS:
            raise TypeError("Unknown column type %s for: %s" % (ctype, cname))
        column_values = COL2VALS[ctype](nrows)
        generated.append(column_values)
        # Each field reuses the dtype of the array generated for its column.
        fields.append((cname, column_values.dtype.str))
    # Transpose the per-column arrays into per-row tuples.
    return np.array(list(zip(*generated)), dtype=fields)
class DBconnection(object):
    """Mixin with the database fixtures shared by the TestCase classes.

    The class attributes below are evaluated once, when the class body
    runs, so every subclass inherits the same database path and the same
    open connection.
    """

    # Randomly named database file placed under ``tempfile.gettempdir()``.
    path = os.path.join(tmp.gettempdir(), randstr(prefix='temp', suffix='.db'))
    connection = sqlite3.connect(get_path(path))
    # Default table layout as (column name, column type) pairs.
    columns = [
        ('cat', 'INTEGER PRIMARY KEY'),
        ('cint', 'INT'),
        ('creal', 'REAL'),
        ('ctxt', 'TEXT'),
    ]

    def create_table_instance(self, **kw):
        """Return a Table instance bound to a new random table name.

        The generated name is also stored in ``self.tname``.

        :param **kw: keyword arguments of Table class
                     without name and connection.
        :type **kw: key-word arguments
        :returns: Table instance
        """
        table_name = randstr(prefix='temp')
        self.tname = table_name
        return Table(name=table_name, connection=self.connection, **kw)

    def create_empty_table(self, columns=None, **kw):
        """Create an empty table in the database and return its Table
        instance.

        :param columns: list of tuple containing the column names and types;
                        ``self.columns`` is used when omitted.
        :type columns: list of tuple
        :param **kw: keyword arguments of Table class
                     without name and connection.
        :type **kw: key-word arguments
        :returns: Table instance
        """
        if columns is None:
            columns = self.columns
        table = self.create_table_instance(**kw)
        table.create(columns)
        return table

    def create_not_empty_table(self, nrows=None, values=None,
                               columns=None, **kw):
        """Create a populated table in the database and return its Table
        instance.

        :param nrows: number of random rows to generate; only used when
                      ``values`` is not given.
        :type nrows: int
        :param values: list of tuple containing the values for each row.
        :type values: list of tuple
        :param columns: list of tuple containing the column names and types;
                        ``self.columns`` is used when omitted.
        :type columns: list of tuple
        :param **kw: keyword arguments of Table class
                     without name and connection.
        :type **kw: key-word arguments
        :returns: Table instance
        :raises RuntimeError: if both ``nrows`` and ``values`` are None
        """
        if nrows is None and values is None:
            raise RuntimeError(
                "Both parameters ``nrows`` ``values`` are empty")
        if columns is None:
            columns = self.columns
        if values is None:
            values = get_table_random_values(nrows, columns)
        table = self.create_empty_table(columns=columns, **kw)
        table.insert(values, many=True)
        return table

    def setUp(self):
        """Create a table filled with 10 random rows and cache its columns"""
        self.table = self.create_not_empty_table(nrows=10)
        self.cols = self.table.columns

    def tearDown(self):
        """Drop the table created by setUp and clear the cached references"""
        self.table.drop(force=True)
        self.table = self.cols = None
class ColumnsTestCase(DBconnection, TestCase):
    """Check the SQL statement templates exposed by the table columns."""

    def test_check_insert_update_str(self):
        """Check insert_str and update_str attribute of Columns are correct"""
        expected_insert = 'INSERT INTO %s VALUES (?,?,?,?)' % self.tname
        self.assertEqual(self.cols.insert_str, expected_insert)

        expected_update = ('UPDATE %s SET cint=?,creal=?,ctxt=? WHERE cat=?;'
                           % self.tname)
        self.assertEqual(self.cols.update_str, expected_update)
class TableInsertTestCase(DBconnection, TestCase):
    """Exercise ``Table.insert`` starting from an empty table."""

    def setUp(self):
        """Create an empty table instance and cache its columns"""
        self.table = self.create_empty_table()
        self.cols = self.table.columns

    def tearDown(self):
        """Drop the table created by setUp and clear the cached references"""
        self.table.drop(force=True)
        self.table = self.cols = None

    def test_insert(self):
        """Test Table.insert method"""
        cat = 1
        row = (cat, 1111, 0.1111, 'test')
        cursor = self.connection.cursor()
        self.table.insert(row, cursor=cursor)
        # Read the row back through the same cursor and compare it.
        query = ("SELECT cat, cint, creal, ctxt FROM %s WHERE cat=%d"
                 % (self.tname, cat))
        cursor.execute(query)
        self.assertTupleEqual(row, cursor.fetchone())

    def test_insert_many(self):
        """Test Table.insert method using many==True"""
        rows = [
            (1, 1111, 0.1111, 'test1'),
            (2, 2222, 0.2222, 'test2'),
            (3, 3333, 0.3333, 'test3'),
        ]
        cursor = self.connection.cursor()
        self.table.insert(rows, cursor=cursor, many=True)
        query = "SELECT cat, cint, creal, ctxt FROM %s" % self.tname
        cursor.execute(query)
        self.assertListEqual(rows, cursor.fetchall())
class TableUpdateTestCase(DBconnection, TestCase):
    """Exercise ``Table.update`` on the populated table that the inherited
    ``setUp`` creates."""

    def test_update(self):
        """Test Table.update method"""
        cat = 1
        new_values = (1122, 0.1122, 'test')
        cursor = self.connection.cursor()
        self.table.update(cat, list(new_values), cursor=cursor)
        self.connection.commit()
        # Read the updated row back and compare it with what was written.
        query = ("SELECT cint, creal, ctxt FROM %s WHERE cat=%d"
                 % (self.tname, cat))
        cursor.execute(query)
        self.assertTupleEqual(new_values, cursor.fetchone())
if __name__ == '__main__':
    # Executed as a script: hand over to ``grass.gunittest.main.test``.
    test()