"""
a series of tests which assert the behavior of moving objects between
collections and scalar attributes resulting in the expected state w.r.t.
backrefs, add/remove events, etc.

there's a particular focus on collections that have "uselist=False", since in
these cases the re-assignment of an attribute means the previous owner needs an
UPDATE in the database.

"""

from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy.orm import attributes
from sqlalchemy.orm import backref
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.testing import eq_
from sqlalchemy.testing import is_
from sqlalchemy.testing.fixtures import fixture_session
from test.orm import _fixtures


class O2MCollectionTest(_fixtures.FixtureTest):
    run_inserts = None

    @classmethod
    def setup_mappers(cls):
        Address, addresses, users, User = (
            cls.classes.Address,
            cls.tables.addresses,
            cls.tables.users,
            cls.classes.User,
        )

        cls.mapper_registry.map_imperatively(Address, addresses)
        cls.mapper_registry.map_imperatively(
            User,
            users,
            properties=dict(addresses=relationship(Address, backref="user")),
        )

    def test_collection_move_hitslazy(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session(future=True)
        a1 = Address(email_address="address1")
        a2 = Address(email_address="address2")
        a3 = Address(email_address="address3")
        u1 = User(name="jack", addresses=[a1, a2, a3])
        u2 = User(name="ed")
        sess.add_all([u1, a1, a2, a3])
        sess.commit()

        # u1.addresses

        def go():
            u2.addresses.append(a1)
            u2.addresses.append(a2)
            u2.addresses.append(a3)

        self.assert_sql_count(testing.db, go, 0)

    def test_collection_move_preloaded(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()
        a1 = Address(email_address="address1")
        u1 = User(name="jack", addresses=[a1])

        u2 = User(name="ed")
        sess.add_all([u1, u2])
        sess.commit()  # everything is expired

        # load u1.addresses collection
        u1.addresses

        u2.addresses.append(a1)

        # backref fires
        assert a1.user is u2

        # a1 removed from u1.addresses as of [ticket:2789]
        assert a1 not in u1.addresses
        assert a1 in u2.addresses

    def test_collection_move_notloaded(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()
        a1 = Address(email_address="address1")
        u1 = User(name="jack", addresses=[a1])

        u2 = User(name="ed")
        sess.add_all([u1, u2])
        sess.commit()  # everything is expired

        u2.addresses.append(a1)

        # backref fires
        assert a1.user is u2

        # u1.addresses wasn't loaded,
        # so when it loads its correct
        assert a1 not in u1.addresses
        assert a1 in u2.addresses

    def test_collection_move_commitfirst(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()
        a1 = Address(email_address="address1")
        u1 = User(name="jack", addresses=[a1])

        u2 = User(name="ed")
        sess.add_all([u1, u2])
        sess.commit()  # everything is expired

        # load u1.addresses collection
        u1.addresses

        u2.addresses.append(a1)

        # backref fires
        assert a1.user is u2

        # everything expires, no changes in
        # u1.addresses, so all is fine
        sess.commit()
        assert a1 not in u1.addresses
        assert a1 in u2.addresses

    def test_scalar_move_preloaded(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()

        u1 = User(name="jack")
        u2 = User(name="ed")
        a1 = Address(email_address="a1")
        a1.user = u1
        sess.add_all([u1, u2, a1])
        sess.commit()

        # u1.addresses is loaded
        u1.addresses

        # direct set - the "old" is "fetched",
        # but only from the local session - not the
        # database, due to the PASSIVE_NO_FETCH flag.
        # this is a more fine grained behavior introduced
        # in 0.6
        a1.user = u2

        assert a1 not in u1.addresses
        assert a1 in u2.addresses

    def test_plain_load_passive(self):
        """test that many-to-one set doesn't load the old value."""

        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()
        u1 = User(name="jack")
        u2 = User(name="ed")
        a1 = Address(email_address="a1")
        a1.user = u1
        sess.add_all([u1, u2, a1])
        sess.commit()

        # in this case, a lazyload would
        # ordinarily occur except for the
        # PASSIVE_NO_FETCH flag.
        def go():
            a1.user = u2

        self.assert_sql_count(testing.db, go, 0)

        assert a1 not in u1.addresses
        assert a1 in u2.addresses

    def test_set_none(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()
        u1 = User(name="jack")
        a1 = Address(email_address="a1")
        a1.user = u1
        sess.add_all([u1, a1])
        sess.commit()

        # works for None too
        def go():
            a1.user = None

        self.assert_sql_count(testing.db, go, 0)

        assert a1 not in u1.addresses

    def test_scalar_move_notloaded(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()

        u1 = User(name="jack")
        u2 = User(name="ed")
        a1 = Address(email_address="a1")
        a1.user = u1
        sess.add_all([u1, u2, a1])
        sess.commit()

        # direct set - the fetching of the
        # "old" u1 here allows the backref
        # to remove it from the addresses collection
        a1.user = u2

        assert a1 not in u1.addresses
        assert a1 in u2.addresses

    def test_scalar_move_commitfirst(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()

        u1 = User(name="jack")
        u2 = User(name="ed")
        a1 = Address(email_address="a1")
        a1.user = u1
        sess.add_all([u1, u2, a1])
        sess.commit()

        # u1.addresses is loaded
        u1.addresses

        # direct set - the fetching of the
        # "old" u1 here allows the backref
        # to remove it from the addresses collection
        a1.user = u2

        sess.commit()
        assert a1 not in u1.addresses
        assert a1 in u2.addresses

    def test_collection_assignment_mutates_previous_one(self):
        User, Address = self.classes.User, self.classes.Address

        u1 = User(name="jack")
        u2 = User(name="ed")
        a1 = Address(email_address="a1")
        u1.addresses.append(a1)

        is_(a1.user, u1)

        u2.addresses = [a1]

        eq_(u1.addresses, [])

        is_(a1.user, u2)

    def test_collection_assignment_mutates_previous_two(self):
        User, Address = self.classes.User, self.classes.Address

        u1 = User(name="jack")
        a1 = Address(email_address="a1")

        u1.addresses.append(a1)

        is_(a1.user, u1)

        u1.addresses = []
        is_(a1.user, None)

    def test_del_from_collection(self):
        User, Address = self.classes.User, self.classes.Address

        u1 = User(name="jack")
        a1 = Address(email_address="a1")

        u1.addresses.append(a1)

        is_(a1.user, u1)

        del u1.addresses[0]

        is_(a1.user, None)

    def test_del_from_scalar(self):
        User, Address = self.classes.User, self.classes.Address

        u1 = User(name="jack")
        a1 = Address(email_address="a1")

        u1.addresses.append(a1)

        is_(a1.user, u1)

        del a1.user

        assert a1 not in u1.addresses

    def test_tuple_assignment_w_reverse(self):
        User, Address = self.classes.User, self.classes.Address

        u1 = User()

        a1 = Address(email_address="1")
        a2 = Address(email_address="2")
        a3 = Address(email_address="3")
        u1.addresses.append(a1)
        u1.addresses.append(a2)
        u1.addresses.append(a3)

        u1.addresses[1], u1.addresses[2] = u1.addresses[2], u1.addresses[1]

        assert a3.user is u1

        eq_(u1.addresses, [a1, a3, a2])

    def test_straight_remove(self):
        User, Address = self.classes.User, self.classes.Address

        u1 = User()

        a1 = Address(email_address="1")
        a2 = Address(email_address="2")
        a3 = Address(email_address="3")
        u1.addresses.append(a1)
        u1.addresses.append(a2)
        u1.addresses.append(a3)

        del u1.addresses[2]

        assert a3.user is None
        eq_(u1.addresses, [a1, a2])

    def test_append_del(self):
        User, Address = self.classes.User, self.classes.Address

        u1 = User()

        a1 = Address(email_address="1")
        a2 = Address(email_address="2")
        a3 = Address(email_address="3")
        u1.addresses.append(a1)
        u1.addresses.append(a2)
        u1.addresses.append(a3)

        u1.addresses.append(a2)
        del u1.addresses[1]

        assert a2.user is u1
        eq_(u1.addresses, [a1, a3, a2])

    def test_bulk_replace(self):
        User, Address = self.classes.User, self.classes.Address

        u1 = User()

        a1 = Address(email_address="1")
        a2 = Address(email_address="2")
        a3 = Address(email_address="3")
        u1.addresses.append(a1)
        u1.addresses.append(a2)
        u1.addresses.append(a3)
        u1.addresses.append(a3)

        assert a3.user is u1

        u1.addresses = [a1, a2, a1]

        assert a3.user is None
        eq_(u1.addresses, [a1, a2, a1])


@testing.combinations(
    (
        "legacy_style",
        True,
    ),
    (
        "new_style",
        False,
    ),
    argnames="name, _legacy_inactive_history_style",
    id_="sa",
)
class O2OScalarBackrefMoveTest(_fixtures.FixtureTest):
    run_inserts = None

    @classmethod
    def setup_mappers(cls):
        Address, addresses, users, User = (
            cls.classes.Address,
            cls.tables.addresses,
            cls.tables.users,
            cls.classes.User,
        )

        cls.mapper_registry.map_imperatively(Address, addresses)
        cls.mapper_registry.map_imperatively(
            User,
            users,
            properties={
                "address": relationship(
                    Address,
                    backref=backref("user"),
                    uselist=False,
                    _legacy_inactive_history_style=(
                        cls._legacy_inactive_history_style
                    ),
                )
            },
        )

    def test_collection_move_preloaded(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()
        a1 = Address(email_address="address1")
        u1 = User(name="jack", address=a1)

        u2 = User(name="ed")
        sess.add_all([u1, u2])
        sess.commit()  # everything is expired

        # load u1.address
        u1.address

        # reassign
        u2.address = a1
        assert u2.address is a1

        # backref fires
        assert a1.user is u2

        # doesn't extend to the previous attribute tho.
        # flushing at this point means its anyone's guess.
        assert u1.address is a1
        assert u2.address is a1

    def test_scalar_move_preloaded(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()
        a1 = Address(email_address="address1")
        a2 = Address(email_address="address1")
        u1 = User(name="jack", address=a1)

        sess.add_all([u1, a1, a2])
        sess.commit()  # everything is expired

        # load a1.user
        a1.user

        # reassign
        a2.user = u1

        # backref fires
        assert u1.address is a2

        # stays on both sides
        assert a1.user is u1
        assert a2.user is u1

    def test_collection_move_notloaded(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()
        a1 = Address(email_address="address1")
        u1 = User(name="jack", address=a1)

        u2 = User(name="ed")
        sess.add_all([u1, u2])
        sess.commit()  # everything is expired

        # reassign
        u2.address = a1
        assert u2.address is a1

        # backref fires
        assert a1.user is u2

        # u1.address loads now after a flush
        assert u1.address is None
        assert u2.address is a1

    def test_scalar_move_notloaded(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()
        a1 = Address(email_address="address1")
        a2 = Address(email_address="address1")
        u1 = User(name="jack", address=a1)

        sess.add_all([u1, a1, a2])
        sess.commit()  # everything is expired

        # reassign
        a2.user = u1

        # backref fires
        assert u1.address is a2

        eq_(
            a2._sa_instance_state.committed_state["user"],
            attributes.PASSIVE_NO_RESULT,
        )
        if not self._legacy_inactive_history_style:
            # autoflush during the a2.user
            assert a1.user is None
            assert a2.user is u1
        else:
            # stays on both sides
            assert a1.user is u1
            assert a2.user is u1

    def test_collection_move_commitfirst(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()
        a1 = Address(email_address="address1")
        u1 = User(name="jack", address=a1)

        u2 = User(name="ed")
        sess.add_all([u1, u2])
        sess.commit()  # everything is expired

        # load u1.address
        u1.address

        # reassign
        u2.address = a1
        assert u2.address is a1

        # backref fires
        assert a1.user is u2

        # the commit cancels out u1.addresses
        # being loaded, on next access its fine.
        sess.commit()
        assert u1.address is None
        assert u2.address is a1

    def test_scalar_move_commitfirst(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()
        a1 = Address(email_address="address1")
        a2 = Address(email_address="address2")
        u1 = User(name="jack", address=a1)

        sess.add_all([u1, a1, a2])
        sess.commit()  # everything is expired

        # load
        assert a1.user is u1

        # reassign
        a2.user = u1

        # backref fires
        assert u1.address is a2

        # didn't work this way tho
        assert a1.user is u1

        # moves appropriately after commit
        sess.commit()
        assert u1.address is a2
        assert a1.user is None
        assert a2.user is u1


@testing.combinations(
    (
        "legacy_style",
        True,
    ),
    (
        "new_style",
        False,
    ),
    argnames="name, _legacy_inactive_history_style",
    id_="sa",
)
class O2OScalarMoveTest(_fixtures.FixtureTest):
    run_inserts = None

    @classmethod
    def setup_mappers(cls):
        Address, addresses, users, User = (
            cls.classes.Address,
            cls.tables.addresses,
            cls.tables.users,
            cls.classes.User,
        )

        cls.mapper_registry.map_imperatively(Address, addresses)
        cls.mapper_registry.map_imperatively(
            User,
            users,
            properties={
                "address": relationship(
                    Address,
                    uselist=False,
                    _legacy_inactive_history_style=(
                        cls._legacy_inactive_history_style
                    ),
                )
            },
        )

    def test_collection_move_commitfirst(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session()
        a1 = Address(email_address="address1")
        u1 = User(name="jack", address=a1)

        u2 = User(name="ed")
        sess.add_all([u1, u2])
        sess.commit()  # everything is expired

        # load u1.address
        u1.address

        # reassign
        u2.address = a1
        assert u2.address is a1

        # the commit cancels out u1.addresses
        # being loaded, on next access its fine.
        sess.commit()
        assert u1.address is None
        assert u2.address is a1


class O2OScalarOrphanTest(_fixtures.FixtureTest):
    run_inserts = None

    @classmethod
    def setup_mappers(cls):
        Address, addresses, users, User = (
            cls.classes.Address,
            cls.tables.addresses,
            cls.tables.users,
            cls.classes.User,
        )

        cls.mapper_registry.map_imperatively(Address, addresses)
        cls.mapper_registry.map_imperatively(
            User,
            users,
            properties={
                "address": relationship(
                    Address,
                    uselist=False,
                    backref=backref(
                        "user",
                        single_parent=True,
                        cascade="all, delete-orphan",
                    ),
                )
            },
        )

    def test_m2o_event(self):
        User, Address = self.classes.User, self.classes.Address

        sess = fixture_session(future=True)
        a1 = Address(email_address="address1")
        u1 = User(name="jack", address=a1)

        sess.add(u1)
        sess.commit()
        sess.expunge(u1)

        u2 = User(name="ed")
        # the _SingleParent extension sets the backref get to "active" !
        # u1 gets loaded and deleted
        sess.add(u2)
        u2.address = a1
        sess.commit()
        assert sess.query(User).count() == 1


class M2MCollectionMoveTest(_fixtures.FixtureTest):
    run_inserts = None

    @classmethod
    def setup_mappers(cls):
        keywords, items, item_keywords, Keyword, Item = (
            cls.tables.keywords,
            cls.tables.items,
            cls.tables.item_keywords,
            cls.classes.Keyword,
            cls.classes.Item,
        )

        cls.mapper_registry.map_imperatively(
            Item,
            items,
            properties={
                "keywords": relationship(
                    Keyword, secondary=item_keywords, backref="items"
                )
            },
        )
        cls.mapper_registry.map_imperatively(Keyword, keywords)

    def test_add_remove_pending_backref(self):
        """test that pending doesn't add an item that's not a net add."""

        Item, Keyword = (self.classes.Item, self.classes.Keyword)

        session = fixture_session(autoflush=False, future=True)

        i1 = Item(description="i1")
        session.add(i1)
        session.commit()

        session.expire(i1, ["keywords"])

        k1 = Keyword(name="k1")
        k1.items.append(i1)
        k1.items.remove(i1)
        eq_(i1.keywords, [])

    def test_remove_add_pending_backref(self):
        """test that pending doesn't remove an item that's not a net remove."""

        Item, Keyword = (self.classes.Item, self.classes.Keyword)

        session = fixture_session(autoflush=False)

        k1 = Keyword(name="k1")
        i1 = Item(description="i1", keywords=[k1])
        session.add(i1)
        session.commit()

        session.expire(i1, ["keywords"])

        k1.items.remove(i1)
        k1.items.append(i1)
        eq_(i1.keywords, [k1])

    def test_pending_combines_with_flushed(self):
        """test the combination of unflushed pending + lazy loaded from DB."""

        Item, Keyword = (self.classes.Item, self.classes.Keyword)

        session = Session(testing.db, autoflush=False)

        k1 = Keyword(name="k1")
        k2 = Keyword(name="k2")
        i1 = Item(description="i1", keywords=[k1])
        session.add(i1)
        session.add(k2)
        session.commit()

        k2.items.append(i1)
        # the pending
        # list is still here.
        eq_(
            set(
                attributes.instance_state(i1)
                ._pending_mutations["keywords"]
                .added_items
            ),
            {k2},
        )
        # because autoflush is off, k2 is still
        # coming in from pending
        eq_(i1.keywords, [k1, k2])

        # prove it didn't flush
        eq_(session.scalar(text("select count(*) from item_keywords")), 1)

        # the pending collection was removed
        assert (
            "keywords" not in attributes.instance_state(i1)._pending_mutations
        )

    def test_duplicate_adds(self):
        Item, Keyword = (self.classes.Item, self.classes.Keyword)

        session = Session(testing.db, autoflush=False)

        k1 = Keyword(name="k1")
        i1 = Item(description="i1", keywords=[k1])
        session.add(i1)
        session.commit()

        k1.items.append(i1)
        eq_(i1.keywords, [k1, k1])

        session.expire(i1, ["keywords"])
        k1.items.append(i1)
        eq_(i1.keywords, [k1, k1])

        session.expire(i1, ["keywords"])
        k1.items.append(i1)
        eq_(i1.keywords, [k1, k1])

        eq_(k1.items, [i1, i1, i1, i1])
        session.commit()
        eq_(k1.items, [i1])

    def test_bulk_replace(self):
        Item, Keyword = (self.classes.Item, self.classes.Keyword)

        k1 = Keyword(name="k1")
        k2 = Keyword(name="k2")
        k3 = Keyword(name="k3")
        i1 = Item(description="i1", keywords=[k1, k2])
        i2 = Item(description="i2", keywords=[k3])

        i1.keywords = [k2, k3]
        assert i1 in k3.items
        assert i2 in k3.items
        assert i1 not in k1.items


class M2MScalarMoveTest(_fixtures.FixtureTest):
    run_inserts = None

    @classmethod
    def setup_mappers(cls):
        keywords, items, item_keywords, Keyword, Item = (
            cls.tables.keywords,
            cls.tables.items,
            cls.tables.item_keywords,
            cls.classes.Keyword,
            cls.classes.Item,
        )

        cls.mapper_registry.map_imperatively(
            Item,
            items,
            properties={
                "keyword": relationship(
                    Keyword,
                    secondary=item_keywords,
                    uselist=False,
                    backref=backref("item", uselist=False),
                )
            },
        )
        cls.mapper_registry.map_imperatively(Keyword, keywords)

    def test_collection_move_preloaded(self):
        Item, Keyword = self.classes.Item, self.classes.Keyword

        sess = fixture_session()

        k1 = Keyword(name="k1")
        i1 = Item(description="i1", keyword=k1)
        i2 = Item(description="i2")

        sess.add_all([i1, i2, k1])
        sess.commit()  # everything is expired

        # load i1.keyword
        assert i1.keyword is k1

        i2.keyword = k1

        assert k1.item is i2

        # nothing happens.
        assert i1.keyword is k1
        assert i2.keyword is k1

    def test_collection_move_notloaded(self):
        Item, Keyword = self.classes.Item, self.classes.Keyword

        sess = fixture_session()

        k1 = Keyword(name="k1")
        i1 = Item(description="i1", keyword=k1)
        i2 = Item(description="i2")

        sess.add_all([i1, i2, k1])
        sess.commit()  # everything is expired

        i2.keyword = k1

        assert k1.item is i2

        assert i1.keyword is None
        assert i2.keyword is k1

    def test_collection_move_commit(self):
        Item, Keyword = self.classes.Item, self.classes.Keyword

        sess = fixture_session()

        k1 = Keyword(name="k1")
        i1 = Item(description="i1", keyword=k1)
        i2 = Item(description="i2")

        sess.add_all([i1, i2, k1])
        sess.commit()  # everything is expired

        # load i1.keyword
        assert i1.keyword is k1

        i2.keyword = k1

        assert k1.item is i2

        sess.commit()
        assert i1.keyword is None
        assert i2.keyword is k1


class O2MStaleBackrefTest(_fixtures.FixtureTest):
    run_inserts = None

    @classmethod
    def setup_mappers(cls):
        Address, addresses, users, User = (
            cls.classes.Address,
            cls.tables.addresses,
            cls.tables.users,
            cls.classes.User,
        )

        cls.mapper_registry.map_imperatively(Address, addresses)
        cls.mapper_registry.map_imperatively(
            User,
            users,
            properties=dict(addresses=relationship(Address, backref="user")),
        )

    def test_backref_pop_m2o(self):
        User, Address = self.classes.User, self.classes.Address

        u1 = User()
        u2 = User()
        a1 = Address()
        u1.addresses.append(a1)
        u2.addresses.append(a1)

        # a1 removed from u1.addresses as of [ticket:2789]
        assert a1 not in u1.addresses

        assert a1.user is u2
        assert a1 in u2.addresses


class M2MStaleBackrefTest(_fixtures.FixtureTest):
    run_inserts = None

    @classmethod
    def setup_mappers(cls):
        keywords, items, item_keywords, Keyword, Item = (
            cls.tables.keywords,
            cls.tables.items,
            cls.tables.item_keywords,
            cls.classes.Keyword,
            cls.classes.Item,
        )

        cls.mapper_registry.map_imperatively(
            Item,
            items,
            properties={
                "keywords": relationship(
                    Keyword, secondary=item_keywords, backref="items"
                )
            },
        )
        cls.mapper_registry.map_imperatively(Keyword, keywords)

    def test_backref_pop_m2m(self):
        Keyword, Item = self.classes.Keyword, self.classes.Item

        k1 = Keyword()
        k2 = Keyword()
        i1 = Item()
        k1.items.append(i1)
        k2.items.append(i1)
        k2.items.append(i1)
        i1.keywords = []
        k2.items.remove(i1)
        assert len(k2.items) == 0
