1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
"""Tests how asyncpg behaves in non-ideal conditions."""
import asyncio
import os
import platform
import unittest
from asyncpg import _testbase as tb
@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
@unittest.skipIf(
platform.system() == 'Windows',
'not compatible with ProactorEventLoop which is default in Python 3.8+')
class TestConnectionLoss(tb.ProxiedClusterTestCase):
@tb.with_timeout(30.0)
async def test_connection_close_timeout(self):
con = await self.connect()
self.proxy.trigger_connectivity_loss()
with self.assertRaises(asyncio.TimeoutError):
await con.close(timeout=0.5)
@tb.with_timeout(30.0)
async def test_pool_acquire_timeout(self):
pool = await self.create_pool(
database='postgres', min_size=2, max_size=2)
try:
self.proxy.trigger_connectivity_loss()
for _ in range(2):
with self.assertRaises(asyncio.TimeoutError):
async with pool.acquire(timeout=0.5):
pass
self.proxy.restore_connectivity()
async with pool.acquire(timeout=0.5):
pass
finally:
self.proxy.restore_connectivity()
pool.terminate()
@tb.with_timeout(30.0)
async def test_pool_release_timeout(self):
pool = await self.create_pool(
database='postgres', min_size=2, max_size=2)
try:
with self.assertRaises(asyncio.TimeoutError):
async with pool.acquire(timeout=0.5):
self.proxy.trigger_connectivity_loss()
finally:
self.proxy.restore_connectivity()
pool.terminate()
@tb.with_timeout(30.0)
async def test_pool_handles_abrupt_connection_loss(self):
pool_size = 3
query_runtime = 0.5
pool_timeout = cmd_timeout = 1.0
concurrency = 9
pool_concurrency = (concurrency - 1) // pool_size + 1
# Worst expected runtime + 20% to account for other latencies.
worst_runtime = (pool_timeout + cmd_timeout) * pool_concurrency * 1.2
async def worker(pool):
async with pool.acquire(timeout=pool_timeout) as con:
await con.fetch('SELECT pg_sleep($1)', query_runtime)
def kill_connectivity():
self.proxy.trigger_connectivity_loss()
new_pool = self.create_pool(
database='postgres', min_size=pool_size, max_size=pool_size,
timeout=cmd_timeout, command_timeout=cmd_timeout)
with self.assertRunUnder(worst_runtime):
pool = await new_pool
try:
workers = [worker(pool) for _ in range(concurrency)]
self.loop.call_later(1, kill_connectivity)
await asyncio.gather(
*workers, return_exceptions=True)
finally:
pool.terminate()
|