# File: test_sa_engine.py
#
# Package info:
#   aiomysql 0.3.2-2
#   links: PTS, VCS
#   area: main
#   in suites: forky, sid
#   size: 896 kB
#   sloc: python: 6,841; makefile: 79
# File content: 192 lines; stat: -rw-r--r-- 4,615 bytes

import asyncio

import pytest
from sqlalchemy import MetaData, Table, Column, Integer, String

from aiomysql import sa

# SQLAlchemy description of the ``sa_tbl3`` table; the ``start`` helper in
# this module (re)creates a table with the same name through raw SQL.
meta = MetaData()
tbl = Table(
    'sa_tbl3',
    meta,
    Column('id', Integer, nullable=False, primary_key=True),
    Column('name', String(255)),
)


@pytest.fixture()
def make_engine(connection, mysql_params, loop):
    """Yield an async factory that builds ``sa`` engines for a test.

    The factory connects through ``mysql_params['unix_socket']`` when that
    key is present, otherwise through host/port (plus ``ssl`` when
    configured). Every engine is created with ``minsize=10``; extra keyword
    arguments are forwarded to ``sa.create_engine``.

    After the test finishes, every engine produced by the factory is
    terminated and waited on using ``loop``.
    """
    created = []

    async def _make_engine(**kwargs):
        if "unix_socket" not in mysql_params:
            # TCP connection: host and port are mandatory, ssl is optional.
            conn_args = {key: mysql_params[key] for key in ("host", "port")}
            if "ssl" in mysql_params:
                conn_args["ssl"] = mysql_params["ssl"]
        else:
            conn_args = {"unix_socket": mysql_params["unix_socket"]}

        new_engine = await sa.create_engine(
            db=mysql_params['db'],
            user=mysql_params['user'],
            password=mysql_params['password'],
            minsize=10,
            **conn_args,
            **kwargs,
        )
        # Remember the engine so the fixture teardown can shut it down.
        created.append(new_engine)
        return new_engine

    yield _make_engine

    # Teardown: force-close each engine and wait until it is fully closed.
    for leftover in created:
        leftover.terminate()
        loop.run_until_complete(leftover.wait_closed())


async def start(engine):
    """Recreate the ``sa_tbl3`` table using a connection from *engine*.

    Drops the table if it exists, then creates it again with an ``id``
    serial column and a ``name`` varchar(255) column.
    """
    drop_sql = "DROP TABLE IF EXISTS sa_tbl3"
    create_sql = "CREATE TABLE sa_tbl3 (id serial, name varchar(255))"
    async with engine.acquire() as connection:
        await connection.execute(drop_sql)
        await connection.execute(create_sql)


@pytest.mark.run_loop
async def test_dialect(make_engine):
    """The engine's ``dialect`` equals the module-level ``sa.engine._dialect``."""
    eng = await make_engine()
    await start(eng)

    expected_dialect = sa.engine._dialect
    assert expected_dialect == eng.dialect


@pytest.mark.run_loop
async def test_name(make_engine):
    """The engine reports ``'mysql'`` as its name."""
    eng = await make_engine()
    await start(eng)

    assert eng.name == 'mysql'


@pytest.mark.run_loop
async def test_driver(make_engine):
    """The engine reports ``'pymysql'`` as its driver."""
    eng = await make_engine()
    await start(eng)

    assert eng.driver == 'pymysql'

# @pytest.mark.run_loop
# async def test_dsn(self):
#     self.assertEqual(
#         'dbname=aiomysql user=aiomysql password=xxxxxx host=127.0.0.1',
#         engine.dsn)


@pytest.mark.run_loop
async def test_minsize(make_engine):
    """``engine.minsize`` is 10, the value the ``make_engine`` fixture passes."""
    eng = await make_engine()
    await start(eng)

    assert eng.minsize == 10


@pytest.mark.run_loop
async def test_maxsize(make_engine):
    """``engine.maxsize`` is 10 when the fixture passes no explicit maxsize."""
    eng = await make_engine()
    await start(eng)

    assert eng.maxsize == 10


@pytest.mark.run_loop
async def test_size(make_engine):
    """``engine.size`` is 10 once the table setup has finished."""
    eng = await make_engine()
    await start(eng)

    assert eng.size == 10


@pytest.mark.run_loop
async def test_freesize(make_engine):
    """``engine.freesize`` is 10 after the setup connection has been returned."""
    eng = await make_engine()
    await start(eng)

    assert eng.freesize == 10


@pytest.mark.run_loop
async def test_make_engine_with_default_loop(make_engine):
    """An engine built without extra arguments can be closed and awaited."""
    eng = await make_engine()
    await start(eng)

    # Graceful shutdown: request the close, then wait for it to complete.
    eng.close()
    await eng.wait_closed()


@pytest.mark.run_loop
async def test_not_context_manager(make_engine):
    """Entering the engine with a plain ``with`` raises ``RuntimeError``."""
    eng = await make_engine()
    await start(eng)

    with pytest.raises(RuntimeError), eng:
        pass


@pytest.mark.run_loop
async def test_release_transacted(make_engine):
    """Releasing a connection with a begun transaction is rejected.

    ``engine.release`` must raise ``sa.InvalidRequestError`` while the
    transaction started on the connection has not been finished.
    """
    eng = await make_engine()
    await start(eng)

    connection = await eng.acquire()
    transaction = await connection.begin()
    with pytest.raises(sa.InvalidRequestError):
        eng.release(connection)
    # The transaction object itself is never used; drop the local reference.
    del transaction


@pytest.mark.run_loop
async def test_cannot_acquire_after_closing(make_engine):
    """``acquire`` raises ``RuntimeError`` once ``close`` has been called."""
    eng = await make_engine()
    await start(eng)

    eng.close()
    with pytest.raises(RuntimeError):
        await eng.acquire()

    # Finish the shutdown that close() started.
    await eng.wait_closed()


@pytest.mark.run_loop
async def test_wait_closed(make_engine):
    """``wait_closed`` finishes only after acquired connections are released.

    Two connections are acquired, the engine is closed, and then
    ``wait_closed`` runs concurrently with two delayed releases. The
    recorded order must show both releases before ``wait_closed`` returns.
    """
    engine = await make_engine()
    await start(engine)

    # Take two connections out of the pool: the size stays 10, free drops to 8.
    c1 = await engine.acquire()
    c2 = await engine.acquire()
    assert 10 == engine.size
    assert 8 == engine.freesize

    # Records the order in which the concurrent operations complete.
    ops = []

    async def do_release(conn):
        # Yield to the event loop once before releasing, so the release
        # happens after the gathered coroutines have been started.
        await asyncio.sleep(0)
        engine.release(conn)
        ops.append('release')

    async def wait_closed():
        await engine.wait_closed()
        ops.append('wait_closed')

    engine.close()
    await asyncio.gather(wait_closed(), do_release(c1),
                         do_release(c2))
    # Both releases must be recorded before wait_closed() completes.
    assert ['release', 'release', 'wait_closed'] == ops
    assert 0 == engine.freesize
    # close()/wait_closed() are invoked again on the already closed engine;
    # presumably this checks that a repeated shutdown is harmless.
    engine.close()
    await engine.wait_closed()


@pytest.mark.run_loop
async def test_terminate_with_acquired_connections(make_engine):
    """``terminate`` closes a connection that is still checked out."""
    eng = await make_engine()
    await start(eng)

    held = await eng.acquire()

    # Terminate without releasing the connection first.
    eng.terminate()
    await eng.wait_closed()

    assert held.closed