1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
import pytest
from asyncmy.connection import Connection
@pytest.mark.asyncio
async def test_pool(pool):
assert pool.minsize == 1
assert pool.maxsize == 10
assert pool.size == 1
assert pool.freesize == 1
@pytest.mark.asyncio
async def test_pool_cursor(pool):
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT 1")
ret = await cursor.fetchone()
assert ret == (1,)
@pytest.mark.asyncio
async def test_acquire(pool):
conn = await pool.acquire()
assert isinstance(conn, Connection)
assert pool.freesize == 0
assert pool.size == 1
assert conn.connected
await pool.release(conn)
assert pool.freesize == 1
assert pool.size == 1
|