1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import capabilities
from datetime import timedelta
from contextlib import closing
import unittest
import MySQLdb
from MySQLdb.compat import unicode
from MySQLdb import cursors
from configdb import connection_factory
import warnings
warnings.filterwarnings('ignore')
class test_MySQLdb(capabilities.DatabaseTest):
db_module = MySQLdb
connect_args = ()
connect_kwargs = dict(use_unicode=True, sql_mode="ANSI,STRICT_TRANS_TABLES,TRADITIONAL")
create_table_extra = "ENGINE=INNODB CHARACTER SET UTF8"
leak_test = False
def quote_identifier(self, ident):
return "`%s`" % ident
def test_TIME(self):
def generator(row,col):
return timedelta(0, row*8000)
self.check_data_integrity(
('col1 TIME',),
generator)
def test_TINYINT(self):
# Number data
def generator(row, col):
v = (row*row) % 256
if v > 127:
v = v-256
return v
self.check_data_integrity(
('col1 TINYINT',),
generator)
def test_stored_procedures(self):
db = self.connection
c = self.cursor
self.create_table(('pos INT', 'tree CHAR(20)'))
c.executemany("INSERT INTO %s (pos,tree) VALUES (%%s,%%s)" % self.table,
list(enumerate('ash birch cedar Lärche pine'.split())))
db.commit()
c.execute("""
CREATE PROCEDURE test_sp(IN t VARCHAR(255))
BEGIN
SELECT pos FROM %s WHERE tree = t;
END
""" % self.table)
db.commit()
c.callproc('test_sp', ('Lärche',))
rows = c.fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0][0], 3)
c.nextset()
c.execute("DROP PROCEDURE test_sp")
c.execute('drop table %s' % (self.table))
def test_small_CHAR(self):
# Character data
def generator(row,col):
i = (row*col+62)%256
if i == 62: return ''
if i == 63: return None
return chr(i)
self.check_data_integrity(
('col1 char(1)','col2 char(1)'),
generator)
def test_BIT(self):
c = self.cursor
try:
c.execute("""create table test_BIT (
b3 BIT(3),
b7 BIT(10),
b64 BIT(64))""")
one64 = '1'*64
c.execute(
"insert into test_BIT (b3, b7, b64)"
" VALUES (b'011', b'1111111111', b'%s')"
% one64)
c.execute("SELECT b3, b7, b64 FROM test_BIT")
row = c.fetchone()
self.assertEqual(row[0], b'\x03')
self.assertEqual(row[1], b'\x03\xff')
self.assertEqual(row[2], b'\xff'*8)
finally:
c.execute("drop table if exists test_BIT")
def test_MULTIPOLYGON(self):
c = self.cursor
try:
c.execute("""create table test_MULTIPOLYGON (
id INTEGER PRIMARY KEY,
border MULTIPOLYGON)""")
c.execute(
"insert into test_MULTIPOLYGON (id, border)"
" VALUES (1, GeomFromText('MULTIPOLYGON(((1 1, 1 -1, -1 -1, -1 1, 1 1)),((1 1, 3 1, 3 3, 1 3, 1 1)))'))"
)
c.execute("SELECT id, AsText(border) FROM test_MULTIPOLYGON")
row = c.fetchone()
self.assertEqual(row[0], 1)
self.assertEqual(row[1], 'MULTIPOLYGON(((1 1,1 -1,-1 -1,-1 1,1 1)),((1 1,3 1,3 3,1 3,1 1)))')
c.execute("SELECT id, AsWKB(border) FROM test_MULTIPOLYGON")
row = c.fetchone()
self.assertEqual(row[0], 1)
self.assertNotEqual(len(row[1]), 0)
c.execute("SELECT id, border FROM test_MULTIPOLYGON")
row = c.fetchone()
self.assertEqual(row[0], 1)
self.assertNotEqual(len(row[1]), 0)
finally:
c.execute("drop table if exists test_MULTIPOLYGON")
def test_bug_2671682(self):
from MySQLdb.constants import ER
try:
self.cursor.execute("describe some_non_existent_table");
except self.connection.ProgrammingError as msg:
self.assertTrue(str(ER.NO_SUCH_TABLE) in str(msg))
def test_bug_3514287(self):
c = self.cursor
try:
c.execute("""create table bug_3541287 (
c1 CHAR(10),
t1 TIMESTAMP)""")
c.execute("insert into bug_3541287 (c1,t1) values (%s, NOW())",
("blah",))
finally:
c.execute("drop table if exists bug_3541287")
def test_ping(self):
self.connection.ping()
def test_reraise_exception(self):
c = self.cursor
try:
c.execute("SELECT x FROM not_existing_table")
except MySQLdb.ProgrammingError as e:
self.assertEqual(e.args[0], 1146)
return
self.fail("Should raise ProgrammingError")
def test_binary_prefix(self):
# verify prefix behaviour when enabled, disabled and for default (disabled)
for binary_prefix in (True, False, None):
kwargs = self.connect_kwargs.copy()
# needs to be set to can guarantee CHARSET response for normal strings
kwargs['charset'] = 'utf8'
if binary_prefix != None:
kwargs['binary_prefix'] = binary_prefix
with closing(connection_factory(**kwargs)) as conn:
with closing(conn.cursor()) as c:
c.execute('SELECT CHARSET(%s)', (MySQLdb.Binary(b'raw bytes'),))
self.assertEqual(c.fetchall()[0][0], 'binary' if binary_prefix else 'utf8')
# normal strings should not get prefix
c.execute('SELECT CHARSET(%s)', ('str',))
self.assertEqual(c.fetchall()[0][0], 'utf8')
if __name__ == '__main__':
if test_MySQLdb.leak_test:
import gc
gc.enable()
gc.set_debug(gc.DEBUG_LEAK)
unittest.main()
|