File: test_operations.py

package info (click to toggle)
pgbouncer 1.25.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,884 kB
  • sloc: ansic: 61,425; python: 5,703; sh: 4,527; makefile: 1,361; sed: 22
file content (121 lines) | stat: -rw-r--r-- 3,323 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import asyncio
import time

import psycopg
import pytest

from .utils import PG_SUPPORTS_SCRAM, WINDOWS


@pytest.mark.skipif("WINDOWS", reason="gets stuck for some reason during takeover")
async def test_online_restart(bouncer):
    """Reboot the bouncer repeatedly while sleep queries are in flight on p1.

    Each of the five rounds starts ``bouncer.asleep(2, dbname="p1", times=5)``,
    waits half a second, calls ``bouncer.reboot()`` and finally awaits the
    result of the ``asleep`` call.
    """
    rounds = 5
    for _round in range(rounds):
        # NOTE(review): the test configuration is expected to use
        # max_client_conn = 10 and default_pool_size = 5 -- confirm against
        # the ini file used by the fixture.
        in_flight = bouncer.asleep(2, dbname="p1", times=5)

        # Let the queries get going before rebooting underneath them.
        await asyncio.sleep(0.5)
        await bouncer.reboot()

        await in_flight


async def test_pause_resume(bouncer):
    """Alternate the ``pause`` and ``resume`` admin commands during a workload.

    A background run of 50 sequential 0.1-second sleeps is started first
    (with ``connect_timeout=30``); the admin commands are then issued five
    times each, one second apart, before the workload result is awaited.
    """
    workload = bouncer.asleep(0.1, times=50, sequentially=True, connect_timeout=30)

    cycles = 5
    for _cycle in range(cycles):
        # Issue "pause" then "resume", each followed by a one-second wait.
        for command in ("pause", "resume"):
            await bouncer.aadmin(command)
            await asyncio.sleep(1)

    await workload


async def test_suspend_resume(bouncer):
    """Alternate the ``suspend`` and ``resume`` admin commands during a workload.

    A background run of 50 sequential 0.1-second sleeps is started first.
    For each of five cycles a fresh admin cursor is opened, and both commands
    are executed on that same cursor, one second apart.
    """
    workload = bouncer.asleep(0.1, times=50, sequentially=True)

    cycles = 5
    for _cycle in range(cycles):
        async with bouncer.admin_runner.acur() as admin_cur:
            # Both commands go through the one cursor opened for this cycle.
            for command in ("suspend", "resume"):
                await admin_cur.execute(command)
                await asyncio.sleep(1)

    await workload


def test_enable_disable(bouncer):
    """Check that ``disable p0`` rejects connections and ``enable p0`` undoes it.

    ``bouncer.test()`` is called with no arguments throughout; presumably it
    targets the p0 database by default -- confirm against the fixture.
    """
    disabled_pattern = r'database "p0" is disabled'

    # Baseline: the connection works before anything is disabled.
    bouncer.test()

    bouncer.admin("disable p0")
    with pytest.raises(psycopg.OperationalError, match=disabled_pattern):
        bouncer.test()

    # Re-enabling must make the same check pass again.
    bouncer.admin("enable p0")
    bouncer.test()


async def test_database_restart(pg, bouncer):
    """Restart PostgreSQL, both idle and with queries in flight.

    The first restart happens with no queries running. The second happens
    half a second after ten sleep queries (1 to 5 seconds, on both p0 and p1)
    have been started. ``psycopg.OperationalError`` from those queries is
    ignored; afterwards both databases must pass ``bouncer.test``.
    """
    databases = ("p0", "p1")

    bouncer.admin("set server_login_retry=1")
    bouncer.test()
    pg.restart()
    bouncer.test()

    # Same creation order as a nested loop: (1, p0), (1, p1), (2, p0), ...
    sleepers = [
        bouncer.asleep(seconds, dbname=db)
        for seconds in range(1, 6)
        for db in databases
    ]

    await asyncio.sleep(0.5)
    if not WINDOWS:
        await pg.arestart()
    else:
        # WindowsSelectorEventLoopPolicy does not support async subprocesses,
        # so we fall back to regular subprocesses here.
        pg.restart()

    for sleeper in sleepers:
        try:
            await sleeper
        except psycopg.OperationalError:
            # Deliberately ignored: only the final connectivity checks matter.
            pass

    for db in databases:
        bouncer.test(dbname=db)


def test_database_change(bouncer):
    """Check that a reloaded config pointing at another database takes effect.

    With ``server_lifetime=2`` and ``p1`` as the default database, the ini
    file is rewritten so every ``dbname=p1`` becomes ``dbname=p0``. After a
    ``reload`` and a three-second wait, the same query must report ``p0``.
    """
    current_db_query = "select current_database()"

    bouncer.admin("set server_lifetime=2")
    bouncer.default_db = "p1"
    assert bouncer.sql_value(current_db_query) == "p1"

    # Rewrite the config in place; assumes ini_path is a pathlib.Path.
    ini_text = bouncer.ini_path.read_text()
    bouncer.ini_path.write_text(ini_text.replace("dbname=p1", "dbname=p0"))

    bouncer.admin("reload")
    # Wait longer than server_lifetime (2s) before querying again.
    time.sleep(3)

    assert bouncer.sql_value(current_db_query) == "p0"


def test_reconnect(bouncer):
    """Check that the ``reconnect`` admin command changes the backend PID.

    The backend PID is read before the command and again one second after
    it; the two values must differ.
    """
    backend_pid_query = "select pg_backend_pid()"

    pid_before = bouncer.sql_value(backend_pid_query)

    bouncer.admin("reconnect")
    time.sleep(1)

    pid_after = bouncer.sql_value(backend_pid_query)
    assert pid_before != pid_after


@pytest.mark.skipif("not PG_SUPPORTS_SCRAM")
@pytest.mark.skipif("WINDOWS", reason="gets stuck for some reason during takeover")
async def test_scram_takeover(bouncer):
    """Check that a SCRAM-authenticated client keeps working across a reboot.

    The bouncer is switched to transaction pooling, ``server_lifetime=3`` and
    ``scram-sha-256`` auth. One cursor then runs a query, waits four seconds,
    reboots the bouncer and runs the same query again.
    """
    setup_commands = (
        "set pool_mode=transaction",
        "set server_lifetime=3",
        "set auth_type='scram-sha-256'",
    )
    for command in setup_commands:
        bouncer.admin(command)

    connect_args = {"dbname": "p62", "user": "scramuser1", "password": "foo"}
    async with bouncer.acur(**connect_args) as cur:
        await cur.execute("select 1")

        # Sleep past server_lifetime (3s) before rebooting.
        await asyncio.sleep(4)
        await bouncer.reboot()

        # The cursor opened before the reboot must still be usable.
        await cur.execute("select 1")