"""basic tests of lazy loaded attributes"""

import datetime

import sqlalchemy as sa
from sqlalchemy import and_
from sqlalchemy import bindparam
from sqlalchemy import Boolean
from sqlalchemy import ForeignKey
from sqlalchemy import ForeignKeyConstraint
from sqlalchemy import Integer
from sqlalchemy import orm
from sqlalchemy import SmallInteger
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy.orm import attributes
from sqlalchemy.orm import configure_mappers
from sqlalchemy.orm import create_session
from sqlalchemy.orm import exc as orm_exc
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.orm.interfaces import MapperOption
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertsql import CompiledSQL
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy.types import TypeDecorator
from test.orm import _fixtures


class LazyTest(_fixtures.FixtureTest):
    run_inserts = "once"
    run_deletes = None

    def test_basic(self):
        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(
            User,
            users,
            properties={
                "addresses": relationship(
                    mapper(Address, addresses), lazy="select"
                )
            },
        )
        sess = create_session()
        q = sess.query(User)
        eq_(
            [
                User(
                    id=7,
                    addresses=[Address(id=1, email_address="jack@bean.com")],
                )
            ],
            q.filter(users.c.id == 7).all(),
        )

    def test_needs_parent(self):
        """test the error raised when parent object is not bound."""

        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(
            User,
            users,
            properties={
                "addresses": relationship(
                    mapper(Address, addresses), lazy="select"
                )
            },
        )
        sess = create_session()
        q = sess.query(User)
        u = q.filter(users.c.id == 7).first()
        sess.expunge(u)
        assert_raises(orm_exc.DetachedInstanceError, getattr, u, "addresses")

    def test_orderby(self):
        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(
            User,
            users,
            properties={
                "addresses": relationship(
                    mapper(Address, addresses),
                    lazy="select",
                    order_by=addresses.c.email_address,
                )
            },
        )
        q = create_session().query(User)
        assert [
            User(id=7, addresses=[Address(id=1)]),
            User(
                id=8,
                addresses=[
                    Address(id=3, email_address="ed@bettyboop.com"),
                    Address(id=4, email_address="ed@lala.com"),
                    Address(id=2, email_address="ed@wood.com"),
                ],
            ),
            User(id=9, addresses=[Address(id=5)]),
            User(id=10, addresses=[]),
        ] == q.all()

    def test_orderby_secondary(self):
        """tests that a regular mapper select on a single table can
        order by a relationship to a second table"""

        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User,
        )

        mapper(Address, addresses)

        mapper(
            User,
            users,
            properties=dict(addresses=relationship(Address, lazy="select")),
        )
        q = create_session().query(User)
        result = (
            q.filter(users.c.id == addresses.c.user_id)
            .order_by(addresses.c.email_address)
            .all()
        )
        assert [
            User(
                id=8,
                addresses=[
                    Address(id=2, email_address="ed@wood.com"),
                    Address(id=3, email_address="ed@bettyboop.com"),
                    Address(id=4, email_address="ed@lala.com"),
                ],
            ),
            User(id=9, addresses=[Address(id=5)]),
            User(id=7, addresses=[Address(id=1)]),
        ] == result

    def test_orderby_desc(self):
        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User,
        )

        mapper(Address, addresses)

        mapper(
            User,
            users,
            properties=dict(
                addresses=relationship(
                    Address,
                    lazy="select",
                    order_by=[sa.desc(addresses.c.email_address)],
                )
            ),
        )
        sess = create_session()
        assert [
            User(id=7, addresses=[Address(id=1)]),
            User(
                id=8,
                addresses=[
                    Address(id=2, email_address="ed@wood.com"),
                    Address(id=4, email_address="ed@lala.com"),
                    Address(id=3, email_address="ed@bettyboop.com"),
                ],
            ),
            User(id=9, addresses=[Address(id=5)]),
            User(id=10, addresses=[]),
        ] == sess.query(User).all()

    def test_no_orphan(self):
        """test that a lazily loaded child object is not marked as an orphan"""

        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(
            User,
            users,
            properties={
                "addresses": relationship(
                    Address, cascade="all,delete-orphan", lazy="select"
                )
            },
        )
        mapper(Address, addresses)

        sess = create_session()
        user = sess.query(User).get(7)
        assert getattr(User, "addresses").hasparent(
            attributes.instance_state(user.addresses[0]), optimistic=True
        )
        assert not sa.orm.class_mapper(Address)._is_orphan(
            attributes.instance_state(user.addresses[0])
        )

    def test_limit(self):
        """test limit operations combined with lazy-load relationships."""

        (
            users,
            items,
            order_items,
            orders,
            Item,
            User,
            Address,
            Order,
            addresses,
        ) = (
            self.tables.users,
            self.tables.items,
            self.tables.order_items,
            self.tables.orders,
            self.classes.Item,
            self.classes.User,
            self.classes.Address,
            self.classes.Order,
            self.tables.addresses,
        )

        mapper(Item, items)
        mapper(
            Order,
            orders,
            properties={
                "items": relationship(
                    Item, secondary=order_items, lazy="select"
                )
            },
        )
        mapper(
            User,
            users,
            properties={
                "addresses": relationship(
                    mapper(Address, addresses), lazy="select"
                ),
                "orders": relationship(Order, lazy="select"),
            },
        )

        sess = create_session()
        q = sess.query(User)

        if testing.against("mssql"):
            result = q.limit(2).all()
            assert self.static.user_all_result[:2] == result
        else:
            result = q.limit(2).offset(1).all()
            assert self.static.user_all_result[1:3] == result

    def test_distinct(self):
        (
            users,
            items,
            order_items,
            orders,
            Item,
            User,
            Address,
            Order,
            addresses,
        ) = (
            self.tables.users,
            self.tables.items,
            self.tables.order_items,
            self.tables.orders,
            self.classes.Item,
            self.classes.User,
            self.classes.Address,
            self.classes.Order,
            self.tables.addresses,
        )

        mapper(Item, items)
        mapper(
            Order,
            orders,
            properties={
                "items": relationship(
                    Item, secondary=order_items, lazy="select"
                )
            },
        )
        mapper(
            User,
            users,
            properties={
                "addresses": relationship(
                    mapper(Address, addresses), lazy="select"
                ),
                "orders": relationship(Order, lazy="select"),
            },
        )

        sess = create_session()
        q = sess.query(User)

        # use a union all to get a lot of rows to join against
        u2 = users.alias("u2")
        s = sa.union_all(
            u2.select(use_labels=True),
            u2.select(use_labels=True),
            u2.select(use_labels=True),
        ).alias("u")
        result = (
            q.filter(s.c.u2_id == User.id).order_by(User.id).distinct().all()
        )
        eq_(self.static.user_all_result, result)

    def test_uselist_false_warning(self):
        """test that multiple rows received by a
        uselist=False raises a warning."""

        User, users, orders, Order = (
            self.classes.User,
            self.tables.users,
            self.tables.orders,
            self.classes.Order,
        )

        mapper(
            User,
            users,
            properties={"order": relationship(Order, uselist=False)},
        )
        mapper(Order, orders)
        s = create_session()
        u1 = s.query(User).filter(User.id == 7).one()
        assert_raises(sa.exc.SAWarning, getattr, u1, "order")

    def test_callable_bind(self):
        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User,
        )

        mapper(
            User,
            users,
            properties=dict(
                addresses=relationship(
                    mapper(Address, addresses),
                    lazy="select",
                    primaryjoin=and_(
                        users.c.id == addresses.c.user_id,
                        users.c.name
                        == bindparam("name", callable_=lambda: "ed"),
                    ),
                )
            ),
        )

        s = Session()
        ed = s.query(User).filter_by(name="ed").one()
        eq_(
            ed.addresses,
            [
                Address(id=2, user_id=8),
                Address(id=3, user_id=8),
                Address(id=4, user_id=8),
            ],
        )

        fred = s.query(User).filter_by(name="fred").one()
        eq_(fred.addresses, [])  # fred is missing

    def test_custom_bind(self):
        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User,
        )

        mapper(
            User,
            users,
            properties=dict(
                addresses=relationship(
                    mapper(Address, addresses),
                    lazy="select",
                    primaryjoin=and_(
                        users.c.id == addresses.c.user_id,
                        users.c.name == bindparam("name"),
                    ),
                )
            ),
        )

        canary = mock.Mock()

        class MyOption(MapperOption):
            propagate_to_loaders = True

            def __init__(self, crit):
                self.crit = crit

            def process_query_conditionally(self, query):
                """process query during a lazyload"""
                canary()
                query._params = query._params.union(dict(name=self.crit))

        s = Session()
        ed = s.query(User).options(MyOption("ed")).filter_by(name="ed").one()
        eq_(
            ed.addresses,
            [
                Address(id=2, user_id=8),
                Address(id=3, user_id=8),
                Address(id=4, user_id=8),
            ],
        )
        eq_(canary.mock_calls, [mock.call()])

        fred = (
            s.query(User).options(MyOption("ed")).filter_by(name="fred").one()
        )
        eq_(fred.addresses, [])  # fred is missing
        eq_(canary.mock_calls, [mock.call(), mock.call()])

        # the lazy query was not cached; the option is re-applied to the
        # Fred object due to populate_existing()
        fred = (
            s.query(User)
            .populate_existing()
            .options(MyOption("fred"))
            .filter_by(name="fred")
            .one()
        )
        eq_(fred.addresses, [Address(id=5, user_id=9)])  # fred is there

        eq_(canary.mock_calls, [mock.call(), mock.call(), mock.call()])

    def test_one_to_many_scalar(self):
        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User,
        )

        mapper(
            User,
            users,
            properties=dict(
                address=relationship(
                    mapper(Address, addresses), lazy="select", uselist=False
                )
            ),
        )
        q = create_session().query(User)
        result = q.filter(users.c.id == 7).all()
        assert [User(id=7, address=Address(id=1))] == result

    def test_many_to_one_binds(self):
        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User,
        )

        mapper(
            Address,
            addresses,
            primary_key=[addresses.c.user_id, addresses.c.email_address],
        )

        mapper(
            User,
            users,
            properties=dict(
                address=relationship(
                    Address,
                    uselist=False,
                    primaryjoin=sa.and_(
                        users.c.id == addresses.c.user_id,
                        addresses.c.email_address == "ed@bettyboop.com",
                    ),
                )
            ),
        )
        q = create_session().query(User)
        eq_(
            [
                User(id=7, address=None),
                User(id=8, address=Address(id=3)),
                User(id=9, address=None),
                User(id=10, address=None),
            ],
            list(q),
        )

    def test_double(self):
        """tests lazy loading with two relationships simultaneously,
        from the same table, using aliases.  """

        users, orders, User, Address, Order, addresses = (
            self.tables.users,
            self.tables.orders,
            self.classes.User,
            self.classes.Address,
            self.classes.Order,
            self.tables.addresses,
        )

        openorders = sa.alias(orders, "openorders")
        closedorders = sa.alias(orders, "closedorders")

        mapper(Address, addresses)

        mapper(Order, orders)

        open_mapper = mapper(Order, openorders, non_primary=True)
        closed_mapper = mapper(Order, closedorders, non_primary=True)
        mapper(
            User,
            users,
            properties=dict(
                addresses=relationship(Address, lazy=True),
                open_orders=relationship(
                    open_mapper,
                    primaryjoin=sa.and_(
                        openorders.c.isopen == 1,
                        users.c.id == openorders.c.user_id,
                    ),
                    lazy="select",
                ),
                closed_orders=relationship(
                    closed_mapper,
                    primaryjoin=sa.and_(
                        closedorders.c.isopen == 0,
                        users.c.id == closedorders.c.user_id,
                    ),
                    lazy="select",
                ),
            ),
        )
        q = create_session().query(User)

        assert [
            User(
                id=7,
                addresses=[Address(id=1)],
                open_orders=[Order(id=3)],
                closed_orders=[Order(id=1), Order(id=5)],
            ),
            User(
                id=8,
                addresses=[Address(id=2), Address(id=3), Address(id=4)],
                open_orders=[],
                closed_orders=[],
            ),
            User(
                id=9,
                addresses=[Address(id=5)],
                open_orders=[Order(id=4)],
                closed_orders=[Order(id=2)],
            ),
            User(id=10),
        ] == q.all()

        sess = create_session()
        user = sess.query(User).get(7)
        eq_(
            [Order(id=1), Order(id=5)],
            create_session()
            .query(closed_mapper)
            .with_parent(user, property="closed_orders")
            .all(),
        )
        eq_(
            [Order(id=3)],
            create_session()
            .query(open_mapper)
            .with_parent(user, property="open_orders")
            .all(),
        )

    def test_many_to_many(self):
        keywords, items, item_keywords, Keyword, Item = (
            self.tables.keywords,
            self.tables.items,
            self.tables.item_keywords,
            self.classes.Keyword,
            self.classes.Item,
        )

        mapper(Keyword, keywords)
        mapper(
            Item,
            items,
            properties=dict(
                keywords=relationship(
                    Keyword, secondary=item_keywords, lazy="select"
                )
            ),
        )

        q = create_session().query(Item)
        assert self.static.item_keyword_result == q.all()

        eq_(
            self.static.item_keyword_result[0:2],
            q.join("keywords").filter(keywords.c.name == "red").all(),
        )

    def test_uses_get(self):
        """test that a simple many-to-one lazyload optimizes
        to use query.get()."""

        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User,
        )

        for pj in (
            None,
            users.c.id == addresses.c.user_id,
            addresses.c.user_id == users.c.id,
        ):
            mapper(
                Address,
                addresses,
                properties=dict(
                    user=relationship(
                        mapper(User, users), lazy="select", primaryjoin=pj
                    )
                ),
            )

            sess = create_session()

            # load address
            a1 = (
                sess.query(Address)
                .filter_by(email_address="ed@wood.com")
                .one()
            )

            # load user that is attached to the address
            u1 = sess.query(User).get(8)

            def go():
                # lazy load of a1.user should get it from the session
                assert a1.user is u1

            self.assert_sql_count(testing.db, go, 0)
            sa.orm.clear_mappers()

    def test_uses_get_compatible_types(self):
        """test the use_get optimization with compatible
        but non-identical types"""

        User, Address = self.classes.User, self.classes.Address

        class IntDecorator(TypeDecorator):
            impl = Integer

        class SmallintDecorator(TypeDecorator):
            impl = SmallInteger

        class SomeDBInteger(sa.Integer):
            pass

        for tt in [
            Integer,
            SmallInteger,
            IntDecorator,
            SmallintDecorator,
            SomeDBInteger,
        ]:
            m = sa.MetaData()
            users = Table(
                "users",
                m,
                Column(
                    "id",
                    Integer,
                    primary_key=True,
                    test_needs_autoincrement=True,
                ),
                Column("name", String(30), nullable=False),
            )
            addresses = Table(
                "addresses",
                m,
                Column(
                    "id",
                    Integer,
                    primary_key=True,
                    test_needs_autoincrement=True,
                ),
                Column("user_id", tt, ForeignKey("users.id")),
                Column("email_address", String(50), nullable=False),
            )

            mapper(
                Address,
                addresses,
                properties=dict(user=relationship(mapper(User, users))),
            )

            sess = create_session(bind=testing.db)

            # load address
            a1 = (
                sess.query(Address)
                .filter_by(email_address="ed@wood.com")
                .one()
            )

            # load user that is attached to the address
            u1 = sess.query(User).get(8)

            def go():
                # lazy load of a1.user should get it from the session
                assert a1.user is u1

            self.assert_sql_count(testing.db, go, 0)
            sa.orm.clear_mappers()

    def test_many_to_one(self):
        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(
            Address,
            addresses,
            properties=dict(
                user=relationship(mapper(User, users), lazy="select")
            ),
        )
        sess = create_session()
        q = sess.query(Address)
        a = q.filter(addresses.c.id == 1).one()

        assert a.user is not None

        u1 = sess.query(User).get(7)

        assert a.user is u1

    def test_backrefs_dont_lazyload(self):
        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(
            User,
            users,
            properties={"addresses": relationship(Address, backref="user")},
        )
        mapper(Address, addresses)
        sess = create_session()
        ad = sess.query(Address).filter_by(id=1).one()
        assert ad.user.id == 7

        def go():
            ad.user = None
            assert ad.user is None

        self.assert_sql_count(testing.db, go, 0)

        u1 = sess.query(User).filter_by(id=7).one()

        def go():
            assert ad not in u1.addresses

        self.assert_sql_count(testing.db, go, 1)

        sess.expire(u1, ["addresses"])

        def go():
            assert ad in u1.addresses

        self.assert_sql_count(testing.db, go, 1)

        sess.expire(u1, ["addresses"])
        ad2 = Address()

        def go():
            ad2.user = u1
            assert ad2.user is u1

        self.assert_sql_count(testing.db, go, 0)

        def go():
            assert ad2 in u1.addresses

        self.assert_sql_count(testing.db, go, 1)


class GetterStateTest(_fixtures.FixtureTest):

    """test lazyloader on non-existent attribute returns
    expected attribute symbols, maintain expected state"""

    run_inserts = None

    def _unhashable_fixture(self, metadata, load_on_pending=False):
        class MyHashType(sa.TypeDecorator):
            impl = sa.String(100)

            def process_bind_param(self, value, dialect):
                return ";".join(
                    "%s=%s" % (k, v)
                    for k, v in sorted(value.items(), key=lambda key: key[0])
                )

            def process_result_value(self, value, dialect):
                return dict(elem.split("=", 1) for elem in value.split(";"))

        category = Table(
            "category",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("data", MyHashType()),
        )
        article = Table(
            "article",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("data", MyHashType()),
        )

        class Category(fixtures.ComparableEntity):
            pass

        class Article(fixtures.ComparableEntity):
            pass

        mapper(Category, category)
        mapper(
            Article,
            article,
            properties={
                "category": relationship(
                    Category,
                    primaryjoin=orm.foreign(article.c.data) == category.c.data,
                    load_on_pending=load_on_pending,
                )
            },
        )

        metadata.create_all()
        sess = Session(autoflush=False)
        data = {"im": "unhashable"}
        a1 = Article(id=1, data=data)
        c1 = Category(id=1, data=data)
        if load_on_pending:
            sess.add(c1)
        else:
            sess.add_all([c1, a1])
        sess.flush()
        if load_on_pending:
            sess.add(a1)
        return Category, Article, sess, a1, c1

    def _u_ad_fixture(self, populate_user, dont_use_get=False):
        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(
            User,
            users,
            properties={
                "addresses": relationship(Address, back_populates="user")
            },
        )
        mapper(
            Address,
            addresses,
            properties={
                "user": relationship(
                    User,
                    primaryjoin=and_(
                        users.c.id == addresses.c.user_id, users.c.id != 27
                    )
                    if dont_use_get
                    else None,
                    back_populates="addresses",
                )
            },
        )

        sess = create_session()
        a1 = Address(email_address="a1")
        sess.add(a1)
        if populate_user:
            a1.user = User(name="ed")
        sess.flush()
        if populate_user:
            sess.expire_all()
        return User, Address, sess, a1

    def test_no_use_get_params_missing(self):
        User, Address, sess, a1 = self._u_ad_fixture(False, True)

        def go():
            eq_(a1.user, None)

        # doesn't emit SQL
        self.assert_sql_count(testing.db, go, 0)

    @testing.provide_metadata
    def test_no_use_get_params_not_hashable(self):
        Category, Article, sess, a1, c1 = self._unhashable_fixture(
            self.metadata
        )

        def go():
            eq_(a1.category, c1)

        self.assert_sql_count(testing.db, go, 1)

    @testing.provide_metadata
    def test_no_use_get_params_not_hashable_on_pending(self):
        Category, Article, sess, a1, c1 = self._unhashable_fixture(
            self.metadata, load_on_pending=True
        )

        def go():
            eq_(a1.category, c1)

        self.assert_sql_count(testing.db, go, 1)

    def test_get_empty_passive_return_never_set(self):
        User, Address, sess, a1 = self._u_ad_fixture(False)
        eq_(
            Address.user.impl.get(
                attributes.instance_state(a1),
                attributes.instance_dict(a1),
                passive=attributes.PASSIVE_RETURN_NEVER_SET,
            ),
            attributes.NEVER_SET,
        )
        assert "user_id" not in a1.__dict__
        assert "user" not in a1.__dict__

    def test_history_empty_passive_return_never_set(self):
        User, Address, sess, a1 = self._u_ad_fixture(False)
        eq_(
            Address.user.impl.get_history(
                attributes.instance_state(a1),
                attributes.instance_dict(a1),
                passive=attributes.PASSIVE_RETURN_NEVER_SET,
            ),
            ((), (), ()),
        )
        assert "user_id" not in a1.__dict__
        assert "user" not in a1.__dict__

    def test_get_empty_passive_no_initialize(self):
        User, Address, sess, a1 = self._u_ad_fixture(False)
        eq_(
            Address.user.impl.get(
                attributes.instance_state(a1),
                attributes.instance_dict(a1),
                passive=attributes.PASSIVE_NO_INITIALIZE,
            ),
            attributes.PASSIVE_NO_RESULT,
        )
        assert "user_id" not in a1.__dict__
        assert "user" not in a1.__dict__

    def test_history_empty_passive_no_initialize(self):
        User, Address, sess, a1 = self._u_ad_fixture(False)
        eq_(
            Address.user.impl.get_history(
                attributes.instance_state(a1),
                attributes.instance_dict(a1),
                passive=attributes.PASSIVE_NO_INITIALIZE,
            ),
            attributes.HISTORY_BLANK,
        )
        assert "user_id" not in a1.__dict__
        assert "user" not in a1.__dict__

    def test_get_populated_passive_no_initialize(self):
        User, Address, sess, a1 = self._u_ad_fixture(True)
        eq_(
            Address.user.impl.get(
                attributes.instance_state(a1),
                attributes.instance_dict(a1),
                passive=attributes.PASSIVE_NO_INITIALIZE,
            ),
            attributes.PASSIVE_NO_RESULT,
        )
        assert "user_id" not in a1.__dict__
        assert "user" not in a1.__dict__

    def test_history_populated_passive_no_initialize(self):
        User, Address, sess, a1 = self._u_ad_fixture(True)
        eq_(
            Address.user.impl.get_history(
                attributes.instance_state(a1),
                attributes.instance_dict(a1),
                passive=attributes.PASSIVE_NO_INITIALIZE,
            ),
            attributes.HISTORY_BLANK,
        )
        assert "user_id" not in a1.__dict__
        assert "user" not in a1.__dict__

    def test_get_populated_passive_return_never_set(self):
        User, Address, sess, a1 = self._u_ad_fixture(True)
        eq_(
            Address.user.impl.get(
                attributes.instance_state(a1),
                attributes.instance_dict(a1),
                passive=attributes.PASSIVE_RETURN_NEVER_SET,
            ),
            User(name="ed"),
        )

    def test_history_populated_passive_return_never_set(self):
        User, Address, sess, a1 = self._u_ad_fixture(True)
        eq_(
            Address.user.impl.get_history(
                attributes.instance_state(a1),
                attributes.instance_dict(a1),
                passive=attributes.PASSIVE_RETURN_NEVER_SET,
            ),
            ((), [User(name="ed")], ()),
        )


class M2OGetTest(_fixtures.FixtureTest):
    run_inserts = "once"
    run_deletes = None

    def test_m2o_noload(self):
        """test that a NULL foreign key doesn't trigger a lazy load"""

        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(User, users)

        mapper(Address, addresses, properties={"user": relationship(User)})

        sess = create_session()
        ad1 = Address(email_address="somenewaddress", id=12)
        sess.add(ad1)
        sess.flush()
        sess.expunge_all()

        ad2 = sess.query(Address).get(1)
        ad3 = sess.query(Address).get(ad1.id)

        def go():
            # one lazy load
            assert ad2.user.name == "jack"
            # no lazy load
            assert ad3.user is None

        self.assert_sql_count(testing.db, go, 1)


class CorrelatedTest(fixtures.MappedTest):
    @classmethod
    def define_tables(self, meta):
        Table(
            "user_t",
            meta,
            Column("id", Integer, primary_key=True),
            Column("name", String(50)),
        )

        Table(
            "stuff",
            meta,
            Column("id", Integer, primary_key=True),
            Column("date", sa.Date),
            Column("user_id", Integer, ForeignKey("user_t.id")),
        )

    @classmethod
    def insert_data(cls):
        stuff, user_t = cls.tables.stuff, cls.tables.user_t

        user_t.insert().execute(
            {"id": 1, "name": "user1"},
            {"id": 2, "name": "user2"},
            {"id": 3, "name": "user3"},
        )

        stuff.insert().execute(
            {"id": 1, "user_id": 1, "date": datetime.date(2007, 10, 15)},
            {"id": 2, "user_id": 1, "date": datetime.date(2007, 12, 15)},
            {"id": 3, "user_id": 1, "date": datetime.date(2007, 11, 15)},
            {"id": 4, "user_id": 2, "date": datetime.date(2008, 1, 15)},
            {"id": 5, "user_id": 3, "date": datetime.date(2007, 6, 15)},
        )

    def test_correlated_lazyload(self):
        stuff, user_t = self.tables.stuff, self.tables.user_t

        class User(fixtures.ComparableEntity):
            pass

        class Stuff(fixtures.ComparableEntity):
            pass

        mapper(Stuff, stuff)

        stuff_view = (
            sa.select([stuff.c.id])
            .where(stuff.c.user_id == user_t.c.id)
            .correlate(user_t)
            .order_by(sa.desc(stuff.c.date))
            .limit(1)
        )

        mapper(
            User,
            user_t,
            properties={
                "stuff": relationship(
                    Stuff,
                    primaryjoin=sa.and_(
                        user_t.c.id == stuff.c.user_id,
                        stuff.c.id == (stuff_view.as_scalar()),
                    ),
                )
            },
        )

        sess = create_session()

        eq_(
            sess.query(User).all(),
            [
                User(
                    name="user1",
                    stuff=[Stuff(date=datetime.date(2007, 12, 15), id=2)],
                ),
                User(
                    name="user2",
                    stuff=[Stuff(id=4, date=datetime.date(2008, 1, 15))],
                ),
                User(
                    name="user3",
                    stuff=[Stuff(id=5, date=datetime.date(2007, 6, 15))],
                ),
            ],
        )


class O2MWOSideFixedTest(fixtures.MappedTest):
    # test #2948 - o2m backref with a "m2o does/does not count"
    # criteria doesn't scan the "o" table

    @classmethod
    def define_tables(self, meta):
        Table(
            "city",
            meta,
            Column("id", Integer, primary_key=True),
            Column("deleted", Boolean),
        )
        Table(
            "person",
            meta,
            Column("id", Integer, primary_key=True),
            Column("city_id", ForeignKey("city.id")),
        )

    @classmethod
    def setup_classes(cls):
        class Person(cls.Basic):
            pass

        class City(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        Person, City = cls.classes.Person, cls.classes.City
        city, person = cls.tables.city, cls.tables.person

        mapper(
            Person,
            person,
            properties={
                "city": relationship(
                    City,
                    primaryjoin=and_(
                        person.c.city_id == city.c.id, city.c.deleted == False
                    ),  # noqa
                    backref="people",
                )
            },
        )
        mapper(City, city)

    def _fixture(self, include_other):
        city, person = self.tables.city, self.tables.person

        if include_other:
            city.insert().execute({"id": 1, "deleted": False})

            person.insert().execute(
                {"id": 1, "city_id": 1}, {"id": 2, "city_id": 1}
            )

        city.insert().execute({"id": 2, "deleted": True})

        person.insert().execute(
            {"id": 3, "city_id": 2}, {"id": 4, "city_id": 2}
        )

    def test_lazyload_assert_expected_sql(self):
        self._fixture(True)
        City = self.classes.City
        sess = Session(testing.db)
        c1, c2 = sess.query(City).order_by(City.id).all()

        def go():
            eq_([p.id for p in c2.people], [])

        self.assert_sql_execution(
            testing.db,
            go,
            CompiledSQL(
                "SELECT person.id AS person_id, person.city_id AS "
                "person_city_id FROM person "
                "WHERE person.city_id = :param_1 AND :param_2 = 0",
                {"param_1": 2, "param_2": 1},
            ),
        )

    def test_lazyload_people_other_exists(self):
        self._fixture(True)
        City = self.classes.City
        sess = Session(testing.db)
        c1, c2 = sess.query(City).order_by(City.id).all()
        eq_([p.id for p in c1.people], [1, 2])

        eq_([p.id for p in c2.people], [])

    def test_lazyload_people_no_other_exists(self):
        # note that if we revert #2948, *this still passes!*
        # e.g. due to the scan of the "o" table, whether or not *another*
        # row exists determines if this works.

        self._fixture(False)
        City = self.classes.City
        sess = Session(testing.db)
        c2, = sess.query(City).order_by(City.id).all()

        eq_([p.id for p in c2.people], [])


class RefersToSelfLazyLoadInterferenceTest(fixtures.MappedTest):
    """Test [issue:3145].

    This involves an object that refers to itself, which isn't
    entirely a supported use case.   Here, we're able to fix it,
    but long term it's not clear if future needs will affect this.
    The use case is not super-critical.

    """

    @classmethod
    def define_tables(cls, metadata):
        Table(
            "a",
            metadata,
            Column("a_id", Integer, primary_key=True),
            Column("b_id", ForeignKey("b.b_id")),
        )

        Table(
            "b",
            metadata,
            Column("b_id", Integer, primary_key=True),
            Column("parent_id", ForeignKey("b.b_id")),
        )

        Table(
            "c",
            metadata,
            Column("c_id", Integer, primary_key=True),
            Column("b_id", ForeignKey("b.b_id")),
        )

    @classmethod
    def setup_classes(cls):
        class A(cls.Basic):
            pass

        class B(cls.Basic):
            pass

        class C(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        mapper(
            cls.classes.A,
            cls.tables.a,
            properties={"b": relationship(cls.classes.B)},
        )
        bm = mapper(
            cls.classes.B,
            cls.tables.b,
            properties={
                "parent": relationship(
                    cls.classes.B, remote_side=cls.tables.b.c.b_id
                ),
                "zc": relationship(cls.classes.C),
            },
        )
        mapper(cls.classes.C, cls.tables.c)

        bmp = bm._props
        configure_mappers()
        # Bug is order-dependent, must sort the "zc" property to the end
        bmp.sort()

    def test_lazy_doesnt_interfere(self):
        A, B, C = self.classes("A", "B", "C")

        session = Session()
        b = B()
        session.add(b)
        session.flush()

        b.parent_id = b.b_id

        b.zc.append(C())
        b.zc.append(C())
        session.commit()

        # If the bug is here, the next line throws an exception
        session.query(B).options(
            sa.orm.joinedload("parent").joinedload("zc")
        ).all()


class TypeCoerceTest(fixtures.MappedTest, testing.AssertsExecutionResults):
    """ORM-level test for [ticket:3531]"""

    # mysql is having a recursion issue in the bind_expression
    __only_on__ = ("sqlite", "postgresql")

    class StringAsInt(TypeDecorator):
        impl = String(50)

        def column_expression(self, col):
            return sa.cast(col, Integer)

        def bind_expression(self, col):
            return sa.cast(col, String)

    @classmethod
    def define_tables(cls, metadata):
        Table(
            "person", metadata, Column("id", cls.StringAsInt, primary_key=True)
        )
        Table(
            "pets",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("person_id", Integer),
        )

    @classmethod
    def setup_classes(cls):
        class Person(cls.Basic):
            pass

        class Pet(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        mapper(
            cls.classes.Person,
            cls.tables.person,
            properties=dict(
                pets=relationship(
                    cls.classes.Pet,
                    primaryjoin=(
                        orm.foreign(cls.tables.pets.c.person_id)
                        == sa.cast(
                            sa.type_coerce(cls.tables.person.c.id, Integer),
                            Integer,
                        )
                    ),
                )
            ),
        )

        mapper(cls.classes.Pet, cls.tables.pets)

    def test_lazyload_singlecast(self):
        Person = self.classes.Person
        Pet = self.classes.Pet

        s = Session()
        s.add_all([Person(id=5), Pet(id=1, person_id=5)])
        s.commit()

        p1 = s.query(Person).first()

        with self.sql_execution_asserter() as asserter:
            p1.pets

        asserter.assert_(
            CompiledSQL(
                "SELECT pets.id AS pets_id, pets.person_id "
                "AS pets_person_id FROM pets "
                "WHERE pets.person_id = CAST(:param_1 AS INTEGER)",
                [{"param_1": 5}],
            )
        )


class CompositeSimpleM2OTest(fixtures.MappedTest):
    """ORM-level test for [ticket:3788]"""

    @classmethod
    def define_tables(cls, metadata):
        Table(
            "a",
            metadata,
            Column("id1", Integer, primary_key=True),
            Column("id2", Integer, primary_key=True),
        )

        Table(
            "b_sameorder",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("a_id1", Integer),
            Column("a_id2", Integer),
            ForeignKeyConstraint(["a_id1", "a_id2"], ["a.id1", "a.id2"]),
        )

        Table(
            "b_differentorder",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("a_id1", Integer),
            Column("a_id2", Integer),
            ForeignKeyConstraint(["a_id1", "a_id2"], ["a.id1", "a.id2"]),
        )

    @classmethod
    def setup_classes(cls):
        class A(cls.Basic):
            pass

        class B(cls.Basic):
            pass

    def test_use_get_sameorder(self):
        mapper(self.classes.A, self.tables.a)
        m_b = mapper(
            self.classes.B,
            self.tables.b_sameorder,
            properties={"a": relationship(self.classes.A)},
        )

        configure_mappers()
        is_true(m_b.relationships.a.strategy.use_get)

    def test_use_get_reverseorder(self):
        mapper(self.classes.A, self.tables.a)
        m_b = mapper(
            self.classes.B,
            self.tables.b_differentorder,
            properties={"a": relationship(self.classes.A)},
        )

        configure_mappers()
        is_true(m_b.relationships.a.strategy.use_get)

    def test_dont_use_get_pj_is_different(self):
        mapper(self.classes.A, self.tables.a)
        m_b = mapper(
            self.classes.B,
            self.tables.b_sameorder,
            properties={
                "a": relationship(
                    self.classes.A,
                    primaryjoin=and_(
                        self.tables.a.c.id1 == self.tables.b_sameorder.c.a_id1,
                        self.tables.a.c.id2 == 12,
                    ),
                )
            },
        )

        configure_mappers()
        is_false(m_b.relationships.a.strategy.use_get)
