1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
import pytest
from sqlalchemy import bindparam
from sqlalchemy import MetaData, Table, Column, Integer, String
from aiomysql import sa
meta = MetaData()
tbl = Table('sa_tbl_cache_test', meta,
Column('id', Integer, nullable=False,
primary_key=True),
Column('val', String(255)))
@pytest.fixture()
def make_engine(connection, mysql_params, loop):
engines = []
async def _make_engine(**kwargs):
if "unix_socket" in mysql_params:
conn_args = {"unix_socket": mysql_params["unix_socket"]}
else:
conn_args = {
"host": mysql_params['host'],
"port": mysql_params['port'],
}
if "ssl" in mysql_params:
conn_args["ssl"] = mysql_params["ssl"]
engine = await sa.create_engine(
db=mysql_params['db'],
user=mysql_params['user'],
password=mysql_params['password'],
minsize=10,
**conn_args,
**kwargs,
)
engines.append(engine)
return engine
yield _make_engine
for engine in engines:
engine.terminate()
loop.run_until_complete(engine.wait_closed())
async def start(engine):
async with engine.acquire() as conn:
tx = await conn.begin()
await conn.execute("DROP TABLE IF EXISTS "
"sa_tbl_cache_test")
await conn.execute("CREATE TABLE sa_tbl_cache_test"
"(id serial, val varchar(255))")
await conn.execute(tbl.insert().values(val='some_val_1'))
await conn.execute(tbl.insert().values(val='some_val_2'))
await conn.execute(tbl.insert().values(val='some_val_3'))
await tx.commit()
@pytest.mark.run_loop
async def test_dialect(make_engine):
cache = dict()
engine = await make_engine(compiled_cache=cache)
await start(engine)
async with engine.acquire() as conn:
# check select with params not added to cache
q = tbl.select().where(tbl.c.val == 'some_val_1')
cursor = await conn.execute(q)
row = await cursor.fetchone()
assert 'some_val_1' == row.val
assert 0 == len(cache)
# check select with bound params added to cache
select_by_val = tbl.select().where(
tbl.c.val == bindparam('value')
)
cursor = await conn.execute(
select_by_val, {'value': 'some_val_3'}
)
row = await cursor.fetchone()
assert 'some_val_3' == row.val
assert 1 == len(cache)
cursor = await conn.execute(
select_by_val, value='some_val_2'
)
row = await cursor.fetchone()
assert 'some_val_2' == row.val
assert 1 == len(cache)
select_all = tbl.select()
cursor = await conn.execute(select_all)
rows = await cursor.fetchall()
assert 3 == len(rows)
assert 2 == len(cache)
# check insert with bound params not added to cache
await conn.execute(tbl.insert().values(val='some_val_4'))
assert 2 == len(cache)
# check insert with bound params added to cache
q = tbl.insert().values(val=bindparam('value'))
await conn.execute(q, value='some_val_5')
assert 3 == len(cache)
await conn.execute(q, value='some_val_6')
assert 3 == len(cache)
await conn.execute(q, {'value': 'some_val_7'})
assert 3 == len(cache)
cursor = await conn.execute(select_all)
rows = await cursor.fetchall()
assert 7 == len(rows)
assert 3 == len(cache)
# check update with params not added to cache
q = tbl.update().where(
tbl.c.val == 'some_val_1'
).values(val='updated_val_1')
await conn.execute(q)
assert 3 == len(cache)
cursor = await conn.execute(
select_by_val, value='updated_val_1'
)
row = await cursor.fetchone()
assert 'updated_val_1' == row.val
# check update with bound params added to cache
q = tbl.update().where(
tbl.c.val == bindparam('value')
).values(val=bindparam('update'))
await conn.execute(
q, value='some_val_2', update='updated_val_2'
)
assert 4 == len(cache)
cursor = await conn.execute(
select_by_val, value='updated_val_2'
)
row = await cursor.fetchone()
assert 'updated_val_2' == row.val
|