1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
|
from sqlalchemy import event
from sqlalchemy.pool import QueuePool
from sqlalchemy.testing import AssertsExecutionResults
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import profiling
pool = None
class QueuePoolTest(fixtures.TestBase, AssertsExecutionResults):
__requires__ = ("cpython", "python_profiling_backend")
class Connection:
def rollback(self):
pass
def close(self):
pass
def setup_test(self):
# create a throwaway pool which
# has the effect of initializing
# class-level event listeners on Pool,
# if not present already.
p1 = QueuePool(creator=self.Connection, pool_size=3, max_overflow=-1)
p1.connect()
global pool
pool = QueuePool(creator=self.Connection, pool_size=3, max_overflow=-1)
# make this a real world case where we have a "connect" handler
@event.listens_for(pool, "connect")
def do_connect(dbapi_conn, conn_record):
pass
@profiling.function_call_count()
def test_first_connect(self):
pool.connect()
def test_second_connect(self):
conn = pool.connect()
conn.close()
@profiling.function_call_count()
def go():
conn2 = pool.connect()
return conn2
go()
|