from sqlalchemy import event
from sqlalchemy.pool import QueuePool
from sqlalchemy.testing import AssertsExecutionResults
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import profiling

pool = None


class QueuePoolTest(fixtures.TestBase, AssertsExecutionResults):
    __requires__ = ("cpython", "python_profiling_backend")

    class Connection:
        def rollback(self):
            pass

        def close(self):
            pass

    def setup_test(self):
        # create a throwaway pool which
        # has the effect of initializing
        # class-level event listeners on Pool,
        # if not present already.
        p1 = QueuePool(creator=self.Connection, pool_size=3, max_overflow=-1)
        p1.connect()

        global pool
        pool = QueuePool(creator=self.Connection, pool_size=3, max_overflow=-1)

        # make this a real world case where we have a "connect" handler
        @event.listens_for(pool, "connect")
        def do_connect(dbapi_conn, conn_record):
            pass

    @profiling.function_call_count()
    def test_first_connect(self):
        pool.connect()

    def test_second_connect(self):
        conn = pool.connect()
        conn.close()

        @profiling.function_call_count()
        def go():
            conn2 = pool.connect()
            return conn2

        go()
