# File: test_sqliteconnpool.py
#
# Package: postfix-mta-sts-resolver 1.5.0-1
#   - links: PTS, VCS
#   - area: main
#   - in suites: forky, sid, trixie
#   - size: 536 kB
#   - sloc: python: 3,069; sh: 226; makefile: 47
# File content: 118 lines, mode -rw-r--r--, 3,638 bytes
import asyncio
import tempfile

import pytest

from postfix_mta_sts_resolver.sqlite_cache import SqliteConnPool

# SQL statements handed to every SqliteConnPool in this module through its
# ``init_queries`` argument (except the deliberately broken pool in
# test_bad_init).
CONN_INIT = ["PRAGMA journal_mode=WAL", "PRAGMA synchronous=NORMAL"]

@pytest.fixture
def dbfile():
    """Provide the path of a temporary file to use as the test database.

    The file comes from ``tempfile.NamedTemporaryFile`` and is kept open for
    the duration of the test; the ``with`` block is left once the test that
    requested the fixture has finished.
    """
    with tempfile.NamedTemporaryFile() as tmp:
        db_path = tmp.name
        yield db_path

@pytest.mark.asyncio
async def test_raises_re(dbfile):
    """Borrowing from a pool without calling ``prepare()`` raises RuntimeError."""
    unprepared_pool = SqliteConnPool(1, (dbfile,), init_queries=CONN_INIT)
    with pytest.raises(RuntimeError):
        # The borrowed connection itself is irrelevant; entering the
        # context manager is what is expected to fail.
        async with unprepared_pool.borrow():
            pass

@pytest.mark.asyncio
@pytest.mark.timeout(2)
async def test_basic(dbfile):
    """Create a table, insert ten rows and read back their sum.

    Each step borrows a connection from a pool of size 1, so the three
    borrows happen one after another.
    """
    expected_total = sum(range(10))
    pool = SqliteConnPool(1, (dbfile,), init_queries=CONN_INIT)
    await pool.prepare()
    try:
        # Step 1: schema.
        async with pool.borrow() as schema_conn:
            await schema_conn.execute("create table if not exists t (id int)")
        # Step 2: data, committed explicitly before the connection is returned.
        async with pool.borrow() as writer:
            for value in range(10):
                await writer.execute("insert into t (id) values (?)", (value,))
            await writer.commit()
        # Step 3: read the aggregate back.
        async with pool.borrow() as reader:
            async with reader.execute('select sum(id) from t') as cursor:
                row = await cursor.fetchone()
        assert row[0] == expected_total
    finally:
        await pool.stop()

@pytest.mark.asyncio
@pytest.mark.timeout(2)
async def test_early_stop(dbfile):
    """A connection borrowed before ``pool.stop()`` stays usable afterwards.

    The pool is stopped while the connection is still borrowed; the test then
    creates a table, inserts ten rows, commits and reads their sum on that
    same connection.
    """
    expected_total = sum(range(10))
    pool = SqliteConnPool(5, (dbfile,), init_queries=CONN_INIT)
    await pool.prepare()
    async with pool.borrow() as held_conn:
        # Stop the pool first; everything below runs on the still-held
        # connection.
        await pool.stop()
        await held_conn.execute("create table if not exists t (id int)")
        for value in range(10):
            await held_conn.execute("insert into t (id) values (?)", (value,))
        await held_conn.commit()
        async with held_conn.execute('select sum(id) from t') as cursor:
            row = await cursor.fetchone()
    assert row[0] == expected_total

@pytest.mark.asyncio
@pytest.mark.timeout(2)
async def test_borrow_timeout(dbfile):
    """A second borrow from an exhausted single-slot pool times out.

    While the only connection is held, a nested ``borrow(1)`` is expected to
    raise ``asyncio.TimeoutError``.
    """
    pool = SqliteConnPool(1, (dbfile,), init_queries=CONN_INIT)
    await pool.prepare()
    try:
        # NOTE(review): the argument to borrow() is presumably a timeout in
        # seconds -- confirm against SqliteConnPool.borrow.
        async with pool.borrow(1):
            with pytest.raises(asyncio.TimeoutError):
                async with pool.borrow(1):
                    pass
    finally:
        await pool.stop()

@pytest.mark.asyncio
@pytest.mark.timeout(2)
async def test_conn_reuse(dbfile):
    """Two consecutive clean borrows from a size-1 pool yield the same object."""
    pool = SqliteConnPool(1, (dbfile,), init_queries=CONN_INIT)
    await pool.prepare()
    try:
        # Each borrow is released immediately without error; only the
        # identity of the object handed out matters.
        async with pool.borrow() as first:
            pass
        async with pool.borrow() as second:
            pass
        assert first is second
    finally:
        await pool.stop()

@pytest.mark.asyncio
@pytest.mark.timeout(2)
async def test_conn_ressurection(dbfile):
    """A borrow that exits with an exception is followed by a different connection.

    The first borrow runs a trivial query and then raises from inside the
    ``async with`` block. The next borrow must still return a working
    connection, and it must not be the same object as the first one.
    """
    class _InjectedError(Exception):
        """Marker exception raised on purpose inside the first borrow."""

    pool = SqliteConnPool(1, (dbfile,), init_queries=CONN_INIT)
    await pool.prepare()
    try:
        with pytest.raises(_InjectedError):
            async with pool.borrow() as first:
                async with first.execute("SELECT 1") as cursor:
                    row = await cursor.fetchone()
                    assert row[0] == 1
                raise _InjectedError()
        async with pool.borrow() as second:
            async with second.execute("SELECT 1") as cursor:
                row = await cursor.fetchone()
                assert row[0] == 1
        assert first is not second
    finally:
        await pool.stop()

@pytest.mark.asyncio
@pytest.mark.timeout(2)
async def test_bad_init(dbfile):
    """``prepare()`` fails when an init query is not valid SQL.

    ``stop()`` is still awaited afterwards, so it is also expected to
    complete on a pool whose preparation failed.
    """
    invalid_init = ['BOGUSQUERY']
    broken_pool = SqliteConnPool(1, (dbfile,), init_queries=invalid_init)
    try:
        # The concrete exception type is not pinned down here; any Exception
        # subclass satisfies the check.
        with pytest.raises(Exception):
            await broken_pool.prepare()
    finally:
        await broken_pool.stop()