File: test_async_iter.py

package info (click to toggle)
aiomysql 0.1.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid, trixie
  • size: 912 kB
  • sloc: python: 6,894; makefile: 213
file content (65 lines) | stat: -rw-r--r-- 1,836 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import pytest

from aiomysql import SSCursor
from aiomysql import sa

from sqlalchemy import MetaData, Table, Column, Integer, String


meta = MetaData()
tbl = Table('tbl', meta,
            Column('id', Integer, nullable=False, primary_key=True),
            Column('name', String(255)))


@pytest.fixture
def table(loop, connection_creator, table_cleanup):
    async def f():
        connection = await connection_creator()
        cursor = await connection.cursor()
        await cursor.execute("DROP TABLE IF EXISTS tbl;")
        await cursor.execute("""CREATE TABLE tbl (
                 id MEDIUMINT NOT NULL AUTO_INCREMENT,
                 name VARCHAR(255) NOT NULL,
                 PRIMARY KEY (id));""")

        for i in [(1, 'a'), (2, 'b'), (3, 'c')]:
            await cursor.execute("INSERT INTO tbl VALUES(%s, %s)", i)

        await cursor.execute("commit;")
        await cursor.close()

    table_cleanup('tbl')
    loop.run_until_complete(f())


@pytest.mark.run_loop
async def test_async_cursor(cursor, table):
    ret = []
    await cursor.execute('SELECT * from tbl;')
    async for i in cursor:
        ret.append(i)
    assert [(1, 'a'), (2, 'b'), (3, 'c')] == ret


@pytest.mark.run_loop
async def test_async_cursor_server_side(connection, table):
    ret = []
    cursor = await connection.cursor(SSCursor)
    await cursor.execute('SELECT * from tbl;')
    async for i in cursor:
        ret.append(i)
    assert [(1, 'a'), (2, 'b'), (3, 'c')] == ret


@pytest.mark.run_loop
async def test_async_iter_over_sa_result(mysql_params, table, loop):
    ret = []
    engine = await sa.create_engine(**mysql_params, loop=loop)
    conn = await engine.acquire()

    async for i in (await conn.execute(tbl.select())):
        ret.append(i)

    assert [(1, 'a'), (2, 'b'), (3, 'c')] == ret
    engine.terminate()