from sqlalchemy.testing import eq_
from sqlalchemy.orm import mapper, relationship, create_session, \
    clear_mappers, sessionmaker, aliased,\
    Session, subqueryload
from sqlalchemy.orm.mapper import _mapper_registry
from sqlalchemy.orm.session import _sessions
from sqlalchemy import testing
from sqlalchemy.testing import engines
from sqlalchemy import MetaData, Integer, String, ForeignKey, \
    Unicode, select
import sqlalchemy as sa
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.sql import column
from sqlalchemy.processors import to_decimal_processor_factory, \
    to_unicode_processor_factory
from sqlalchemy.testing.util import gc_collect
import decimal
import gc
from sqlalchemy.testing import fixtures
import weakref

class A(fixtures.ComparableEntity):
    pass
class B(fixtures.ComparableEntity):
    pass
class ASub(A):
    pass

def profile_memory(maxtimes=50):
    def decorate(func):
        # run the test N times.  if length of gc.get_objects()
        # keeps growing, assert false

        def get_objects_skipping_sqlite_issue():
            # pysqlite keeps adding weakref objects which only
            # get reset after 220 iterations.  We'd like to keep these
            # tests under 50 iterations and ideally about ten, so
            # just filter them out so that we get a "flatline" more quickly.

            if testing.against("sqlite+pysqlite"):
                return [o for o in gc.get_objects()
                        if not isinstance(o, weakref.ref)]
            else:
                return gc.get_objects()

        def profile(*args):
            gc_collect()
            samples = []

            success = False
            for y in range(maxtimes // 5):
                for x in range(5):
                    func(*args)
                    gc_collect()
                    samples.append(len(get_objects_skipping_sqlite_issue()))

                print("sample gc sizes:", samples)

                assert len(_sessions) == 0

                # check for "flatline" - size is constant for
                # 5 iterations
                for x in samples[-4:]:
                    if x != samples[-5]:
                        break
                else:
                    success = True

                if not success:
                    # object count is bigger than when it started
                    if samples[-1] > samples[0]:
                        for x in samples[1:-2]:
                            # see if a spike bigger than the endpoint exists
                            if x > samples[-1]:
                                success = True
                                break
                    else:
                        success = True

                # if we saw count go down or flatline,
                # we're done
                if success:
                    break

                # else keep trying until maxtimes

            else:
                assert False, repr(samples)

        return profile
    return decorate

def assert_no_mappers():
    clear_mappers()
    gc_collect()
    assert len(_mapper_registry) == 0

class EnsureZeroed(fixtures.ORMTest):
    def setup(self):
        _sessions.clear()
        _mapper_registry.clear()
        self.engine = engines.testing_engine(options={"use_reaper": False})

class MemUsageTest(EnsureZeroed):

    __requires__ = 'cpython',
    __backend__ = True

    # ensure a pure growing test trips the assertion
    @testing.fails_if(lambda: True)
    def test_fixture(self):
        class Foo(object):
            pass

        x = []
        @profile_memory(maxtimes=10)
        def go():
            x[-1:] = [Foo(), Foo(), Foo(), Foo(), Foo(), Foo()]
        go()

    def test_session(self):
        metadata = MetaData(self.engine)

        table1 = Table("mytable", metadata,
            Column('col1', Integer, primary_key=True,
                                    test_needs_autoincrement=True),
            Column('col2', String(30)))

        table2 = Table("mytable2", metadata,
            Column('col1', Integer, primary_key=True,
                                    test_needs_autoincrement=True),
            Column('col2', String(30)),
            Column('col3', Integer, ForeignKey("mytable.col1")))

        metadata.create_all()

        m1 = mapper(A, table1, properties={
            "bs":relationship(B, cascade="all, delete",
                                    order_by=table2.c.col1)},
            order_by=table1.c.col1)
        m2 = mapper(B, table2)

        m3 = mapper(A, table1, non_primary=True)

        @profile_memory()
        def go():
            sess = create_session()
            a1 = A(col2="a1")
            a2 = A(col2="a2")
            a3 = A(col2="a3")
            a1.bs.append(B(col2="b1"))
            a1.bs.append(B(col2="b2"))
            a3.bs.append(B(col2="b3"))
            for x in [a1,a2,a3]:
                sess.add(x)
            sess.flush()
            sess.expunge_all()

            alist = sess.query(A).all()
            eq_(
                [
                    A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
                    A(col2="a2", bs=[]),
                    A(col2="a3", bs=[B(col2="b3")])
                ],
                alist)

            for a in alist:
                sess.delete(a)
            sess.flush()
        go()

        metadata.drop_all()
        del m1, m2, m3
        assert_no_mappers()

    def test_sessionmaker(self):
        @profile_memory()
        def go():
            sessmaker = sessionmaker(bind=self.engine)
            sess = sessmaker()
            r = sess.execute(select([1]))
            r.close()
            sess.close()
            del sess
            del sessmaker
        go()

    @testing.crashes('sqlite', ':memory: connection not suitable here')
    def test_orm_many_engines(self):
        metadata = MetaData(self.engine)

        table1 = Table("mytable", metadata,
            Column('col1', Integer, primary_key=True,
                            test_needs_autoincrement=True),
            Column('col2', String(30)))

        table2 = Table("mytable2", metadata,
            Column('col1', Integer, primary_key=True,
                            test_needs_autoincrement=True),
            Column('col2', String(30)),
            Column('col3', Integer, ForeignKey("mytable.col1")))

        metadata.create_all()

        m1 = mapper(A, table1, properties={
            "bs":relationship(B, cascade="all, delete",
                                    order_by=table2.c.col1)},
            order_by=table1.c.col1,
            _compiled_cache_size=10
            )
        m2 = mapper(B, table2,
            _compiled_cache_size=10
        )

        m3 = mapper(A, table1, non_primary=True)

        @profile_memory()
        def go():
            engine = engines.testing_engine(
                                options={'logging_name':'FOO',
                                        'pool_logging_name':'BAR',
                                        'use_reaper':False}
                                    )
            sess = create_session(bind=engine)

            a1 = A(col2="a1")
            a2 = A(col2="a2")
            a3 = A(col2="a3")
            a1.bs.append(B(col2="b1"))
            a1.bs.append(B(col2="b2"))
            a3.bs.append(B(col2="b3"))
            for x in [a1,a2,a3]:
                sess.add(x)
            sess.flush()
            sess.expunge_all()

            alist = sess.query(A).all()
            eq_(
                [
                    A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
                    A(col2="a2", bs=[]),
                    A(col2="a3", bs=[B(col2="b3")])
                ],
                alist)

            for a in alist:
                sess.delete(a)
            sess.flush()
            sess.close()
            engine.dispose()
        go()

        metadata.drop_all()
        del m1, m2, m3
        assert_no_mappers()

    def test_ad_hoc_types(self):
        """test storage of bind processors, result processors
        in dialect-wide registry."""

        from sqlalchemy.dialects import mysql, postgresql, sqlite
        from sqlalchemy import types

        eng = engines.testing_engine()
        for args in (
            (types.Integer, ),
            (types.String, ),
            (types.PickleType, ),
            (types.Enum, 'a', 'b', 'c'),
            (sqlite.DATETIME, ),
            (postgresql.ENUM, 'a', 'b', 'c'),
            (types.Interval, ),
            (postgresql.INTERVAL, ),
            (mysql.VARCHAR, ),
        ):
            @profile_memory()
            def go():
                type_ = args[0](*args[1:])
                bp = type_._cached_bind_processor(eng.dialect)
                rp = type_._cached_result_processor(eng.dialect, 0)
            go()

        assert not eng.dialect._type_memos


    def test_many_updates(self):
        metadata = MetaData(self.engine)

        wide_table = Table('t', metadata,
            Column('id', Integer, primary_key=True,
                                test_needs_autoincrement=True),
            *[Column('col%d' % i, Integer) for i in range(10)]
        )

        class Wide(object):
            pass

        mapper(Wide, wide_table, _compiled_cache_size=10)

        metadata.create_all()
        session = create_session()
        w1 = Wide()
        session.add(w1)
        session.flush()
        session.close()
        del session
        counter = [1]

        @profile_memory()
        def go():
            session = create_session()
            w1 = session.query(Wide).first()
            x = counter[0]
            dec = 10
            while dec > 0:
                # trying to count in binary here,
                # works enough to trip the test case
                if pow(2, dec) < x:
                    setattr(w1, 'col%d' % dec, counter[0])
                    x -= pow(2, dec)
                dec -= 1
            session.flush()
            session.close()
            counter[0] += 1

        try:
            go()
        finally:
            metadata.drop_all()

    @testing.crashes('mysql+cymysql', 'blocking')
    def test_unicode_warnings(self):
        metadata = MetaData(self.engine)
        table1 = Table('mytable', metadata, Column('col1', Integer,
                       primary_key=True,
                       test_needs_autoincrement=True), Column('col2',
                       Unicode(30)))
        metadata.create_all()
        i = [1]

        # the times here is cranked way up so that we can see
        # pysqlite clearing out its internal buffer and allow
        # the test to pass
        @testing.emits_warning()
        @profile_memory()
        def go():

            # execute with a non-unicode object. a warning is emitted,
            # this warning shouldn't clog up memory.

            self.engine.execute(table1.select().where(table1.c.col2
                               == 'foo%d' % i[0]))
            i[0] += 1
        try:
            go()
        finally:
            metadata.drop_all()

    def test_mapper_reset(self):
        metadata = MetaData(self.engine)

        table1 = Table("mytable", metadata,
            Column('col1', Integer, primary_key=True,
                                test_needs_autoincrement=True),
            Column('col2', String(30)))

        table2 = Table("mytable2", metadata,
            Column('col1', Integer, primary_key=True,
                                test_needs_autoincrement=True),
            Column('col2', String(30)),
            Column('col3', Integer, ForeignKey("mytable.col1")))

        @profile_memory()
        def go():
            m1 = mapper(A, table1, properties={
                "bs":relationship(B, order_by=table2.c.col1)
            })
            m2 = mapper(B, table2)

            m3 = mapper(A, table1, non_primary=True)

            sess = create_session()
            a1 = A(col2="a1")
            a2 = A(col2="a2")
            a3 = A(col2="a3")
            a1.bs.append(B(col2="b1"))
            a1.bs.append(B(col2="b2"))
            a3.bs.append(B(col2="b3"))
            for x in [a1,a2,a3]:
                sess.add(x)
            sess.flush()
            sess.expunge_all()

            alist = sess.query(A).order_by(A.col1).all()
            eq_(
                [
                    A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
                    A(col2="a2", bs=[]),
                    A(col2="a3", bs=[B(col2="b3")])
                ],
                alist)

            for a in alist:
                sess.delete(a)
            sess.flush()
            sess.close()
            clear_mappers()

        metadata.create_all()
        try:
            go()
        finally:
            metadata.drop_all()
        assert_no_mappers()

    def test_alias_pathing(self):
        metadata = MetaData(self.engine)

        a = Table("a", metadata,
            Column('id', Integer, primary_key=True,
                                test_needs_autoincrement=True),
            Column('bid', Integer, ForeignKey('b.id')),
            Column('type', String(30))
        )

        asub = Table("asub", metadata,
            Column('id', Integer, ForeignKey('a.id'),
                                primary_key=True),
            Column('data', String(30)))

        b = Table("b", metadata,
            Column('id', Integer, primary_key=True,
                                test_needs_autoincrement=True),
        )
        mapper(A, a, polymorphic_identity='a',
            polymorphic_on=a.c.type)
        mapper(ASub, asub, inherits=A,polymorphic_identity='asub')
        m1 = mapper(B, b, properties={
            'as_':relationship(A)
        })

        metadata.create_all()
        sess = Session()
        a1 = ASub(data="a1")
        a2 = ASub(data="a2")
        a3 = ASub(data="a3")
        b1 = B(as_=[a1, a2, a3])
        sess.add(b1)
        sess.commit()
        del sess

        # sqlite has a slow enough growth here
        # that we have to run it more times to see the
        # "dip" again
        @profile_memory(maxtimes=120)
        def go():
            sess = Session()
            sess.query(B).options(subqueryload(B.as_.of_type(ASub))).all()
            sess.close()
        try:
            go()
        finally:
            metadata.drop_all()
        clear_mappers()

    def test_path_registry(self):
        metadata = MetaData()
        a = Table("a", metadata,
            Column('id', Integer, primary_key=True),
            Column('foo', Integer),
            Column('bar', Integer)
        )
        m1 = mapper(A, a)
        @profile_memory()
        def go():
            ma = sa.inspect(aliased(A))
            m1._path_registry[m1.attrs.foo][ma][m1.attrs.bar]
        go()
        clear_mappers()

    def test_with_inheritance(self):
        metadata = MetaData(self.engine)

        table1 = Table("mytable", metadata,
            Column('col1', Integer, primary_key=True,
                                    test_needs_autoincrement=True),
            Column('col2', String(30))
            )

        table2 = Table("mytable2", metadata,
            Column('col1', Integer, ForeignKey('mytable.col1'),
                   primary_key=True, test_needs_autoincrement=True),
            Column('col3', String(30)),
            )

        @profile_memory()
        def go():
            class A(fixtures.ComparableEntity):
                pass
            class B(A):
                pass

            mapper(A, table1,
                   polymorphic_on=table1.c.col2,
                   polymorphic_identity='a')
            mapper(B, table2,
                   inherits=A,
                   polymorphic_identity='b')

            sess = create_session()
            a1 = A()
            a2 = A()
            b1 = B(col3='b1')
            b2 = B(col3='b2')
            for x in [a1,a2,b1, b2]:
                sess.add(x)
            sess.flush()
            sess.expunge_all()

            alist = sess.query(A).order_by(A.col1).all()
            eq_(
                [
                    A(), A(), B(col3='b1'), B(col3='b2')
                ],
                alist)

            for a in alist:
                sess.delete(a)
            sess.flush()

            # don't need to clear_mappers()
            del B
            del A

        metadata.create_all()
        try:
            go()
        finally:
            metadata.drop_all()
        assert_no_mappers()

    def test_with_manytomany(self):
        metadata = MetaData(self.engine)

        table1 = Table("mytable", metadata,
            Column('col1', Integer, primary_key=True,
                                    test_needs_autoincrement=True),
            Column('col2', String(30))
            )

        table2 = Table("mytable2", metadata,
            Column('col1', Integer, primary_key=True,
                                    test_needs_autoincrement=True),
            Column('col2', String(30)),
            )

        table3 = Table('t1tot2', metadata,
            Column('t1', Integer, ForeignKey('mytable.col1')),
            Column('t2', Integer, ForeignKey('mytable2.col1')),
            )

        @profile_memory()
        def go():
            class A(fixtures.ComparableEntity):
                pass
            class B(fixtures.ComparableEntity):
                pass

            mapper(A, table1, properties={
                'bs':relationship(B, secondary=table3,
                                    backref='as', order_by=table3.c.t1)
            })
            mapper(B, table2)

            sess = create_session()
            a1 = A(col2='a1')
            a2 = A(col2='a2')
            b1 = B(col2='b1')
            b2 = B(col2='b2')
            a1.bs.append(b1)
            a2.bs.append(b2)
            for x in [a1,a2]:
                sess.add(x)
            sess.flush()
            sess.expunge_all()

            alist = sess.query(A).order_by(A.col1).all()
            eq_(
                [
                    A(bs=[B(col2='b1')]), A(bs=[B(col2='b2')])
                ],
                alist)

            for a in alist:
                sess.delete(a)
            sess.flush()

            # don't need to clear_mappers()
            del B
            del A

        metadata.create_all()
        try:
            go()
        finally:
            metadata.drop_all()
        assert_no_mappers()

    @testing.provide_metadata
    def test_key_fallback_result(self):
        e = self.engine
        m = self.metadata
        t = Table('t', m, Column('x', Integer), Column('y', Integer))
        m.create_all(e)
        e.execute(t.insert(), {"x":1, "y":1})
        @profile_memory()
        def go():
            r = e.execute(t.alias().select())
            for row in r:
                row[t.c.x]
        go()

    # fails on newer versions of pysqlite due to unusual memory behvior
    # in pysqlite itself. background at:
    # http://thread.gmane.org/gmane.comp.python.db.pysqlite.user/2290

    @testing.crashes('mysql+cymysql', 'blocking')
    def test_join_cache(self):
        metadata = MetaData(self.engine)
        table1 = Table('table1', metadata, Column('id', Integer,
                       primary_key=True,
                       test_needs_autoincrement=True), Column('data',
                       String(30)))
        table2 = Table('table2', metadata, Column('id', Integer,
                       primary_key=True,
                       test_needs_autoincrement=True), Column('data',
                       String(30)), Column('t1id', Integer,
                       ForeignKey('table1.id')))

        class Foo(object):
            pass

        class Bar(object):
            pass

        mapper(Foo, table1, properties={'bars'
               : relationship(mapper(Bar, table2))})
        metadata.create_all()
        session = sessionmaker()

        @profile_memory()
        def go():
            s = table2.select()
            sess = session()
            sess.query(Foo).join((s, Foo.bars)).all()
            sess.rollback()
        try:
            go()
        finally:
            metadata.drop_all()



    def test_type_compile(self):
        from sqlalchemy.dialects.sqlite.base import dialect as SQLiteDialect
        cast = sa.cast(column('x'), sa.Integer)
        @profile_memory()
        def go():
            dialect = SQLiteDialect()
            cast.compile(dialect=dialect)
        go()

    @testing.requires.cextensions
    def test_DecimalResultProcessor_init(self):
        @profile_memory()
        def go():
            to_decimal_processor_factory({}, 10)
        go()

    @testing.requires.cextensions
    def test_DecimalResultProcessor_process(self):
        @profile_memory()
        def go():
            to_decimal_processor_factory(decimal.Decimal, 10)(1.2)
        go()

    @testing.requires.cextensions
    def test_UnicodeResultProcessor_init(self):
        @profile_memory()
        def go():
            to_unicode_processor_factory('utf8')
        go()


