1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
|
import datetime
import json
import re
import time
import pytest
from pymysql.err import ProgrammingError
@pytest.fixture
def datatype_table(loop, cursor, table_cleanup):
    """Create the ``test_datatypes`` table used by the datatype tests.

    The table name is handed to ``table_cleanup`` both from inside the
    creation coroutine and again once the coroutine has finished.
    """
    create_sql = (
        "CREATE TABLE test_datatypes (b bit, i int, l bigint, f real, s "
        "varchar(32), u varchar(32), bb blob, d date, dt datetime, "
        "ts timestamp, td time, t time, st datetime)")

    async def _create_table():
        await cursor.execute(create_sql)
        table_cleanup('test_datatypes')

    # The fixture itself is a plain function, so the coroutine is driven
    # to completion on the supplied event loop.
    loop.run_until_complete(_create_table())
    table_cleanup('test_datatypes')
@pytest.mark.run_loop
async def test_datatypes(connection, cursor, datatype_table):
    """Insert one value per column type and check what is read back."""
    charset = connection.charset
    # The blob payload is encoded with 'utf8' when the connection charset
    # is reported as 'utf8mb4'.
    blob_encoding = 'utf8' if charset == 'utf8mb4' else charset

    # Values to insert, in column order: b,i,l,f,s,u,bb,d,dt,td,t,st
    sent = (
        True, -3, 123456789012, 5.7, "hello'\" world",
        u"Espa\xc3\xb1ol",
        "binary\x00data".encode(blob_encoding),
        datetime.date(1988, 2, 2),
        datetime.datetime.now().replace(microsecond=0),
        datetime.timedelta(5, 6), datetime.time(16, 32),
        time.localtime())

    insert_sql = (
        "INSERT INTO test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) "
        "values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
    await cursor.execute(insert_sql, sent)

    select_sql = "select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes"
    await cursor.execute(select_sql)
    row = await cursor.fetchone()

    # The bit column comes back as a single byte.
    assert bytes([1]) == row[0]
    # The datetime at index 8 was truncated to whole seconds before the
    # insert (MySQL throws away microseconds), so it compares directly.
    assert sent[1:9] == row[1:9]

    # TODO: the timedelta column (index 9) is not checked.
    # NOTE(review): the previously disabled check compared the literal list
    # ``[9]`` with row[9]; it presumably meant sent[9] -- confirm before
    # re-enabling.

    # Times are turned into timedeltas by the driver.
    sent_time = sent[10]
    expected = datetime.timedelta(
        hours=sent_time.hour, minutes=sent_time.minute)
    assert expected == row[10]

    # The struct_time is compared via its first six fields (down to seconds).
    assert datetime.datetime(*sent[-1][:6]) == row[-1]
@pytest.mark.run_loop
async def test_datatypes_nulls(cursor, datatype_table):
    """Insert NULL into all twelve columns and expect ``None`` back for each."""
    null_args = [None] * 12

    insert_sql = (
        "insert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) "
        "values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
    await cursor.execute(insert_sql, null_args)

    select_sql = "select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes"
    await cursor.execute(select_sql)
    row = await cursor.fetchone()

    assert tuple(null_args) == row
@pytest.mark.run_loop
async def test_datatypes_sequence_types(cursor, datatype_table):
    """Pass a tuple as a single query argument for an ``IN`` clause."""
    insert_sql = (
        "INSERT INTO test_datatypes (i, l) VALUES (2,4), (6,8), "
        "(10,12)")
    await cursor.execute(insert_sql)

    # One parameter whose value is itself a sequence: (2, 6).
    wanted_ids = (2, 6)
    select_sql = "select l from test_datatypes where i in %s order by i"
    await cursor.execute(select_sql, (wanted_ids,))
    rows = await cursor.fetchall()

    # Only the rows with i=2 and i=6 match; i=10 is excluded.
    assert ((4,), (8,)) == rows
@pytest.mark.run_loop
async def test_dict_escaping(cursor, table_cleanup):
    """Insert using named ``%(name)s`` placeholders fed from a dict."""
    await cursor.execute(
        "CREATE TABLE test_dict (a INTEGER, b INTEGER, c INTEGER)")
    table_cleanup('test_dict')

    params = {"a": 1, "b": 2, "c": 3}
    await cursor.execute(
        "INSERT INTO test_dict (a,b,c) VALUES (%(a)s, %(b)s, %(c)s)",
        params)

    await cursor.execute("SELECT a,b,c FROM test_dict")
    row = await cursor.fetchone()
    assert (1, 2, 3) == row
@pytest.mark.run_loop
async def test_string(cursor, table_cleanup):
    """Store a plain string in a TEXT column and read it back unchanged."""
    await cursor.execute("DROP TABLE IF EXISTS test_string;")
    await cursor.execute("CREATE TABLE test_string (a text)")
    table_cleanup('test_string')

    text = "I am a test string"
    # The value is passed bare (not wrapped in a tuple) on purpose.
    await cursor.execute("INSERT INTO test_string (a) VALUES (%s)", text)

    await cursor.execute("SELECT a FROM test_string")
    row = await cursor.fetchone()
    assert (text,) == row
@pytest.mark.run_loop
async def test_string_with_emoji(cursor, table_cleanup):
    """Store a string containing an emoji in a utf8mb4 table and read it back.

    The table is created with ``DEFAULT CHARACTER SET="utf8mb4"`` so that a
    character outside the Basic Multilingual Plane can be stored.
    """
    await cursor.execute("DROP TABLE IF EXISTS test_string_with_emoji;")
    await cursor.execute("CREATE TABLE test_string_with_emoji (a text) "
                         "DEFAULT CHARACTER SET=\"utf8mb4\"")
    # The emoji is spelled as an escape (U+1F604) so the literal cannot be
    # damaged by a file re-encoding; a mis-decoded BMP character here would
    # leave the 4-byte code path untested.
    test_value = "I am a test string with emoji \U0001F604"
    table_cleanup('test_string_with_emoji')
    # The value is passed bare (not wrapped in a tuple), as in test_string.
    await cursor.execute("INSERT INTO test_string_with_emoji (a) VALUES (%s)",
                         test_value)
    await cursor.execute("SELECT a FROM test_string_with_emoji")
    r = await cursor.fetchone()
    assert (test_value,) == r
@pytest.mark.run_loop
async def test_integer(cursor, table_cleanup):
    """Store an integer in an INTEGER column and read it back unchanged."""
    await cursor.execute("CREATE TABLE test_integer (a INTEGER)")
    table_cleanup('test_integer')

    number = 12345
    # The value is passed bare (not wrapped in a tuple) on purpose.
    await cursor.execute("INSERT INTO test_integer (a) VALUES (%s)", number)

    await cursor.execute("SELECT a FROM test_integer")
    row = await cursor.fetchone()
    assert (number,) == row
@pytest.mark.run_loop
async def test_binary_data(cursor, table_cleanup):
    """Round-trip 1024 bytes covering every byte value through a BLOB."""
    # Every byte value 0..255, repeated four times.
    payload = bytes(range(256)) * 4

    await cursor.execute("CREATE TABLE test_blob (b blob)")
    table_cleanup('test_blob')

    await cursor.execute("INSERT INTO test_blob (b) VALUES (%s)", (payload,))
    await cursor.execute("SELECT b FROM test_blob")
    row = await cursor.fetchone()

    # Exactly one column is expected in the row.
    (stored,) = row
    assert payload == stored
@pytest.mark.run_loop
async def test_untyped_convertion_to_null_and_empty_string(cursor):
    """Select NULL and '' literals in both orders and check the conversion."""
    cases = (
        ("select null,''", (None, u'')),
        ("select '',null", (u'', None)),
    )
    for query, expected in cases:
        await cursor.execute(query)
        row = await cursor.fetchone()
        assert expected == row
@pytest.mark.run_loop
async def test_timedelta_conversion(cursor):
    """Check that TIME values, including negative ones, become timedeltas."""
    query = (
        "select time('12:30'), time('23:12:59'), time('23:12:59.05100'), "
        "time('-12:30'), time('-23:12:59'), time('-23:12:59.05100'), "
        "time('-00:30')")
    await cursor.execute(query)
    row = await cursor.fetchone()

    half_past_noon = datetime.timedelta(seconds=45000)        # 12:30:00
    late_evening = datetime.timedelta(seconds=83579)          # 23:12:59
    late_evening_frac = datetime.timedelta(
        seconds=83579, microseconds=51000)                    # 23:12:59.051
    half_hour = datetime.timedelta(seconds=1800)              # 00:30:00

    expected = (
        half_past_noon,
        late_evening,
        late_evening_frac,
        -half_past_noon,
        -late_evening,
        -late_evening_frac,
        -half_hour,
    )
    assert expected == row
@pytest.mark.run_loop
async def test_datetime_conversion(cursor, table_cleanup):
    """Check that a DATETIME(6) value keeps its microseconds when read back.

    The test is skipped when creating a ``DATETIME(6)`` column raises
    ``ProgrammingError``, i.e. the server does not accept fractional-second
    precision. Only the CREATE TABLE is guarded, so a ``ProgrammingError``
    from the INSERT or SELECT still fails the test.
    """
    dt = datetime.datetime(2013, 11, 12, 9, 9, 9, 123450)
    try:
        await cursor.execute("CREATE TABLE test_datetime"
                             "(id INT, ts DATETIME(6))")
    except ProgrammingError:
        # User is running a version of MySQL that doesn't support
        # msecs within datetime; report a skip, not a silent pass.
        pytest.skip("server does not support fractional seconds in DATETIME")
    table_cleanup('test_datetime')

    await cursor.execute("INSERT INTO test_datetime VALUES "
                         "(1,'2013-11-12 09:09:09.12345')")
    await cursor.execute("SELECT ts FROM test_datetime")
    r = await cursor.fetchone()
    # '.12345' is 123450 microseconds.
    assert (dt,) == r
@pytest.mark.run_loop
async def test_get_transaction_status(connection, cursor):
    """Follow the transaction flag across ``begin()`` and ``commit()``."""
    # A fresh connection must not report an open transaction.
    assert not connection.get_transaction_status()

    await connection.begin()
    # After begin() the flag must be raised.
    assert connection.get_transaction_status()

    await cursor.execute('SELECT 1;')
    row = await cursor.fetchone()
    (value, ) = row
    assert value == 1

    await connection.commit()
    # After commit() the flag must be lowered again.
    assert not connection.get_transaction_status()
@pytest.mark.run_loop
async def test_rollback(connection, cursor):
    """Insert a row, roll back, and verify the row is gone."""
    select_all = 'SELECT * FROM tz_data;'

    await cursor.execute('DROP TABLE IF EXISTS tz_data;')
    create_sql = ('CREATE TABLE tz_data ('
                  'region VARCHAR(64),'
                  'zone VARCHAR(64),'
                  'name VARCHAR(64))')
    await cursor.execute(create_sql)
    await connection.commit()

    new_row = ('America', '', 'America/New_York')
    await cursor.execute('INSERT INTO tz_data VALUES (%s, %s, %s)', new_row)

    # The uncommitted row is visible on this connection.
    await cursor.execute(select_all)
    rows = await cursor.fetchall()
    assert len(rows) == 1

    await connection.rollback()

    # No rows should be returned, since the insert was never committed.
    await cursor.execute(select_all)
    rows = await cursor.fetchall()
    assert len(rows) == 0
def mysql_server_is(server_version, version_tuple):
    """Return True if ``server_version`` is ``version_tuple`` or greater.

    :param server_version: server version string; it must start with
        ``major[.minor[.patch]]`` and anything after that is ignored.
        Missing minor or patch components count as 0.
    :param version_tuple: minimum required version as a tuple of ints,
        e.g. ``(5, 6, 4)``.
    :raises ValueError: if ``server_version`` does not start with a number.

    e.g.::

        if mysql_server_is(connection.get_server_info(), (5, 6, 4)):
            # do something for MySQL 5.6.4 and above
    """
    # Minor and patch are optional so that strings such as "8.0" are
    # accepted; an unmatched optional group comes back as None.
    match = re.match(r'(\d+)(?:\.(\d+))?(?:\.(\d+))?', server_version)
    if match is None:
        raise ValueError(
            "cannot parse server version: {!r}".format(server_version))
    server_version_tuple = tuple(
        (int(dig) if dig is not None else 0)
        for dig in match.group(1, 2, 3)
    )
    return server_version_tuple >= version_tuple
@pytest.mark.run_loop
async def test_json(connection_creator, table_cleanup):
    """Round-trip a JSON document through a JSON column and CAST(... AS JSON).

    Skipped when the server version is below 5.7.0.
    """
    connection = await connection_creator(
        charset="utf8mb4", autocommit=True)

    # TODO do better
    if not mysql_server_is(connection.get_server_info(), (5, 7, 0)):
        # pytest.skip() raises by itself, so no explicit ``raise`` is needed.
        pytest.skip("JSON type is not supported on MySQL <= 5.6")

    cursor = await connection.cursor()
    create_sql = """\
CREATE TABLE test_json (
id INT NOT NULL,
json JSON NOT NULL,
PRIMARY KEY (id)
);"""
    await cursor.execute(create_sql)
    table_cleanup("test_json")

    # NOTE(review): the non-ASCII value below looks like mis-decoded text;
    # confirm against the intended literal. It is kept as found.
    json_str = '{"hello": "ใใใซใกใฏ"}'
    expected = json.loads(json_str)

    # Through a JSON column.
    await cursor.execute(
        "INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
    await cursor.execute("SELECT `json` from `test_json` WHERE `id`=42")
    row = await cursor.fetchone()
    assert json.loads(row[0]) == expected

    # Through an explicit cast of the parameter.
    await cursor.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
    row = await cursor.fetchone()
    assert json.loads(row[0]) == expected
|