File: test_table.py

package info (click to toggle)
grass 7.2.0-2
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 135,976 kB
  • ctags: 44,148
  • sloc: ansic: 410,300; python: 166,939; cpp: 34,819; sh: 9,358; makefile: 6,618; xml: 3,551; sql: 769; lex: 519; yacc: 450; asm: 387; perl: 282; sed: 17; objc: 7
file content (216 lines) | stat: -rw-r--r-- 7,010 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# -*- coding: utf-8 -*-
"""
Created on Wed Jun 25 11:08:22 2014

@author: pietro
"""
# ``long`` cannot be imported from ``builtins`` on Python 3, so alias it to
# ``int`` whenever the import fails; the name is used as a numpy dtype below.
try:
    from builtins import long
except ImportError:
    # python3
    long = int

import os
import sqlite3
import tempfile as tmp
from string import ascii_letters, digits
from random import choice
import numpy as np

from grass.gunittest.case import TestCase
from grass.gunittest.main import test

from grass.pygrass.vector.table import Table, get_path


def _random_small_ints(n):
    """Return ``n`` random integers, each drawn from the range [0, 9)."""
    return np.random.randint(0, 9, size=n)


# Generators of random column data, keyed by SQL column type: every entry
# takes the number of rows ``n`` and returns a numpy array of length ``n``.
COL2VALS = {
    'INT': _random_small_ints,
    'INTEGER': _random_small_ints,
    # primary keys are the sequential values 1..n, not random ones
    'INTEGER PRIMARY KEY': lambda n: np.arange(1, n + 1, dtype=long),
    'REAL': lambda n: np.random.rand(n),
    'TEXT': lambda n: np.array([randstr() for _ in range(n)]),
}


def randstr(prefix='', suffix='', size=6, chars=ascii_letters + digits):
    """Build a random string, optionally wrapped by a prefix and a suffix.

    :param prefix: text placed before the random part, default: ''
    :type prefix: str

    :param suffix: text placed after the random part, default: ''
    :type suffix: str

    :param size: how many random characters to draw, default: 6
    :type size: int

    :param chars: pool of characters the random part is drawn from,
                  default: ASCII letters and digits
    :type chars: str

    :returns: ``prefix`` followed by ``size`` random characters and ``suffix``
    """
    picked = [choice(chars) for _ in range(size)]
    return prefix + ''.join(picked) + suffix


def get_table_random_values(nrows, columns):
    """Build a structured numpy array of random rows matching ``columns``.

    One array of ``nrows`` values is generated per column through the
    ``COL2VALS`` lookup table; the per-column arrays are then combined row
    by row, each field keeping the dtype of its generated column.

    :param nrows: number of rows of the generated array
    :type nrows: int

    :param columns: list of (column name, column type) pairs; every type
                    must be a key of ``COL2VALS``
    :type columns: list of tuple

    :returns: numpy structured array with one field per column

    :raises TypeError: if a column type is not a key of ``COL2VALS``
    """
    arrays, fields = [], []
    for name, sqltype in columns:
        make_values = COL2VALS.get(sqltype)
        if make_values is None:
            raise TypeError("Unknown column type %s for: %s" % (sqltype, name))
        column = make_values(nrows)
        arrays.append(column)
        fields.append((name, column.dtype.str))
    # transpose the per-column arrays into a list of row tuples
    rows = list(zip(*arrays))
    return np.array(rows, dtype=fields)


class DBconnection(object):
    """Share a SQLite connection and table helpers between the TestCase
    classes of this module."""
    # database file with a random name inside the system temporary directory
    path = os.path.join(tmp.gettempdir(),
                        randstr(prefix='temp', suffix='.db'))
    # single connection, opened when the class body is executed
    connection = sqlite3.connect(get_path(path))
    # default table layout as (column name, column type) pairs
    columns = [
        ('cat', 'INTEGER PRIMARY KEY'),
        ('cint', 'INT'),
        ('creal', 'REAL'),
        ('ctxt', 'TEXT'),
    ]

    def create_table_instance(self, **kw):
        """Instantiate a Table with a fresh random name.

        The generated name is also stored in ``self.tname``.

        :param **kw: keyword arguments of Table class
                     without name and connection.
        :type **kw: key-word arguments

        :returns: Table instance
        """
        table_name = randstr(prefix='temp')
        self.tname = table_name
        return Table(name=table_name, connection=self.connection, **kw)

    def create_empty_table(self, columns=None, **kw):
        """Create a table without rows in the database.

        :param columns: list of tuple containing the column names and types;
                        ``self.columns`` is used when None.
        :type columns: list of tuple

        :param **kw: keyword arguments of Table class
                     without name and connection.
        :type **kw: key-word arguments

        :returns: Table instance
        """
        if columns is None:
            columns = self.columns
        new_table = self.create_table_instance(**kw)
        new_table.create(columns)
        return new_table

    def create_not_empty_table(self, nrows=None, values=None,
                               columns=None, **kw):
        """Create a table in the database and fill it with rows.

        The rows are ``values`` when given, otherwise ``nrows`` random rows
        are generated according to ``columns``.

        :param nrows: number of random rows to generate,
                      used only when ``values`` is None.
        :type nrows: int

        :param values: the values for each row.
        :type values: list of tuple

        :param columns: list of tuple containing the column names and types;
                        ``self.columns`` is used when None.
        :type columns: list of tuple

        :param **kw: keyword arguments of Table class
                     without name and connection.
        :type **kw: key-word arguments

        :returns: Table instance

        :raises RuntimeError: if both ``nrows`` and ``values`` are None
        """
        if values is None and nrows is None:
            raise RuntimeError(
                "Both parameters ``nrows`` ``values`` are empty")
        if columns is None:
            columns = self.columns
        if values is None:
            values = get_table_random_values(nrows, columns)
        filled = self.create_empty_table(columns=columns, **kw)
        filled.insert(values, many=True)
        return filled

    def setUp(self):
        """Create a table holding 10 random rows and keep its columns"""
        self.table = self.create_not_empty_table(nrows=10)
        self.cols = self.table.columns

    def tearDown(self):
        """Drop the table created by setUp and clear the references to it"""
        self.table.drop(force=True)
        self.table = self.cols = None


class ColumnsTestCase(DBconnection, TestCase):
    """Checks on the SQL statement templates held by the table columns."""

    def test_check_insert_update_str(self):
        """insert_str and update_str of Columns must match the statements
        expected for the generated table name"""
        expected_insert = 'INSERT INTO %s VALUES (?,?,?,?)' % self.tname
        expected_update = ('UPDATE %s SET cint=?,creal=?,ctxt=? WHERE cat=?;'
                           % self.tname)
        self.assertEqual(self.cols.insert_str, expected_insert)
        self.assertEqual(self.cols.update_str, expected_update)


class TableInsertTestCase(DBconnection, TestCase):
    """Tests of Table.insert, each one starting from an empty table."""

    def setUp(self):
        """Create an empty table instance"""
        empty = self.create_empty_table()
        self.table = empty
        self.cols = empty.columns

    def tearDown(self):
        """Drop the table created by setUp and clear the references to it"""
        self.table.drop(force=True)
        self.table = self.cols = None

    def test_insert(self):
        """Table.insert must store a single row that can be read back"""
        cat = 1
        row = (cat, 1111, 0.1111, 'test')
        cursor = self.connection.cursor()
        self.table.insert(row, cursor=cursor)
        # read the row back through the same cursor used for the insert
        query = ("SELECT cat, cint, creal, ctxt FROM %s WHERE cat=%d"
                 % (self.tname, cat))
        cursor.execute(query)
        self.assertTupleEqual(row, cursor.fetchone())

    def test_insert_many(self):
        """Table.insert with many=True must store every given row"""
        rows = [
            (1, 1111, 0.1111, 'test1'),
            (2, 2222, 0.2222, 'test2'),
            (3, 3333, 0.3333, 'test3'),
        ]
        cursor = self.connection.cursor()
        self.table.insert(rows, cursor=cursor, many=True)
        query = "SELECT cat, cint, creal, ctxt FROM %s" % self.tname
        cursor.execute(query)
        self.assertListEqual(rows, cursor.fetchall())


class TableUpdateTestCase(DBconnection, TestCase):
    """Tests of Table.update on the table filled by DBconnection.setUp."""

    def test_update(self):
        """Table.update must overwrite the non-key values of a row"""
        cat = 1
        new_values = (1122, 0.1122, 'test')
        cursor = self.connection.cursor()
        self.table.update(cat, list(new_values), cursor=cursor)
        self.connection.commit()
        # read back the row that was just updated
        query = ("SELECT cint, creal, ctxt FROM %s WHERE cat=%d"
                 % (self.tname, cat))
        cursor.execute(query)
        self.assertTupleEqual(new_values, cursor.fetchone())


# Run the tests through the gunittest entry point when executed as a script.
if __name__ == '__main__':
    test()