1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
|
import pytest
from bonsai import LDAPClient
from bonsai.pool import (
ClosedPool,
ConnectionPool,
EmptyPool,
PoolError,
ThreadedConnectionPool,
)
import math
import threading
import time
def test_init():
    """Test pool initialisation.

    Invalid connection limits must be rejected with ValueError, and a
    freshly constructed pool must report itself as closed, not empty, and
    without any shared or idle connections.
    """
    cli = LDAPClient("ldap://dummy.nfo")
    # A negative minimum connection count is invalid.
    with pytest.raises(ValueError):
        ConnectionPool(cli, minconn=-3)
    # The minimum may not exceed the maximum.
    with pytest.raises(ValueError):
        ConnectionPool(cli, minconn=5, maxconn=3)
    pool = ConnectionPool(cli, minconn=2, maxconn=5)
    # Constructing the pool must not open it or create any connection.
    assert pool.closed
    assert not pool.empty
    assert pool.max_connection == 5
    assert pool.shared_connection == 0
    assert pool.idle_connection == 0
def test_open(client):
    """Test opening the pool.

    After ``open()`` the pool must no longer be closed and must hold
    ``minconn`` idle connections.
    """
    pool = ConnectionPool(client, minconn=5)
    pool.open()
    try:
        assert not pool.closed
        assert pool.idle_connection == 5
    finally:
        # Always release the connections opened for this test, even when an
        # assertion above fails.
        pool.close()
def test_close(client):
    """Test closing the pool.

    Closing must empty both the idle and the shared counters and must also
    close a connection that is still checked out.
    """
    pool = ConnectionPool(client, minconn=1)
    pool.open()
    # Keep one connection checked out while the pool is being closed.
    conn = pool.get()
    pool.close()
    assert pool.closed
    assert pool.idle_connection == 0
    assert pool.shared_connection == 0
    assert conn.closed
def test_get(client):
    """Test getting a connection from the pool.

    Covers three situations: a pool that has not been opened raises
    ClosedPool, an opened pool hands out connections up to ``maxconn``, and
    an exhausted pool raises EmptyPool and reports itself as empty.
    """
    pool = ConnectionPool(client, minconn=1, maxconn=2)
    with pytest.raises(ClosedPool):
        pool.get()
    pool.open()
    try:
        assert pool.max_connection == 2
        assert pool.idle_connection == 1
        assert pool.shared_connection == 0
        # The first get() takes the single idle connection.
        conn1 = pool.get()
        assert conn1 is not None
        assert not conn1.closed
        assert pool.idle_connection == 0
        assert pool.shared_connection == 1
        # The second get() finds no idle connection, yet the pool is still
        # below maxconn, so another connection must be handed out.
        conn2 = pool.get()
        assert not conn2.closed
        assert pool.idle_connection == 0
        assert pool.shared_connection == 2
        # maxconn is reached: a further request must fail.
        with pytest.raises(EmptyPool):
            pool.get()
        assert pool.empty
    finally:
        # Release the connections opened for this test, even on failure.
        pool.close()
def test_put(client):
    """Test putting connection back into the pool.

    A closed pool must refuse ``put`` with ClosedPool, a connection obtained
    from the pool must become idle again when returned, and a connection
    that did not come from the pool must be rejected with PoolError.
    """
    pool = ConnectionPool(client, minconn=1, maxconn=1)
    with pytest.raises(ClosedPool):
        pool.put(None)
    pool.open()
    try:
        conn = pool.get()
        assert pool.idle_connection == 0
        pool.put(conn)
        assert pool.idle_connection == 1
        # A connection created outside of the pool is not accepted by it.
        other_conn = client.connect()
        try:
            with pytest.raises(PoolError):
                pool.put(other_conn)
        finally:
            # The pool never owned this connection, so close it here.
            other_conn.close()
    finally:
        # Release the pooled connection, even when an assertion fails.
        pool.close()
def test_put_closed(client):
    """Test closed connections are not put back into the pool.

    Returning an already closed connection must drop it from the shared set
    without adding it to the idle set; a later ``get`` must still succeed.
    """
    pool = ConnectionPool(client, minconn=1, maxconn=1)
    pool.open()

    stale = pool.get()
    assert (pool.idle_connection, pool.shared_connection) == (0, 1)

    # Close the connection behind the pool's back, then hand it back.
    stale.close()
    pool.put(stale)
    assert (pool.idle_connection, pool.shared_connection) == (0, 0)

    # The pool now holds fewer connections than its minimum; that must not
    # be a problem, get() is expected to create a new connection.
    fresh = pool.get()
    assert (pool.idle_connection, pool.shared_connection) == (0, 1)

    pool.put(fresh)
    pool.close()
def test_max_connection():
    """Test max_connection property.

    The property must return the configured maximum, reject the value 4 for
    a pool built with ``minconn=5`` by raising ValueError, and accept a
    larger value.
    """
    pool = ConnectionPool(LDAPClient("ldap://dummy.nfo"), minconn=5, maxconn=5)
    assert pool.max_connection == 5

    # 4 is below the minconn of 5 used above, so the setter must refuse it.
    with pytest.raises(ValueError):
        pool.max_connection = 4

    pool.max_connection = 10
    assert pool.max_connection == 10
def test_spawn(client):
    """Test context manager.

    ``spawn()`` is used on a pool that was never opened explicitly; after
    the ``with`` block the connection must be idle in the pool again.
    """
    pool = ConnectionPool(client, minconn=1, maxconn=1)
    assert pool.idle_connection == 0
    try:
        with pool.spawn() as conn:
            # Any real operation will do; it only proves the connection works.
            conn.whoami()
        assert pool.idle_connection == 1
        assert pool.shared_connection == 0
    finally:
        # Release the connection opened through spawn(), even on failure.
        pool.close()
def keep(pool, sleep):
    """Hold a connection from ``pool`` for ``sleep`` seconds.

    Used as the thread target by the threaded pool tests. The connection is
    borrowed through ``pool.spawn()`` and must still be open once the sleep
    is over. A ClosedPool raised anywhere in that sequence is swallowed.
    """
    try:
        with pool.spawn() as held:
            time.sleep(sleep)
            assert not held.closed
    except ClosedPool:
        # A test may close the pool while this thread is still running.
        return
def test_threaded_pool_block(client):
    """Test threaded pool blocks when it's empty.

    A worker thread holds the pool's only connection for ``sleep`` seconds.
    A ``get()`` issued two seconds into that period must wait until the
    worker releases the connection instead of returning or raising
    immediately.
    """
    sleep = 5
    pool = ThreadedConnectionPool(client, minconn=1, maxconn=1)
    pool.open()
    t0 = threading.Thread(target=keep, args=(pool, sleep))
    try:
        # Take the only connection first, so the worker has to wait for the
        # put() below before it can start holding the connection.
        conn = pool.get()
        t0.start()
        pool.put(conn)
        time.sleep(2)
        # Use the monotonic clock: the measured wait must not be affected by
        # system clock adjustments.
        start = time.monotonic()
        conn = pool.get()
        # Some assertion that it didn't happen immediately.
        assert math.ceil(time.monotonic() - start) + 1.2 >= sleep / 2.0
        assert not conn.closed
        pool.put(conn)
    finally:
        # Close the pool first, then wait for the worker, in the same order
        # as test_threaded_pool_close; keep() ignores ClosedPool.
        pool.close()
        if t0.is_alive():
            t0.join()
def test_threaded_pool_raise(client):
    """Test non-blocking threaded pool raises when it's empty.

    With ``block=False`` a ``get()`` on an exhausted pool must raise
    EmptyPool instead of waiting. Once the worker thread has given its
    connection back, ``get()`` must succeed again.
    """
    sleep = 5
    pool = ThreadedConnectionPool(client, minconn=1, maxconn=1, block=False)
    t0 = threading.Thread(target=keep, args=(pool, sleep))
    t0.start()
    try:
        # Give the worker time to borrow the only connection.
        time.sleep(2)
        with pytest.raises(EmptyPool):
            pool.get()
        # Wait for the worker to finish and release the connection. The
        # timeout equals the previous fixed sleep, so the wait is never
        # longer than before but ends as soon as the worker is done.
        t0.join(sleep + 1.2)
        conn = pool.get()
        assert not conn.closed
    finally:
        # Close the pool first, then wait for the worker, in the same order
        # as test_threaded_pool_close; keep() ignores ClosedPool.
        pool.close()
        if t0.is_alive():
            t0.join()
def test_threaded_pool_close(client):
    """Test closing threaded pool.

    The pool is closed two seconds after a worker thread running ``keep``
    was started with a five second hold time; the pool must report itself
    as closed and the worker must terminate.
    """
    hold_time = 5
    pool = ThreadedConnectionPool(client, minconn=1, maxconn=1, block=False)
    worker = threading.Thread(target=keep, args=(pool, hold_time))
    worker.start()

    # Let the worker get going before the pool is closed underneath it.
    time.sleep(2)
    pool.close()
    assert pool.closed

    worker.join()
|