1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
import asyncio
import time
import psycopg
import pytest
from .utils import PG_SUPPORTS_SCRAM, WINDOWS
@pytest.mark.skipif("WINDOWS", reason="gets stuck for some reason during takeover")
async def test_online_restart(bouncer):
for _ in range(5):
# max_client_conn = 10
# default_pool_size = 5
task = bouncer.asleep(2, dbname="p1", times=5)
await asyncio.sleep(0.5)
await bouncer.reboot()
await task
async def test_pause_resume(bouncer):
task = bouncer.asleep(0.1, times=50, sequentially=True, connect_timeout=30)
for _ in range(5):
await bouncer.aadmin("pause")
await asyncio.sleep(1)
await bouncer.aadmin("resume")
await asyncio.sleep(1)
await task
async def test_suspend_resume(bouncer):
task = bouncer.asleep(0.1, times=50, sequentially=True)
for _ in range(5):
async with bouncer.admin_runner.acur() as cur:
await cur.execute("suspend")
await asyncio.sleep(1)
await cur.execute("resume")
await asyncio.sleep(1)
await task
def test_enable_disable(bouncer):
bouncer.test()
bouncer.admin("disable p0")
with pytest.raises(
psycopg.OperationalError,
match=r'database "p0" is disabled',
):
bouncer.test()
bouncer.admin("enable p0")
bouncer.test()
async def test_database_restart(pg, bouncer):
bouncer.admin("set server_login_retry=1")
bouncer.test()
pg.restart()
bouncer.test()
tasks = []
for i in range(1, 6):
tasks.append(bouncer.asleep(i, dbname="p0"))
tasks.append(bouncer.asleep(i, dbname="p1"))
await asyncio.sleep(0.5)
if WINDOWS:
# WindowsSelectorEventLoopPolicy does not support async subprocesses,
# so we fall back to regular suprocesses here.
pg.restart()
else:
await pg.arestart()
for task in tasks:
try:
await task
except psycopg.OperationalError:
pass
bouncer.test(dbname="p0")
bouncer.test(dbname="p1")
def test_database_change(bouncer):
bouncer.admin("set server_lifetime=2")
bouncer.default_db = "p1"
assert bouncer.sql_value("select current_database()") == "p1"
with bouncer.ini_path.open() as f:
original = f.read()
with bouncer.ini_path.open("w") as f:
f.write(original.replace("dbname=p1", "dbname=p0"))
bouncer.admin("reload")
time.sleep(3)
assert bouncer.sql_value("select current_database()") == "p0"
def test_reconnect(bouncer):
pid1 = bouncer.sql_value("select pg_backend_pid()")
bouncer.admin("reconnect")
time.sleep(1)
pid2 = bouncer.sql_value("select pg_backend_pid()")
assert pid1 != pid2
@pytest.mark.skipif("not PG_SUPPORTS_SCRAM")
@pytest.mark.skipif("WINDOWS", reason="gets stuck for some reason during takeover")
async def test_scram_takeover(bouncer):
bouncer.admin("set pool_mode=transaction")
bouncer.admin("set server_lifetime=3")
bouncer.admin("set auth_type='scram-sha-256'")
async with bouncer.acur(dbname="p62", user="scramuser1", password="foo") as cur:
await cur.execute("select 1")
await asyncio.sleep(4) # wait for server_lifetime
await bouncer.reboot()
await cur.execute("select 1")
|