File: test_pool.py

package info (click to toggle)
python-bonsai 1.5.0%2Bds-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 1,108 kB
  • sloc: python: 6,660; ansic: 5,534; makefile: 169; sh: 90
file content (181 lines) | stat: -rw-r--r-- 5,001 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import pytest

from bonsai import LDAPClient
from bonsai.pool import (
    ClosedPool,
    ConnectionPool,
    EmptyPool,
    PoolError,
    ThreadedConnectionPool,
)

import math
import threading
import time


def test_init():
    """ Test pool initialisation. """
    cli = LDAPClient("ldap://dummy.nfo")
    with pytest.raises(ValueError):
        _ = ConnectionPool(cli, minconn=-3)
    with pytest.raises(ValueError):
        _ = ConnectionPool(cli, minconn=5, maxconn=3)
    pool = ConnectionPool(cli, minconn=2, maxconn=5)
    assert pool.closed == True
    assert pool.empty == False
    assert pool.max_connection == 5
    assert pool.shared_connection == 0
    assert pool.idle_connection == 0


def test_open(client):
    """ Test opening the pool. """
    pool = ConnectionPool(client, minconn=5)
    pool.open()
    assert pool.closed != True
    assert pool.idle_connection == 5


def test_close(client):
    """ Test closing the connection. """
    pool = ConnectionPool(client, minconn=1)
    pool.open()
    conn = pool.get()
    pool.close()
    assert pool.closed == True
    assert pool.idle_connection == 0
    assert pool.shared_connection == 0
    assert conn.closed == True


def test_get(client):
    """ Test getting a connection from the pool. """
    pool = ConnectionPool(client, minconn=1, maxconn=2)
    with pytest.raises(ClosedPool):
        _ = pool.get()
    pool.open()
    assert pool.max_connection == 2
    assert pool.idle_connection == 1
    assert pool.shared_connection == 0
    conn1 = pool.get()
    assert conn1 is not None
    assert conn1.closed == False
    assert pool.idle_connection == 0
    assert pool.shared_connection == 1
    conn2 = pool.get()
    assert pool.idle_connection == 0
    assert pool.shared_connection == 2
    with pytest.raises(EmptyPool):
        _ = pool.get()
    assert pool.empty == True


def test_put(client):
    """ Test putting connection back into the pool. """
    pool = ConnectionPool(client, minconn=1, maxconn=1)
    with pytest.raises(ClosedPool):
        pool.put(None)
    pool.open()
    conn = pool.get()
    assert pool.idle_connection == 0
    pool.put(conn)
    assert pool.idle_connection == 1
    other_conn = client.connect()
    with pytest.raises(PoolError):
        pool.put(other_conn)


def test_put_closed(client):
    """ Test closed connections are not put back into the pool. """
    pool = ConnectionPool(client, minconn=1, maxconn=1)
    pool.open()
    conn = pool.get()
    assert pool.idle_connection == 0
    assert pool.shared_connection == 1
    conn.close()
    pool.put(conn)
    assert pool.idle_connection == 0
    assert pool.shared_connection == 0

    # The pool is below its minimum connection count, but this shouldn't cause
    # problems and get should create a new connection.
    conn = pool.get()
    assert pool.idle_connection == 0
    assert pool.shared_connection == 1
    pool.put(conn)
    pool.close()


def test_max_connection():
    """ Test max_connection property. """
    cli = LDAPClient("ldap://dummy.nfo")
    pool = ConnectionPool(cli, minconn=5, maxconn=5)
    assert pool.max_connection == 5
    with pytest.raises(ValueError):
        pool.max_connection = 4
    pool.max_connection = 10
    assert pool.max_connection == 10


def test_spawn(client):
    """ Test context manager. """
    pool = ConnectionPool(client, minconn=1, maxconn=1)
    assert pool.idle_connection == 0
    with pool.spawn() as conn:
        _ = conn.whoami()
    assert pool.idle_connection == 1
    assert pool.shared_connection == 0


def keep(pool, sleep):
    try:
        with pool.spawn() as conn:
            time.sleep(sleep)
            assert not conn.closed
    except ClosedPool:
        pass


def test_threaded_pool_block(client):
    """ Test threaded pool blocks when it's empty. """
    sleep = 5
    pool = ThreadedConnectionPool(client, minconn=1, maxconn=1)
    pool.open()
    t0 = threading.Thread(target=keep, args=(pool, sleep))
    conn = pool.get()
    t0.start()
    pool.put(conn)
    time.sleep(2)
    start = time.time()
    conn = pool.get()
    # Some assertation that it didn't happen immediately.
    assert math.ceil(time.time() - start) + 1.2 >= sleep / 2.0
    assert not conn.closed
    pool.put(conn)


def test_threaded_pool_raise(client):
    """ Test threaded pool blocks when it's empty. """
    sleep = 5
    pool = ThreadedConnectionPool(client, minconn=1, maxconn=1, block=False)
    t0 = threading.Thread(target=keep, args=(pool, sleep))
    t0.start()
    time.sleep(2)
    with pytest.raises(EmptyPool):
        _ = pool.get()
    time.sleep(sleep + 1.2)
    conn = pool.get()
    assert conn.closed == False


def test_threaded_pool_close(client):
    """ Test closing threaded pool. """
    sleep = 5
    pool = ThreadedConnectionPool(client, minconn=1, maxconn=1, block=False)
    t0 = threading.Thread(target=keep, args=(pool, sleep))
    t0.start()
    time.sleep(2)
    pool.close()
    assert pool.closed
    t0.join()