from __future__ import annotations

from collections import abc
import copy
import dataclasses
import pickle
from typing import List
from unittest.mock import call
from unittest.mock import Mock

from sqlalchemy import cast
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy.engine import default
from sqlalchemy.ext.associationproxy import _AssociationList
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.associationproxy import AssociationProxy
from sqlalchemy.orm import aliased
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import collections
from sqlalchemy.orm import composite
from sqlalchemy.orm import configure_mappers
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import declared_attr
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.orm.collections import attribute_keyed_dict
from sqlalchemy.orm.collections import collection
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_none
from sqlalchemy.testing import is_not_none
from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.testing.entities import ComparableEntity  # noqa
from sqlalchemy.testing.entities import ComparableMixin  # noqa
from sqlalchemy.testing.fixtures import fixture_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.util import gc_collect


class DictCollection(dict):
    """Dict-based relationship collection keyed on each member's ``foo``."""

    @collection.appender
    def append(self, obj):
        """Store ``obj`` under the key taken from its ``foo`` attribute."""
        key = obj.foo
        self[key] = obj

    @collection.remover
    def remove(self, obj):
        """Delete the entry stored under ``obj.foo``."""
        key = obj.foo
        del self[key]


class SetCollection(set):
    """Trivial ``set`` subclass used as a custom collection class."""


class ListCollection(list):
    """Trivial ``list`` subclass usable as a custom collection class."""


class ObjectCollection:
    """Plain-object collection that keeps its members in a ``values`` list.

    It exposes only an appender, a remover and iteration; it does not
    subclass ``list``, ``set`` or ``dict``.
    """

    def __init__(self):
        self.values = []

    def __iter__(self):
        return iter(self.values)

    @collection.appender
    def append(self, obj):
        """Add ``obj`` to the end of the backing list."""
        self.values.append(obj)

    @collection.remover
    def remove(self, obj):
        """Remove ``obj`` from the backing list."""
        self.values.remove(obj)


class AutoFlushTest(fixtures.MappedTest):
    """Exercise proxied collection mutations under an autoflushing session.

    Each test appends ``Child`` objects through ``Parent.collection`` (an
    association proxy over ``Parent._collection`` -> ``Association.child``)
    and verifies that the resulting ``Association`` rows link back to the
    parent after commit.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Define ``parent``, ``child`` and the ``association`` link table."""
        Table(
            "parent",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
        )
        Table(
            "association",
            metadata,
            Column("parent_id", ForeignKey("parent.id"), primary_key=True),
            Column("child_id", ForeignKey("child.id"), primary_key=True),
            Column("name", String(50)),
        )
        Table(
            "child",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("name", String(50)),
        )

    def _fixture(self, collection_class, is_dict=False):
        """Map and return ``(Parent, Child, Association)``.

        :param collection_class: collection class passed to the
         ``Parent._collection`` relationship.
        :param is_dict: when true, ``Association.__init__`` accepts a
         ``(key, child)`` pair instead of just ``child``; the key itself
         is not stored by the constructor.
        """

        class Parent:
            collection = association_proxy("_collection", "child")

        class Child:
            def __init__(self, name):
                self.name = name

        class Association:
            if is_dict:

                def __init__(self, key, child):
                    self.child = child

            else:

                def __init__(self, child):
                    self.child = child

        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.parent,
            properties={
                "_collection": relationship(
                    Association,
                    collection_class=collection_class,
                    backref="parent",
                )
            },
        )
        self.mapper_registry.map_imperatively(
            Association,
            self.tables.association,
            properties={"child": relationship(Child, backref="association")},
        )
        self.mapper_registry.map_imperatively(Child, self.tables.child)

        return Parent, Child, Association

    def _test_premature_flush(self, collection_class, fn, is_dict=False):
        """Add two children through the proxy, committing after each one.

        :param collection_class: collection class for the relationship.
        :param fn: callable ``fn(proxy_collection, child)`` that performs
         the mutation under test.
        :param is_dict: forwarded to :meth:`_fixture`.
        """
        Parent, Child, Association = self._fixture(
            collection_class, is_dict=is_dict
        )

        # Use the session as a context manager so that it is closed even
        # when a commit or one of the assertions below fails; previously
        # the trailing ``session.close()`` was skipped on failure.
        with Session(
            testing.db, autoflush=True, expire_on_commit=True, future=True
        ) as session:
            p1 = Parent()
            c1 = Child("c1")
            c2 = Child("c2")
            session.add(p1)
            session.add(c1)
            session.add(c2)

            fn(p1.collection, c1)
            session.commit()

            fn(p1.collection, c2)
            session.commit()

            is_(c1.association[0].parent, p1)
            is_(c2.association[0].parent, p1)

    def test_list_append(self):
        self._test_premature_flush(
            list, lambda collection, obj: collection.append(obj)
        )

    def test_list_extend(self):
        self._test_premature_flush(
            list, lambda collection, obj: collection.extend([obj])
        )

    def test_set_add(self):
        self._test_premature_flush(
            set, lambda collection, obj: collection.add(obj)
        )

    def test_set_extend(self):
        self._test_premature_flush(
            set, lambda collection, obj: collection.update([obj])
        )

    def test_dict_set(self):
        def set_(collection, obj):
            collection[obj.name] = obj

        self._test_premature_flush(
            collections.attribute_keyed_dict(
                "name", ignore_unpopulated_attribute=True
            ),
            set_,
            is_dict=True,
        )


class _CollectionOperations(fixtures.MappedTest):
    """Shared fixture and helpers for association-proxy collection tests.

    Subclasses set ``collection_class``; :meth:`setup_mappers` maps
    ``Parent`` with a ``_children`` relationship using that collection
    class, plus a ``children`` association proxy onto each child's
    ``name`` attribute.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Define the ``Parent`` and ``Children`` tables."""
        Table(
            "Parent",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("name", String(128)),
        )
        Table(
            "Children",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("parent_id", Integer, ForeignKey("Parent.id")),
            Column("foo", String(128)),
            Column("name", String(128)),
        )

    @classmethod
    def setup_mappers(cls):
        """Map ``Parent`` and ``Child`` using ``cls.collection_class``.

        For dict-based collection classes ``Child`` takes ``(foo, name)``;
        otherwise it takes only ``name``.
        """
        collection_class = cls.collection_class

        class Parent(cls.Basic):
            children = association_proxy("_children", "name")

            def __init__(self, name):
                self.name = name

        class Child(cls.Basic):
            if collection_class and issubclass(collection_class, dict):

                def __init__(self, foo, name):
                    self.foo = foo
                    self.name = name

            else:

                def __init__(self, name):
                    self.name = name

        parents_table, children_table = cls.tables("Parent", "Children")
        cls.mapper_registry.map_imperatively(
            Parent,
            parents_table,
            properties={
                "_children": relationship(
                    Child,
                    lazy="joined",
                    backref="parent",
                    collection_class=collection_class,
                )
            },
        )
        cls.mapper_registry.map_imperatively(Child, children_table)

    def test_abc(self):
        """The proxy is an instance of the ABC matching the collection."""
        Parent = self.classes.Parent

        p1 = Parent("x")

        # A ``collection_class`` of None is treated as ``list`` here.
        collection_class = self.collection_class or list

        for abc_ in (abc.Set, abc.MutableMapping, abc.MutableSequence):
            if issubclass(collection_class, abc_):
                break
        else:
            # The collection class matches none of the ABCs checked above.
            abc_ = None

        if abc_:
            assert isinstance(p1.children, abc_)

    def roundtrip(self, obj):
        """Flush ``obj``, clear the session and reload it by primary key.

        ``obj`` is added to ``self.session`` first if it is not already
        present.  Returns the instance fetched via ``Session.get()`` after
        ``expunge_all()``.
        """
        if obj not in self.session:
            self.session.add(obj)
        self.session.flush()
        id_, type_ = obj.id, type(obj)
        self.session.expunge_all()
        return self.session.get(type_, id_)

    def _test_sequence_ops(self):
        """Run list-style operations against the ``children`` proxy.

        Invoked by subclasses whose ``collection_class`` is sequence-like.
        """
        Parent, Child = self.classes("Parent", "Child")
        self.session = fixture_session()

        p1 = Parent("P1")

        def assert_index(expected, value, *args):
            """Assert index of child value is equal to expected.

            If expected is None, assert that index raises ValueError.
            """
            try:
                index = p1.children.index(value, *args)
            except ValueError:
                self.assert_(expected is None)
            else:
                self.assert_(expected is not None)
                self.assert_(index == expected)

        self.assert_(not p1._children)
        self.assert_(not p1.children)

        # Appending a Child directly to the relationship shows up in the
        # proxy as its name.
        ch = Child("regular")
        p1._children.append(ch)

        self.assert_(ch in p1._children)
        self.assert_(len(p1._children) == 1)

        self.assert_(p1.children)
        self.assert_(len(p1.children) == 1)
        self.assert_(ch not in p1.children)
        self.assert_("regular" in p1.children)

        assert_index(0, "regular")
        assert_index(None, "regular", 1)

        # Appending a plain value through the proxy adds an entry to the
        # underlying relationship as well.
        p1.children.append("proxied")

        self.assert_("proxied" in p1.children)
        self.assert_("proxied" not in p1._children)
        self.assert_(len(p1.children) == 2)
        self.assert_(len(p1._children) == 2)

        self.assert_(p1._children[0].name == "regular")
        self.assert_(p1._children[1].name == "proxied")

        assert_index(0, "regular")
        assert_index(1, "proxied")
        assert_index(1, "proxied", 1)
        assert_index(None, "proxied", 0, 1)

        del p1._children[1]

        self.assert_(len(p1._children) == 1)
        self.assert_(len(p1.children) == 1)
        self.assert_(p1._children[0] == ch)

        assert_index(None, "proxied")

        del p1.children[0]

        self.assert_(len(p1._children) == 0)
        self.assert_(len(p1.children) == 0)

        assert_index(None, "regular")

        # Bulk assignment through the proxy.
        p1.children = ["a", "b", "c"]
        self.assert_(len(p1._children) == 3)
        self.assert_(len(p1.children) == 3)

        assert_index(0, "a")
        assert_index(1, "b")
        assert_index(2, "c")

        del ch
        p1 = self.roundtrip(p1)

        self.assert_(len(p1._children) == 3)
        self.assert_(len(p1.children) == 3)

        assert_index(0, "a")
        assert_index(1, "b")
        assert_index(2, "c")

        popped = p1.children.pop()
        self.assert_(len(p1.children) == 2)
        self.assert_(popped not in p1.children)
        assert_index(None, popped)

        p1 = self.roundtrip(p1)
        self.assert_(len(p1.children) == 2)
        self.assert_(popped not in p1.children)
        assert_index(None, popped)

        # Item assignment through the proxy keeps the same child row; the
        # id is compared before and after the roundtrip.
        p1.children[1] = "changed-in-place"
        self.assert_(p1.children[1] == "changed-in-place")
        assert_index(1, "changed-in-place")
        assert_index(None, "b")

        inplace_id = p1._children[1].id
        p1 = self.roundtrip(p1)
        self.assert_(p1.children[1] == "changed-in-place")
        assert p1._children[1].id == inplace_id

        p1.children.append("changed-in-place")
        self.assert_(p1.children.count("changed-in-place") == 2)
        assert_index(1, "changed-in-place")

        p1.children.remove("changed-in-place")
        self.assert_(p1.children.count("changed-in-place") == 1)
        assert_index(1, "changed-in-place")

        p1 = self.roundtrip(p1)
        self.assert_(p1.children.count("changed-in-place") == 1)
        assert_index(1, "changed-in-place")

        p1._children = []
        self.assert_(len(p1.children) == 0)
        assert_index(None, "changed-in-place")

        # Slice assignment, including extended slices.
        after = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]
        p1.children = list(after)
        self.assert_(len(p1.children) == 10)
        self.assert_([c.name for c in p1._children] == after)
        for i, val in enumerate(after):
            assert_index(i, val)

        p1.children[2:6] = ["x"] * 4
        after = ["a", "b", "x", "x", "x", "x", "g", "h", "i", "j"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)
        assert_index(2, "x")
        assert_index(3, "x", 3)
        assert_index(None, "x", 6)

        p1.children[2:6] = ["y"]
        after = ["a", "b", "y", "g", "h", "i", "j"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)
        assert_index(2, "y")
        assert_index(None, "y", 3)

        p1.children[2:3] = ["z"] * 4
        after = ["a", "b", "z", "z", "z", "z", "g", "h", "i", "j"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)

        p1.children[2::2] = ["O"] * 4
        after = ["a", "b", "O", "z", "O", "z", "O", "h", "O", "j"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)

        # Building a set that contains the proxy is expected to raise
        # TypeError.
        assert_raises(TypeError, set, [p1.children])

        # In-place arithmetic operators.
        p1.children *= 0
        after = []
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)

        p1.children += ["a", "b"]
        after = ["a", "b"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)

        p1.children[:] = ["d", "e"]
        after = ["d", "e"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)

        p1.children[:] = ["a", "b"]

        p1.children += ["c"]
        after = ["a", "b", "c"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)

        p1.children *= 1
        after = ["a", "b", "c"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)

        p1.children *= 2
        after = ["a", "b", "c", "a", "b", "c"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)

        p1.children = ["a"]
        after = ["a"]
        self.assert_(p1.children == after)
        self.assert_([c.name for c in p1._children] == after)

        # Non-mutating arithmetic, with the proxy on either side.
        self.assert_((p1.children * 2) == ["a", "a"])
        self.assert_((2 * p1.children) == ["a", "a"])
        self.assert_((p1.children * 0) == [])
        self.assert_((0 * p1.children) == [])

        self.assert_((p1.children + ["b"]) == ["a", "b"])
        self.assert_((["b"] + p1.children) == ["b", "a"])

        # Adding a non-iterable operand must raise TypeError.
        assert_raises(TypeError, lambda: p1.children + 123)


class DefaultTest(_CollectionOperations):
    """Sequence operations with no explicit ``collection_class``."""

    collection_class = None

    def test_sequence_ops(self):
        """Delegate to the shared sequence-operation checks."""
        self._test_sequence_ops()


class ListTest(_CollectionOperations):
    """Sequence operations with ``list`` as the ``collection_class``."""

    collection_class = list

    def test_sequence_ops(self):
        """Delegate to the shared sequence-operation checks."""
        self._test_sequence_ops()


class CustomDictTest(_CollectionOperations):
    """Mapping operations on a proxy backed by ``DictCollection``.

    The relationship collection is keyed on each child's ``foo``
    attribute; the proxy maps those keys to each child's ``name``.
    """

    collection_class = DictCollection

    def test_mapping_ops(self):
        """Exercise dict-style access, mutation and reassignment."""
        Parent, Child = self.classes("Parent", "Child")

        self.session = fixture_session()

        p1 = Parent("P1")

        self.assert_(not p1._children)
        self.assert_(not p1.children)

        # A Child added to the relationship is visible through the proxy
        # under its ``foo`` key, with its ``name`` as the value.
        ch = Child("a", "regular")
        p1._children.append(ch)

        self.assert_(ch in list(p1._children.values()))
        self.assert_(len(p1._children) == 1)

        self.assert_(p1.children)
        self.assert_(len(p1.children) == 1)
        self.assert_(ch not in p1.children)
        self.assert_("a" in p1.children)
        self.assert_(p1.children["a"] == "regular")
        self.assert_(p1._children["a"] == ch)

        # Setting an item through the proxy adds an entry to the
        # relationship.
        p1.children["b"] = "proxied"

        eq_(list(p1.children.keys()), ["a", "b"])
        eq_(list(p1.children.items()), [("a", "regular"), ("b", "proxied")])
        eq_(list(p1.children.values()), ["regular", "proxied"])

        self.assert_("proxied" in list(p1.children.values()))
        self.assert_("b" in p1.children)
        self.assert_("proxied" not in p1._children)
        self.assert_(len(p1.children) == 2)
        self.assert_(len(p1._children) == 2)

        self.assert_(p1._children["a"].name == "regular")
        self.assert_(p1._children["b"].name == "proxied")

        del p1._children["b"]

        self.assert_(len(p1._children) == 1)
        self.assert_(len(p1.children) == 1)
        self.assert_(p1._children["a"] == ch)

        del p1.children["a"]

        self.assert_(len(p1._children) == 0)
        self.assert_(len(p1.children) == 0)

        # Bulk assignment through the proxy.
        p1.children = {"d": "v d", "e": "v e", "f": "v f"}
        self.assert_(len(p1._children) == 3)
        self.assert_(len(p1.children) == 3)

        self.assert_(set(p1.children) == {"d", "e", "f"})

        del ch
        p1 = self.roundtrip(p1)
        self.assert_(len(p1._children) == 3)
        self.assert_(len(p1.children) == 3)

        # Replacing a value through the proxy keeps the same child row;
        # the id is compared before and after the roundtrip.
        p1.children["e"] = "changed-in-place"
        self.assert_(p1.children["e"] == "changed-in-place")
        inplace_id = p1._children["e"].id
        p1 = self.roundtrip(p1)
        self.assert_(p1.children["e"] == "changed-in-place")
        self.assert_(p1._children["e"].id == inplace_id)

        p1._children = {}
        self.assert_(len(p1.children) == 0)

        # Assigning a list or None to the dict-based relationship must
        # raise TypeError.
        assert_raises(TypeError, setattr, p1, "_children", [])
        assert_raises(TypeError, setattr, p1, "_children", None)

        # Building a set that contains the proxy is expected to raise
        # TypeError.
        assert_raises(TypeError, set, [p1.children])

    def test_bulk_replace(self):
        """Bulk assignment keeps children whose keys are retained."""
        Parent = self.classes.Parent

        p1 = Parent("foo")
        p1.children = {"a": "v a", "b": "v b", "c": "v c"}
        assocs = set(p1._children.values())
        keep_assocs = {a for a in assocs if a.foo in ("a", "c")}
        eq_(len(keep_assocs), 2)
        remove_assocs = {a for a in assocs if a.foo == "b"}

        p1.children = {"a": "v a", "d": "v d", "c": "v c"}
        # The objects for "a" and "c" are the same ones as before the
        # reassignment; the one for "b" is gone.
        eq_(
            {a for a in p1._children.values() if a.foo in ("a", "c")},
            keep_assocs,
        )
        assert not remove_assocs.intersection(p1._children.values())

        eq_(p1.children, {"a": "v a", "d": "v d", "c": "v c"})


class SetTest(_CollectionOperations):
    """Set operations on a proxy backed by a ``set`` relationship."""

    collection_class = set

    def test_set_operations(self):
        """Exercise add/remove/discard/pop and reassignment."""
        Parent, Child = self.classes.Parent, self.classes.Child

        self.session = fixture_session()

        p1 = Parent("P1")

        self.assert_(not p1._children)
        self.assert_(not p1.children)

        # A Child added to the relationship shows up in the proxy as its
        # name.
        ch1 = Child("regular")
        p1._children.add(ch1)

        self.assert_(ch1 in p1._children)
        self.assert_(len(p1._children) == 1)

        self.assert_(p1.children)
        self.assert_(len(p1.children) == 1)
        self.assert_(ch1 not in p1.children)
        self.assert_("regular" in p1.children)

        # Adding a plain value through the proxy adds an entry to the
        # relationship.
        p1.children.add("proxied")

        self.assert_("proxied" in p1.children)
        self.assert_("proxied" not in p1._children)
        self.assert_(len(p1.children) == 2)
        self.assert_(len(p1._children) == 2)

        self.assert_({o.name for o in p1._children} == {"regular", "proxied"})

        # Locate the child object that the proxy created for "proxied".
        ch2 = None
        for o in p1._children:
            if o.name == "proxied":
                ch2 = o
                break

        p1._children.remove(ch2)

        self.assert_(len(p1._children) == 1)
        self.assert_(len(p1.children) == 1)
        self.assert_(p1._children == {ch1})

        p1.children.remove("regular")

        self.assert_(len(p1._children) == 0)
        self.assert_(len(p1.children) == 0)

        # Bulk assignment through the proxy.
        p1.children = ["a", "b", "c"]
        self.assert_(len(p1._children) == 3)
        self.assert_(len(p1.children) == 3)

        del ch1
        p1 = self.roundtrip(p1)

        self.assert_(len(p1._children) == 3)
        self.assert_(len(p1.children) == 3)

        self.assert_("a" in p1.children)
        self.assert_("b" in p1.children)
        self.assert_("d" not in p1.children)

        self.assert_(p1.children == {"a", "b", "c"})

        # remove() of a missing value raises; discard() does not.
        assert_raises(KeyError, p1.children.remove, "d")

        self.assert_(len(p1.children) == 3)
        p1.children.discard("d")
        self.assert_(len(p1.children) == 3)
        p1 = self.roundtrip(p1)
        self.assert_(len(p1.children) == 3)

        popped = p1.children.pop()
        self.assert_(len(p1.children) == 2)
        self.assert_(popped not in p1.children)
        p1 = self.roundtrip(p1)
        self.assert_(len(p1.children) == 2)
        self.assert_(popped not in p1.children)

        p1.children = ["a", "b", "c"]
        p1 = self.roundtrip(p1)
        self.assert_(p1.children == {"a", "b", "c"})

        p1.children.discard("b")
        p1 = self.roundtrip(p1)
        self.assert_(p1.children == {"a", "c"})

        p1.children.remove("a")
        p1 = self.roundtrip(p1)
        self.assert_(p1.children == {"c"})

        p1._children = set()
        self.assert_(len(p1.children) == 0)

        # Assigning a list or None to the set-based relationship must
        # raise TypeError.
        assert_raises(TypeError, setattr, p1, "_children", [])
        assert_raises(TypeError, setattr, p1, "_children", None)

        # Building a set that contains the proxy is expected to raise
        # TypeError.
        assert_raises(TypeError, set, [p1.children])

    def test_set_comparisons(self):
        """Compare proxy results against a plain ``set`` control."""
        Parent = self.classes.Parent

        p1 = Parent("P1")
        p1.children = ["a", "b", "c"]
        control = {"a", "b", "c"}

        for other in (
            {"a", "b", "c"},
            {"a", "b", "c", "d"},
            {"a"},
            {"a", "b"},
            {"c", "d"},
            {"e", "f", "g"},
            set(),
        ):
            eq_(p1.children.union(other), control.union(other))
            eq_(p1.children.difference(other), control.difference(other))
            eq_((p1.children - other), (control - other))
            eq_(p1.children.intersection(other), control.intersection(other))
            eq_(
                p1.children.symmetric_difference(other),
                control.symmetric_difference(other),
            )
            eq_(p1.children.issubset(other), control.issubset(other))
            eq_(p1.children.issuperset(other), control.issuperset(other))

            self.assert_((p1.children == other) == (control == other))
            self.assert_((p1.children != other) == (control != other))
            self.assert_((p1.children < other) == (control < other))
            self.assert_((p1.children <= other) == (control <= other))
            self.assert_((p1.children > other) == (control > other))
            self.assert_((p1.children >= other) == (control >= other))

    def test_set_comparison_empty_to_empty(self):
        """Empty proxies compare equal to each other and to ``set()``."""
        # test issue #3265 which was fixed in Python version 2.7.8
        Parent = self.classes.Parent

        p1 = Parent("P1")
        p1.children = []

        p2 = Parent("P2")
        p2.children = []

        set_0 = set()
        set_a = p1.children
        set_b = p2.children

        is_(set_a == set_a, True)
        is_(set_a == set_b, True)
        is_(set_a == set_0, True)
        is_(set_0 == set_a, True)

        is_(set_a != set_a, False)
        is_(set_a != set_b, False)
        is_(set_a != set_0, False)
        is_(set_0 != set_a, False)

    def test_set_mutation(self):
        """Mutating methods and operators match a plain ``set`` control.

        Every combination is checked both immediately and after a
        roundtrip through the session.
        """
        Parent = self.classes.Parent

        self.session = fixture_session()

        # mutations
        for op in (
            "update",
            "intersection_update",
            "difference_update",
            "symmetric_difference_update",
        ):
            for base in (["a", "b", "c"], []):
                for other in (
                    {"a", "b", "c"},
                    {"a", "b", "c", "d"},
                    {"a"},
                    {"a", "b"},
                    {"c", "d"},
                    {"e", "f", "g"},
                    set(),
                ):
                    p = Parent("p")
                    p.children = base[:]
                    control = set(base[:])

                    getattr(p.children, op)(other)
                    getattr(control, op)(other)
                    try:
                        self.assert_(p.children == control)
                    except Exception:
                        # Print the failing combination before re-raising.
                        print("Test %s.%s(%s):" % (set(base), op, other))
                        print("want", repr(control))
                        print("got", repr(p.children))
                        raise

                    p = self.roundtrip(p)

                    try:
                        self.assert_(p.children == control)
                    except Exception:
                        print("Test %s.%s(%s):" % (base, op, other))
                        print("want", repr(control))
                        print("got", repr(p.children))
                        raise

        # in-place mutations
        for op in ("|=", "-=", "&=", "^="):
            for base in (["a", "b", "c"], []):
                for other in (
                    {"a", "b", "c"},
                    {"a", "b", "c", "d"},
                    {"a"},
                    {"a", "b"},
                    {"c", "d"},
                    {"e", "f", "g"},
                    frozenset(["e", "f", "g"]),
                    set(),
                ):
                    p = Parent("p")
                    p.children = base[:]
                    control = set(base[:])

                    # exec() runs the augmented-assignment statement for
                    # the operator string; the operators come only from
                    # the hard-coded tuple above.
                    exec("p.children %s other" % op)
                    exec("control %s other" % op)

                    try:
                        self.assert_(p.children == control)
                    except Exception:
                        print("Test %s %s %s:" % (set(base), op, other))
                        print("want", repr(control))
                        print("got", repr(p.children))
                        raise

                    p = self.roundtrip(p)

                    try:
                        self.assert_(p.children == control)
                    except Exception:
                        print("Test %s %s %s:" % (base, op, other))
                        print("want", repr(control))
                        print("got", repr(p.children))
                        raise

    def test_bulk_replace(self):
        """Bulk assignment keeps children whose values are retained."""
        Parent = self.classes.Parent

        p1 = Parent("foo")
        p1.children = {"a", "b", "c"}
        assocs = set(p1._children)
        keep_assocs = {a for a in assocs if a.name in ("a", "c")}
        eq_(len(keep_assocs), 2)
        remove_assocs = {a for a in assocs if a.name == "b"}

        p1.children = {"a", "c", "d"}
        # The objects for "a" and "c" are the same ones as before the
        # reassignment; the one for "b" is gone.
        eq_({a for a in p1._children if a.name in ("a", "c")}, keep_assocs)
        assert not remove_assocs.intersection(p1._children)

        eq_(p1.children, {"a", "c", "d"})


class CustomSetTest(SetTest):
    """Re-run the ``SetTest`` suite with ``SetCollection`` as the collection."""

    collection_class = SetCollection


class CustomObjectTest(_CollectionOperations):
    """Proxy behavior over the non-builtin ``ObjectCollection`` class."""

    collection_class = ObjectCollection

    def test_basic(self):
        """Append through the proxy, roundtrip, and check indexing fails."""
        Parent = self.classes.Parent

        self.session = fixture_session()

        parent = Parent("p1")
        eq_(len(list(parent.children)), 0)

        parent.children.append("child")
        eq_(len(list(parent.children)), 1)

        parent = self.roundtrip(parent)
        eq_(len(list(parent.children)), 1)

        # We didn't provide an alternate _AssociationList implementation
        # for our ObjectCollection, so indexing will fail.
        getitem = parent.children.__getitem__
        assert_raises(TypeError, getitem, 1)


class ProxyFactoryTest(ListTest):
    """Run the sequence operations through a custom ``proxy_factory``."""

    @classmethod
    def define_tables(cls, metadata):
        """Define the ``Parent`` and ``Children`` tables."""
        Table(
            "Parent",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("name", String(128)),
        )
        Table(
            "Children",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("parent_id", Integer, ForeignKey("Parent.id")),
            Column("foo", String(128)),
            Column("name", String(128)),
        )

    @classmethod
    def setup_mappers(cls):
        """Map ``Parent``/``Child`` with ``CustomProxy`` as the factory."""
        parents_table, children_table = cls.tables("Parent", "Children")

        class CustomProxy(_AssociationList):
            """``_AssociationList`` built from the proxy_factory arguments."""

            def __init__(self, lazy_collection, creator, value_attr, parent):
                # ``value_attr`` is accepted but unused; the getter and
                # setter come from the parent's defaults instead.
                getter, setter = parent._default_getset(lazy_collection)
                super().__init__(
                    lazy_collection, creator, getter, setter, parent
                )

        class Parent(cls.Basic):
            children = association_proxy(
                "_children",
                "name",
                proxy_factory=CustomProxy,
                proxy_bulk_set=CustomProxy.extend,
            )

            def __init__(self, name):
                self.name = name

        class Child(cls.Basic):
            def __init__(self, name):
                self.name = name

        children_rel = relationship(
            Child, lazy="joined", collection_class=list
        )
        cls.mapper_registry.map_imperatively(
            Parent,
            parents_table,
            properties={"_children": children_rel},
        )
        cls.mapper_registry.map_imperatively(Child, children_table)

    def test_sequence_ops(self):
        """Delegate to the shared sequence-operation checks."""
        self._test_sequence_ops()


class ScalarTest(fixtures.MappedTest):
    @testing.provide_metadata
    def test_scalar_proxy(self):
        """Read, write and persist scalar proxies over a one-to-one.

        ``Parent.foo`` is declared without a creator, while ``bar`` and
        ``baz`` each build a ``Child`` from the assigned value.  The test
        checks reads with and without a child present, writes that update
        or create the child, and that values survive a flush and reload.
        """
        metadata = self.metadata

        parents_table = Table(
            "Parent",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("name", String(128)),
        )
        children_table = Table(
            "Children",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("parent_id", Integer, ForeignKey("Parent.id")),
            Column("foo", String(128)),
            Column("bar", String(128)),
            Column("baz", String(128)),
        )

        class Parent:
            foo = association_proxy("child", "foo")
            bar = association_proxy(
                "child", "bar", creator=lambda v: Child(bar=v)
            )
            baz = association_proxy(
                "child", "baz", creator=lambda v: Child(baz=v)
            )

            def __init__(self, name):
                self.name = name

        class Child:
            def __init__(self, **kw):
                # keyword-only construction: each keyword becomes an attribute
                for attr, value in kw.items():
                    setattr(self, attr, value)

        self.mapper_registry.map_imperatively(
            Parent,
            parents_table,
            properties={
                "child": relationship(
                    Child, lazy="joined", backref="parent", uselist=False
                )
            },
        )
        self.mapper_registry.map_imperatively(Child, children_table)

        metadata.create_all(testing.db)
        session = fixture_session()

        def roundtrip(obj):
            """Flush ``obj``, clear the session and load it again by id."""
            if obj not in session:
                session.add(obj)
            session.flush()
            id_, type_ = obj.id, type(obj)
            session.expunge_all()
            return session.get(type_, id_)

        p = Parent("p")

        # no child yet: the proxy reads as None
        eq_(p.child, None)
        eq_(p.foo, None)

        p.child = Child(foo="a", bar="b", baz="c")

        eq_(p.foo, "a")
        eq_(p.bar, "b")
        eq_(p.baz, "c")

        # setting through the proxy updates only that attribute
        p.bar = "x"
        eq_(p.foo, "a")
        eq_(p.bar, "x")
        eq_(p.baz, "c")

        p = roundtrip(p)

        eq_(p.foo, "a")
        eq_(p.bar, "x")
        eq_(p.baz, "c")

        p.child = None

        eq_(p.foo, None)

        # Bogus creator for this scalar type: "foo" has no explicit creator
        # and Child only accepts keyword arguments (presumably the default
        # creator passes the value positionally).
        assert_raises(TypeError, setattr, p, "foo", "zzz")

        # "bar" has a creator, so assignment builds a new Child
        p.bar = "yyy"

        is_none(p.foo)
        eq_(p.bar, "yyy")
        is_none(p.baz)

        del p.child

        p = roundtrip(p)

        is_none(p.child)

        p.baz = "xxx"

        is_none(p.foo)
        is_none(p.bar)
        eq_(p.baz, "xxx")

        p = roundtrip(p)

        is_none(p.foo)
        is_none(p.bar)
        eq_(p.baz, "xxx")

        # Ensure an immediate __set__ works.
        p2 = Parent("p2")
        p2.bar = "quux"

    def test_scalar_opts_exclusive(self):
        """Enabling both scalar options at once raises ArgumentError."""
        conflicting_options = dict(
            cascade_scalar_deletes=True,
            create_on_none_assignment=True,
        )
        expected_message = (
            "The cascade_scalar_deletes and create_on_none_assignment "
            "parameters are mutually exclusive."
        )
        with expect_raises_message(exc.ArgumentError, expected_message):
            association_proxy("a", "b", **conflicting_options)

    @testing.variation("create_on_none", [True, False])
    @testing.variation("specify_creator", [True, False])
    def test_create_on_set_none(
        self, create_on_none, specify_creator, decl_base
    ):
        """Assigning None through a scalar proxy with no target present.

        With ``create_on_none_assignment`` enabled the assignment creates
        a ``B``; otherwise ``a1.b`` stays None.  Once a ``B`` exists,
        assigning None through the proxy leaves it in place either way.
        """
        proxy_options = {"create_on_none_assignment": bool(create_on_none)}
        if specify_creator:
            # B is resolved when the creator is called, after it is defined
            proxy_options["creator"] = lambda data: B(data=data)

        class A(decl_base):
            __tablename__ = "a"
            id = mapped_column(Integer, primary_key=True)
            b_id = mapped_column(ForeignKey("b.id"))
            b = relationship("B")

            b_data = association_proxy("b", "data", **proxy_options)

        class B(decl_base):
            __tablename__ = "b"
            id = mapped_column(Integer, primary_key=True)
            data = mapped_column(String)

            def __init__(self, data=None):
                self.data = data

        a1 = A()
        is_none(a1.b)
        a1.b_data = None

        check_b = is_not_none if create_on_none else is_none
        check_b(a1.b)

        a1.b_data = "data"

        a1.b_data = None
        is_not_none(a1.b)

    @testing.provide_metadata
    def test_empty_scalars(self):
        """Scalar proxies on a new ``A`` with no ``a2b_single`` read None."""
        metadata = self.metadata

        def id_name_table(table_name):
            # "a" and "b" share the same two-column layout
            return Table(
                table_name,
                metadata,
                Column("id", Integer, primary_key=True),
                Column("name", String(50)),
            )

        a_table = id_name_table("a")
        a2b_table = Table(
            "a2b",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("id_a", Integer, ForeignKey("a.id")),
            Column("id_b", Integer, ForeignKey("b.id")),
            Column("name", String(50)),
        )
        b_table = id_name_table("b")

        class A:
            a2b_name = association_proxy("a2b_single", "name")
            b_single = association_proxy("a2b_single", "b")

        class A2B:
            pass

        class B:
            pass

        registry = self.mapper_registry
        registry.map_imperatively(
            A,
            a_table,
            properties={"a2b_single": relationship(A2B, uselist=False)},
        )
        registry.map_imperatively(
            A2B, a2b_table, properties={"b": relationship(B)}
        )
        registry.map_imperatively(B, b_table)

        fresh = A()
        assert fresh.a2b_name is None
        assert fresh.b_single is None

    def test_custom_getset(self):
        """A ``getset_factory`` supplies the getter and setter the proxy uses.

        Both callables are Mocks: reads are compared against the getter's
        return value and the write is checked via the setter's call list.
        """
        metadata = MetaData()
        parent_table = Table(
            "p",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("cid", Integer, ForeignKey("c.id")),
        )
        child_table = Table(
            "c",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("foo", String(128)),
        )

        get = Mock()
        set_ = Mock()

        class Parent:
            foo = association_proxy(
                "child", "foo", getset_factory=lambda cc, parent: (get, set_)
            )

        class Child:
            def __init__(self, foo):
                self.foo = foo

        registry = self.mapper_registry
        registry.map_imperatively(
            Parent, parent_table, properties=dict(child=relationship(Child))
        )
        registry.map_imperatively(Child, child_table)

        p1 = Parent()

        eq_(p1.foo, get(None))
        p1.child = child = Child(foo="x")
        eq_(p1.foo, get(child))
        p1.foo = "y"
        eq_(set_.mock_calls, [call(child, "y")])


class LazyLoadTest(fixtures.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        """Declare the ``Parent`` and ``Children`` tables on ``metadata``."""

        def autoincrement_pk():
            # Build a separate "id" column for each table.
            return Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            )

        Table(
            "Parent",
            metadata,
            autoincrement_pk(),
            Column("name", String(128)),
        )
        Table(
            "Children",
            metadata,
            autoincrement_pk(),
            Column("parent_id", Integer, ForeignKey("Parent.id")),
            Column("foo", String(128)),
            Column("name", String(128)),
        )

    @classmethod
    def setup_mappers(cls):
        """Define ``Parent`` and ``Child``; only ``Child`` is mapped here.

        ``Parent`` is mapped inside each test so that every test can pick
        its own loader strategy for ``_children``.
        """

        class Parent(cls.Basic):
            children = association_proxy("_children", "name")

            def __init__(self, name):
                self.name = name

        class Child(cls.Basic):
            def __init__(self, name):
                self.name = name

        children_table = cls.tables.Children
        cls.mapper_registry.map_imperatively(Child, children_table)

    def roundtrip(self, obj):
        """Flush ``obj``, empty ``self.session`` and reload it by identity.

        Returns the object produced by ``session.get`` for the same type
        and primary key.
        """
        session = self.session
        session.add(obj)
        session.flush()
        # capture the identity before the object is expunged
        identity = obj.id
        entity = type(obj)
        session.expunge_all()
        return session.get(entity, identity)

    def test_lazy_list(self):
        """A ``lazy="select"`` list is absent from ``__dict__`` until read."""
        Parent, Child = self.classes.Parent, self.classes.Child

        self.session = fixture_session()

        children_rel = relationship(
            Child, lazy="select", collection_class=list
        )
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.Parent,
            properties=dict(_children=children_rel),
        )

        parent = Parent("p")
        parent.children = ["a", "b", "c"]

        parent = self.roundtrip(parent)

        # Is there a better way to ensure that the association_proxy
        # didn't convert a lazy load to an eager load?  This does work though.
        self.assert_("_children" not in parent.__dict__)
        self.assert_(len(parent._children) == 3)
        self.assert_("_children" in parent.__dict__)

    def test_eager_list(self):
        """A ``lazy="joined"`` list is already in ``__dict__`` after reload."""
        Parent, Child = self.classes.Parent, self.classes.Child

        self.session = fixture_session()

        children_rel = relationship(
            Child, lazy="joined", collection_class=list
        )
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.Parent,
            properties=dict(_children=children_rel),
        )

        parent = Parent("p")
        parent.children = ["a", "b", "c"]

        parent = self.roundtrip(parent)

        self.assert_("_children" in parent.__dict__)
        self.assert_(len(parent._children) == 3)

    def test_slicing_list(self):
        """Index and slice access on the list proxy return proxied names."""
        Parent, Child = self.classes.Parent, self.classes.Child

        self.session = fixture_session()

        children_rel = relationship(
            Child, lazy="select", collection_class=list
        )
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.Parent,
            properties=dict(_children=children_rel),
        )

        parent = Parent("p")
        parent.children = ["a", "b", "c"]

        parent = self.roundtrip(parent)

        self.assert_(len(parent._children) == 3)
        eq_("b", parent.children[1])
        eq_(["b", "c"], parent.children[-2:])

    def test_lazy_scalar(self):
        """A ``lazy="select"`` scalar is not in ``__dict__`` after reload."""
        Parent, Child = self.classes.Parent, self.classes.Child

        self.session = fixture_session()

        child_rel = relationship(Child, lazy="select", uselist=False)
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.Parent,
            properties=dict(_children=child_rel),
        )

        parent = Parent("p")
        parent.children = "value"

        parent = self.roundtrip(parent)

        self.assert_("_children" not in parent.__dict__)
        self.assert_(parent._children is not None)

    def test_eager_scalar(self):
        """A ``lazy="joined"`` scalar is already in ``__dict__`` on reload."""
        Parent, Child = self.classes.Parent, self.classes.Child

        self.session = fixture_session()

        child_rel = relationship(Child, lazy="joined", uselist=False)
        self.mapper_registry.map_imperatively(
            Parent,
            self.tables.Parent,
            properties=dict(_children=child_rel),
        )

        parent = Parent("p")
        parent.children = "value"

        parent = self.roundtrip(parent)

        self.assert_("_children" in parent.__dict__)
        self.assert_(parent._children is not None)


class Parent:
    """Plain module-level parent class holding only a ``name``."""

    def __init__(self, name):
        """Store ``name`` on the new instance."""
        self.name = name


class Child:
    """Plain module-level child class holding only a ``name``."""

    def __init__(self, name):
        """Store ``name`` on the new instance."""
        self.name = name


class KVChild:
    """Plain module-level child class holding a ``name`` and a ``value``."""

    def __init__(self, name, value):
        """Store ``name`` and ``value`` on the new instance."""
        self.name, self.value = name, value


class ReconstitutionTest(fixtures.MappedTest):
    run_setup_mappers = "each"
    run_setup_classes = "each"

    @classmethod
    def define_tables(cls, metadata):
        """Declare the ``parents`` and ``children`` tables on ``metadata``."""

        def autoincrement_pk():
            # Build a separate "id" column for each table.
            return Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            )

        Table(
            "parents",
            metadata,
            autoincrement_pk(),
            Column("name", String(30)),
        )
        Table(
            "children",
            metadata,
            autoincrement_pk(),
            Column("parent_id", Integer, ForeignKey("parents.id")),
            Column("name", String(30)),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert a single ``parents`` row named ``p1``."""
        insert_stmt = cls.tables.parents.insert()
        connection.execute(insert_stmt, {"name": "p1"})

    @classmethod
    def setup_classes(cls):
        """Attach the ``kids`` proxy to the module-level ``Parent`` class."""
        kids_proxy = association_proxy("children", "name")
        Parent.kids = kids_proxy

    def test_weak_identity_map(self):
        """Appends made via separately queried parents are both retained.

        Each append happens inside a helper so no reference to the parent
        is kept afterwards, with a garbage collection pass in between.
        """
        registry = self.mapper_registry
        registry.map_imperatively(
            Parent,
            self.tables.parents,
            properties={"children": relationship(Child)},
        )
        registry.map_imperatively(Child, self.tables.children)
        session = fixture_session()

        def add_child(parent_name, child_name):
            # the parent is only referenced locally within this call
            loaded = session.query(Parent).filter_by(name=parent_name).one()
            loaded.kids.append(child_name)

        add_child("p1", "c1")
        gc_collect()
        add_child("p1", "c2")
        session.flush()
        p = session.query(Parent).filter_by(name="p1").one()
        assert set(p.kids) == {"c1", "c2"}, p.kids

    def test_copy(self):
        """A ``copy.copy`` of a parent still exposes the proxied kids.

        The original is deleted and garbage collected before the copy's
        proxy is read.
        """
        registry = self.mapper_registry
        registry.map_imperatively(
            Parent,
            self.tables.parents,
            properties={"children": relationship(Child)},
        )
        registry.map_imperatively(Child, self.tables.children)
        original = Parent("p1")
        original.kids.extend(["c1", "c2"])
        p_copy = copy.copy(original)
        del original
        gc_collect()
        assert set(p_copy.kids) == {"c1", "c2"}, p_copy.kids

    def test_pickle_list(self):
        """A pickled and restored parent keeps its list proxy contents."""
        registry = self.mapper_registry
        registry.map_imperatively(
            Parent,
            self.tables.parents,
            properties={"children": relationship(Child)},
        )
        registry.map_imperatively(Child, self.tables.children)
        parent = Parent("p1")
        parent.kids.extend(["c1", "c2"])
        restored = pickle.loads(pickle.dumps(parent))
        assert restored.kids == ["c1", "c2"]

        # can't do this without parent having a cycle
        # r2 = pickle.loads(pickle.dumps(p.kids))
        # assert r2 == ['c1', 'c2']

    def test_pickle_set(self):
        """A pickled and restored parent keeps its set proxy contents."""
        registry = self.mapper_registry
        children_rel = relationship(Child, collection_class=set)
        registry.map_imperatively(
            Parent,
            self.tables.parents,
            properties={"children": children_rel},
        )
        registry.map_imperatively(Child, self.tables.children)
        parent = Parent("p1")
        parent.kids.update(["c1", "c2"])
        restored = pickle.loads(pickle.dumps(parent))
        assert restored.kids == {"c1", "c2"}

        # can't do this without parent having a cycle
        # r2 = pickle.loads(pickle.dumps(p.kids))
        # assert r2 == set(['c1', 'c2'])

    def test_pickle_dict(self):
        """A pickled and restored parent keeps its dict proxy contents.

        The collection is keyed by ``PickleKeyFunc("name")`` and the proxy
        targets ``name`` as well, so keys and values are both names.
        """
        registry = self.mapper_registry
        keyed_by_name = collections.keyfunc_mapping(PickleKeyFunc("name"))
        registry.map_imperatively(
            Parent,
            self.tables.parents,
            properties={
                "children": relationship(
                    KVChild, collection_class=keyed_by_name
                )
            },
        )
        registry.map_imperatively(KVChild, self.tables.children)
        parent = Parent("p1")
        parent.kids.update({"c1": "v1", "c2": "v2"})
        assert parent.kids == {"c1": "c1", "c2": "c2"}
        restored = pickle.loads(pickle.dumps(parent))
        assert restored.kids == {"c1": "c1", "c2": "c2"}

        # can't do this without parent having a cycle
        # r2 = pickle.loads(pickle.dumps(p.kids))
        # assert r2 == {'c1': 'c1', 'c2': 'c2'}


class PickleKeyFunc:
    """Callable that returns a named attribute of the object it is given.

    Defined at module level as a class, unlike a lambda, and used as the
    key function of a pickled mapping collection in this file.
    """

    def __init__(self, name):
        """Remember the attribute ``name`` to look up."""
        self.name = name

    def __call__(self, obj):
        """Return ``obj``'s attribute named by ``self.name``."""
        attribute = self.name
        return getattr(obj, attribute)


class ComparatorTest(fixtures.MappedTest, AssertsCompiledSQL):
    __dialect__ = "default"

    run_inserts = "once"
    run_deletes = None
    run_setup_mappers = "once"
    run_setup_classes = "once"

    @classmethod
    def define_tables(cls, metadata):
        """Declare the userkeywords, users, keywords and singular tables."""

        def autoincrement_pk():
            # Build a separate "id" column for each table that needs one.
            return Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            )

        def singular_fk():
            return Column("singular_id", Integer, ForeignKey("singular.id"))

        # association table: its primary key is the keyword reference
        Table(
            "userkeywords",
            metadata,
            Column(
                "keyword_id",
                Integer,
                ForeignKey("keywords.id"),
                primary_key=True,
            ),
            Column("user_id", Integer, ForeignKey("users.id")),
            Column("value", String(50)),
        )
        Table(
            "users",
            metadata,
            autoincrement_pk(),
            Column("name", String(64)),
            singular_fk(),
        )
        Table(
            "keywords",
            metadata,
            autoincrement_pk(),
            Column("keyword", String(64)),
            singular_fk(),
        )
        Table(
            "singular",
            metadata,
            autoincrement_pk(),
            Column("value", String(50)),
        )

    @classmethod
    def setup_classes(cls):
        """Define User, Keyword, UserKeyword and Singular with their proxies.

        Only the classes and their ``association_proxy`` attributes are
        declared here; the relationships the proxies refer to by name
        ("user_keywords", "singular", "keyword", ...) are configured
        separately in ``setup_mappers``.
        """

        class User(cls.Comparable):
            def __init__(self, name):
                self.name = name

            # o2m -> m2o
            # uselist -> nonuselist
            keywords = association_proxy(
                "user_keywords",
                "keyword",
                creator=lambda k: UserKeyword(keyword=k),
            )

            # m2o -> o2m
            # nonuselist -> uselist
            singular_keywords = association_proxy("singular", "keywords")

            # m2o -> scalar
            # nonuselist
            singular_value = association_proxy("singular", "value")

            # o2m -> scalar
            singular_collection = association_proxy("user_keywords", "value")

            # uselist assoc_proxy -> assoc_proxy -> obj
            common_users = association_proxy("user_keywords", "common_users")

            # non uselist assoc_proxy -> assoc_proxy -> obj
            # NOTE(review): this is declared identically to singular_keyword
            # just below, which is labelled "-> scalar"; one of the two
            # labels (or targets) looks off -- confirm the intent.
            common_singular = association_proxy("singular", "keyword")

            # non uselist assoc_proxy -> assoc_proxy -> scalar
            singular_keyword = association_proxy("singular", "keyword")

            # uselist assoc_proxy -> assoc_proxy -> scalar
            common_keyword_name = association_proxy(
                "user_keywords", "keyword_name"
            )

        class Keyword(cls.Comparable):
            def __init__(self, keyword):
                self.keyword = keyword

            # o2o -> m2o
            # nonuselist -> nonuselist
            user = association_proxy("user_keyword", "user")

            # uselist assoc_proxy -> collection -> assoc_proxy -> scalar object
            # (o2m relationship,
            #  associationproxy(m2o relationship, m2o relationship))
            singulars = association_proxy("user_keywords", "singular")

        class UserKeyword(cls.Comparable):
            def __init__(self, user=None, keyword=None):
                self.user = user
                self.keyword = keyword

            # proxies that are themselves the targets of proxies declared
            # on User and Keyword above
            common_users = association_proxy("keyword", "user")
            keyword_name = association_proxy("keyword", "keyword")

            singular = association_proxy("user", "singular")

        class Singular(cls.Comparable):
            def __init__(self, value=None):
                self.value = value

            # target of User.common_singular / User.singular_keyword
            keyword = association_proxy("keywords", "keyword")

    @classmethod
    def setup_mappers(cls):
        """Map the four classes and the relationships the proxies rely on."""
        tables, classes = cls.tables, cls.classes
        User, Keyword = classes.User, classes.Keyword
        UserKeyword, Singular = classes.UserKeyword, classes.Singular

        registry = cls.mapper_registry

        registry.map_imperatively(
            User,
            tables.users,
            properties=dict(singular=relationship(Singular)),
        )

        # Keyword has a writable scalar link to UserKeyword plus a
        # view-only collection over the same target.
        registry.map_imperatively(
            Keyword,
            tables.keywords,
            properties=dict(
                user_keyword=relationship(
                    UserKeyword, uselist=False, back_populates="keyword"
                ),
                user_keywords=relationship(UserKeyword, viewonly=True),
            ),
        )

        # the backref here is what provides User.user_keywords
        registry.map_imperatively(
            UserKeyword,
            tables.userkeywords,
            properties=dict(
                user=relationship(User, backref="user_keywords"),
                keyword=relationship(Keyword, back_populates="user_keyword"),
            ),
        )

        registry.map_imperatively(
            Singular,
            tables.singular,
            properties=dict(keywords=relationship(Keyword)),
        )

    @classmethod
    def insert_data(cls, connection):
        """Populate users, keywords and singulars, then stash fixtures.

        Sixteen users are created.  Even-numbered users get a ``Singular``
        (with a value only for multiples of four), and each user receives
        a slice of ``words`` as keywords.  An orphan keyword with a
        user-less ``UserKeyword`` and a keyword with no associations are
        added as well.  The last user, its first keyword and a shared
        session are stored on the class.
        """
        classes = cls.classes
        UserKeyword, User = classes.UserKeyword, classes.User
        Keyword, Singular = classes.Keyword, classes.Singular

        session = Session(connection)
        words = ("quick", "brown", "fox", "jumped", "over", "the", "lazy")
        word_count = len(words)
        for ii in range(16):
            user = User("user%d" % ii)
            has_singular = ii % 2 == 0

            if has_singular:
                singular_value = ("singular%d" % ii) if ii % 4 == 0 else None
                user.singular = Singular(value=singular_value)
            session.add(user)

            # the slice is empty whenever the wrapped stop index falls
            # before the start index
            start, stop = ii % word_count, (ii + 3) % word_count
            for word in words[start:stop]:
                keyword = Keyword(word)
                user.keywords.append(keyword)
                if has_singular:
                    user.singular.keywords.append(keyword)
                    user.user_keywords[-1].value = "singular%d" % ii

        orphan = Keyword("orphan")
        orphan.user_keyword = UserKeyword(keyword=orphan, user=None)
        session.add(orphan)

        keyword_with_nothing = Keyword("kwnothing")
        session.add(keyword_with_nothing)

        session.commit()
        # "user" is the last user created by the loop above
        cls.u = user
        cls.kw = user.keywords[0]

        # TODO: this is not the correct pattern, use session per test
        cls.session = Session(testing.db)

    def _equivalent(self, q_proxy, q_direct):
        """Assert two queries render the same SQL and return the same rows.

        Both statements are compiled against a fresh ``DefaultDialect``
        and compared as strings before the queries are executed.
        """

        def render(query):
            compiled = query.statement.compile(
                dialect=default.DefaultDialect()
            )
            return str(compiled)

        proxy_sql = render(q_proxy)
        direct_sql = render(q_direct)
        eq_(proxy_sql, direct_sql)
        eq_(q_proxy.all(), q_direct.all())

    def test_no_straight_expr(self):
        """Passing a proxy as a plain column expression is rejected.

        Both a SQL function call and ``session.query`` raise
        NotImplementedError when handed the proxy directly.
        """
        User = self.classes.User
        expected_message = (
            "The association proxy can't be used as a plain column expression"
        )

        for consumer in (func.foo, self.session.query):
            assert_raises_message(
                NotImplementedError,
                expected_message,
                consumer,
                User.singular_value,
            )

    def test_filter_any_criterion_ul_scalar(self):
        """``any(criterion)`` on a collection proxy that targets a column.

        Checks the rendered EXISTS subquery and that it matches calling
        ``any()`` on the underlying relationship directly.
        """
        classes = self.classes
        UserKeyword, User = classes.UserKeyword, classes.User

        proxy_q = self.session.query(User).filter(
            User.singular_collection.any(UserKeyword.value == "singular8")
        )
        self.assert_compile(
            proxy_q,
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users "
            "WHERE EXISTS (SELECT 1 "
            "FROM userkeywords "
            "WHERE users.id = userkeywords.user_id AND "
            "userkeywords.value = :value_1)",
            checkparams={"value_1": "singular8"},
        )

        direct_q = self.session.query(User).filter(
            User.user_keywords.any(UserKeyword.value == "singular8")
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_any_kwarg_ul_nul(self):
        """``User.keywords.any(keyword=...)`` equals the any()/has() chain."""
        classes = self.classes
        UserKeyword, User = classes.UserKeyword, classes.User

        proxy_q = self.session.query(User).filter(
            User.keywords.any(keyword="jumped")
        )
        direct_q = self.session.query(User).filter(
            User.user_keywords.any(UserKeyword.keyword.has(keyword="jumped"))
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_has_kwarg_nul_nul(self):
        """``Keyword.user.has(name=...)`` equals the nested has() chain."""
        classes = self.classes
        UserKeyword, Keyword = classes.UserKeyword, classes.Keyword

        proxy_q = self.session.query(Keyword).filter(
            Keyword.user.has(name="user2")
        )
        direct_q = self.session.query(Keyword).filter(
            Keyword.user_keyword.has(UserKeyword.user.has(name="user2"))
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_has_kwarg_nul_ul(self):
        """``singular_keywords.any(keyword=...)`` equals has() around any()."""
        classes = self.classes
        User, Singular = classes.User, classes.Singular

        proxy_q = self.session.query(User).filter(
            User.singular_keywords.any(keyword="jumped")
        )
        direct_q = self.session.query(User).filter(
            User.singular.has(Singular.keywords.any(keyword="jumped"))
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_any_criterion_ul_nul(self):
        """``User.keywords.any(criterion)`` equals the any()/has() chain."""
        classes = self.classes
        UserKeyword = classes.UserKeyword
        User = classes.User
        Keyword = classes.Keyword

        proxy_q = self.session.query(User).filter(
            User.keywords.any(Keyword.keyword == "jumped")
        )
        direct_q = self.session.query(User).filter(
            User.user_keywords.any(
                UserKeyword.keyword.has(Keyword.keyword == "jumped")
            )
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_has_criterion_nul_nul(self):
        """``Keyword.user.has(criterion)`` equals the nested has() chain."""
        classes = self.classes
        UserKeyword = classes.UserKeyword
        User = classes.User
        Keyword = classes.Keyword

        proxy_q = self.session.query(Keyword).filter(
            Keyword.user.has(User.name == "user2")
        )
        direct_q = self.session.query(Keyword).filter(
            Keyword.user_keyword.has(
                UserKeyword.user.has(User.name == "user2")
            )
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_any_criterion_nul_ul(self):
        """``singular_keywords.any(criterion)`` equals has() around any()."""
        classes = self.classes
        User = classes.User
        Keyword = classes.Keyword
        Singular = classes.Singular

        proxy_q = self.session.query(User).filter(
            User.singular_keywords.any(Keyword.keyword == "jumped")
        )
        direct_q = self.session.query(User).filter(
            User.singular.has(
                Singular.keywords.any(Keyword.keyword == "jumped")
            )
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_contains_ul_nul(self):
        """``User.keywords.contains(kw)`` equals ``any(keyword=kw)``."""
        User = self.classes.User

        proxy_q = self.session.query(User).filter(
            User.keywords.contains(self.kw)
        )
        direct_q = self.session.query(User).filter(
            User.user_keywords.any(keyword=self.kw)
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_contains_nul_ul(self):
        """``singular_keywords.contains(kw)`` equals has() around contains().

        The whole comparison runs under ``expect_warnings`` for the
        "Got None for value of column keywords.singular_id" warning.
        """
        classes = self.classes
        User, Singular = classes.User, classes.Singular

        with expect_warnings(
            "Got None for value of column keywords.singular_id;"
        ):
            proxy_q = self.session.query(User).filter(
                User.singular_keywords.contains(self.kw)
            )
            direct_q = self.session.query(User).filter(
                User.singular.has(Singular.keywords.contains(self.kw))
            )
            self._equivalent(proxy_q, direct_q)

    def test_filter_eq_nul_nul(self):
        """``Keyword.user == obj`` equals ``user_keyword.has(user=obj)``."""
        Keyword = self.classes.Keyword

        proxy_q = self.session.query(Keyword).filter(Keyword.user == self.u)
        direct_q = self.session.query(Keyword).filter(
            Keyword.user_keyword.has(user=self.u)
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_ne_nul_nul(self):
        """``Keyword.user != obj`` equals has() around the same ``!=``."""
        classes = self.classes
        Keyword, UserKeyword = classes.Keyword, classes.UserKeyword

        proxy_q = self.session.query(Keyword).filter(Keyword.user != self.u)
        direct_q = self.session.query(Keyword).filter(
            Keyword.user_keyword.has(UserKeyword.user != self.u)
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_eq_null_nul_nul(self):
        """``Keyword.user == None`` matches a null user or no user_keyword."""
        classes = self.classes
        UserKeyword, Keyword = classes.UserKeyword, classes.Keyword

        proxy_q = self.session.query(Keyword).filter(
            Keyword.user == None  # noqa
        )
        direct_q = self.session.query(Keyword).filter(
            or_(
                Keyword.user_keyword.has(UserKeyword.user == None),  # noqa
                Keyword.user_keyword == None,  # noqa
            )
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_ne_null_nul_nul(self):
        """``Keyword.user != None`` equals has() around the same ``!=``."""
        classes = self.classes
        UserKeyword, Keyword = classes.UserKeyword, classes.Keyword

        proxy_q = self.session.query(Keyword).filter(
            Keyword.user != None  # noqa
        )
        direct_q = self.session.query(Keyword).filter(
            Keyword.user_keyword.has(UserKeyword.user != None)  # noqa
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_object_eq_None_nul(self):
        """``UserKeyword.singular == None`` on an object-valued proxy.

        Equivalent to: the user has no singular, or ``user_id`` is null.
        """
        classes = self.classes
        UserKeyword, User = classes.UserKeyword, classes.User

        proxy_q = self.session.query(UserKeyword).filter(
            UserKeyword.singular == None  # noqa
        )
        direct_q = self.session.query(UserKeyword).filter(
            or_(
                UserKeyword.user.has(User.singular == None),  # noqa
                UserKeyword.user_id == None,  # noqa
            )
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_column_eq_None_nul(self):
        """``User.singular_value == None`` on a column-valued proxy.

        Equivalent to: the singular's value is null, or there is no
        singular at all.
        """
        classes = self.classes
        User, Singular = classes.User, classes.Singular

        proxy_q = self.session.query(User).filter(
            User.singular_value == None  # noqa
        )
        direct_q = self.session.query(User).filter(
            or_(
                User.singular.has(Singular.value == None),  # noqa
                User.singular == None,  # noqa
            )
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_object_ne_value_nul(self):
        """``UserKeyword.singular != obj`` equals has() around the ``!=``."""
        classes = self.classes
        UserKeyword = classes.UserKeyword
        User = classes.User
        Singular = classes.Singular

        # the Singular row to compare against is loaded up front
        s4 = self.session.query(Singular).filter_by(value="singular4").one()

        proxy_q = self.session.query(UserKeyword).filter(
            UserKeyword.singular != s4
        )
        direct_q = self.session.query(UserKeyword).filter(
            UserKeyword.user.has(User.singular != s4)
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_column_ne_value_nul(self):
        """``User.singular_value != value`` equals has() around the ``!=``."""
        classes = self.classes
        User, Singular = classes.User, classes.Singular

        proxy_q = self.session.query(User).filter(
            User.singular_value != "singular4"
        )
        direct_q = self.session.query(User).filter(
            User.singular.has(Singular.value != "singular4")
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_eq_value_nul(self):
        """``User.singular_value == value`` equals has() around the ``==``."""
        classes = self.classes
        User, Singular = classes.User, classes.Singular

        proxy_q = self.session.query(User).filter(
            User.singular_value == "singular4"
        )
        direct_q = self.session.query(User).filter(
            User.singular.has(Singular.value == "singular4")
        )
        self._equivalent(proxy_q, direct_q)

    def test_filter_ne_None_nul(self):
        """``User.singular_value != None`` equals has() around the ``!=``."""
        classes = self.classes
        User, Singular = classes.User, classes.Singular

        proxy_q = self.session.query(User).filter(
            User.singular_value != None  # noqa
        )
        direct_q = self.session.query(User).filter(
            User.singular.has(Singular.value != None)  # noqa
        )
        self._equivalent(proxy_q, direct_q)

    def test_has_nul(self):
        """Empty ``has()`` on the proxy vs. empty ``has()`` on the relationship.

        Special case: the association proxy used here is not
        object-targeted, yet an argument-less ``has()`` is still provided.
        """
        User = self.classes.User
        self.classes.Singular

        via_proxy = self.session.query(User).filter(User.singular_value.has())
        spelled_out = self.session.query(User).filter(User.singular.has())

        self._equivalent(via_proxy, spelled_out)

    def test_nothas_nul(self):
        """Negated empty ``has()`` on the proxy vs. on the relationship.

        Special case: the association proxy used here is not
        object-targeted, yet an argument-less ``has()`` is still provided.
        """
        User = self.classes.User
        self.classes.Singular

        via_proxy = self.session.query(User).filter(
            ~User.singular_value.has()
        )
        spelled_out = self.session.query(User).filter(~User.singular.has())

        self._equivalent(via_proxy, spelled_out)

    def test_filter_any_chained(self):
        """``any()`` on the chained ``User.common_users`` proxy.

        Checks the compiled SQL (nested EXISTS subqueries) and then
        passes the proxied query, together with the same criteria written
        out as explicit ``any()`` / ``has()`` calls, to ``_equivalent``.
        """
        # one lookup for all three classes; ``User`` used to be fetched
        # twice, the first binding being immediately overwritten
        UserKeyword, User, Keyword = self.classes(
            "UserKeyword", "User", "Keyword"
        )

        q1 = self.session.query(User).filter(
            User.common_users.any(User.name == "user7")
        )
        self.assert_compile(
            q1,
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users "
            "WHERE EXISTS (SELECT 1 "
            "FROM userkeywords "
            "WHERE users.id = userkeywords.user_id AND (EXISTS (SELECT 1 "
            "FROM keywords "
            "WHERE keywords.id = userkeywords.keyword_id AND "
            "(EXISTS (SELECT 1 "
            "FROM userkeywords "
            "WHERE keywords.id = userkeywords.keyword_id AND "
            "(EXISTS (SELECT 1 "
            "FROM users "
            "WHERE users.id = userkeywords.user_id AND users.name = :name_1)"
            "))))))",
            checkparams={"name_1": "user7"},
        )

        # the same criteria, one relationship hop at a time
        q2 = self.session.query(User).filter(
            User.user_keywords.any(
                UserKeyword.keyword.has(
                    Keyword.user_keyword.has(
                        UserKeyword.user.has(User.name == "user7")
                    )
                )
            )
        )
        self._equivalent(q1, q2)

    def test_filter_has_chained_has_to_any(self):
        """``has()`` on ``User.common_singular`` with a Keyword criterion.

        Checks the compiled SQL, then compares against the explicit
        ``has()`` -> ``any()`` spelling via ``_equivalent``.
        """
        User, Singular, Keyword = self.classes("User", "Singular", "Keyword")

        expected_sql = (
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users "
            "WHERE EXISTS (SELECT 1 "
            "FROM singular "
            "WHERE singular.id = users.singular_id AND (EXISTS (SELECT 1 "
            "FROM keywords "
            "WHERE singular.id = keywords.singular_id AND "
            "keywords.keyword = :keyword_1)))"
        )

        via_proxy = self.session.query(User).filter(
            User.common_singular.has(Keyword.keyword == "brown")
        )
        self.assert_compile(
            via_proxy, expected_sql, checkparams={"keyword_1": "brown"}
        )

        spelled_out = self.session.query(User).filter(
            User.singular.has(
                Singular.keywords.any(Keyword.keyword == "brown")
            )
        )
        self._equivalent(via_proxy, spelled_out)

    def test_filter_has_scalar_raises(self):
        """Keyword arguments to ``singular_keyword.has()`` raise."""
        User = self.classes.User

        # resolve the bound method before entering the expectation block
        has_ = User.singular_keyword.has
        with expect_raises_message(
            exc.ArgumentError,
            r"Can't apply keyword arguments to column-targeted",
        ):
            has_(keyword="brown")

    def test_filter_eq_chained_has_to_any(self):
        """``User.singular_keyword == <str>`` through a chained proxy.

        Checks the compiled SQL, then compares against the explicit
        ``has()`` -> ``any()`` spelling via ``_equivalent``.
        """
        User, Keyword, Singular = self.classes("User", "Keyword", "Singular")

        expected_sql = (
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users "
            "WHERE EXISTS (SELECT 1 "
            "FROM singular "
            "WHERE singular.id = users.singular_id AND (EXISTS (SELECT 1 "
            "FROM keywords "
            "WHERE singular.id = keywords.singular_id "
            "AND keywords.keyword = :keyword_1)))"
        )

        via_proxy = self.session.query(User).filter(
            User.singular_keyword == "brown"
        )
        self.assert_compile(
            via_proxy, expected_sql, checkparams={"keyword_1": "brown"}
        )

        spelled_out = self.session.query(User).filter(
            User.singular.has(
                Singular.keywords.any(Keyword.keyword == "brown")
            )
        )
        self._equivalent(via_proxy, spelled_out)

    def test_filter_contains_chained_any_to_has(self):
        """``contains()`` on the ``User.common_keyword_name`` proxy.

        Checks the compiled SQL, then compares against the explicit
        ``any()`` -> ``has()`` spelling via ``_equivalent``.
        """
        User, Keyword, UserKeyword = self.classes(
            "User", "Keyword", "UserKeyword"
        )

        expected_sql = (
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users "
            "WHERE EXISTS (SELECT 1 "
            "FROM userkeywords "
            "WHERE users.id = userkeywords.user_id AND (EXISTS (SELECT 1 "
            "FROM keywords "
            "WHERE keywords.id = userkeywords.keyword_id AND "
            "keywords.keyword = :keyword_1)))"
        )

        via_proxy = self.session.query(User).filter(
            User.common_keyword_name.contains("brown")
        )
        self.assert_compile(
            via_proxy, expected_sql, checkparams={"keyword_1": "brown"}
        )

        spelled_out = self.session.query(User).filter(
            User.user_keywords.any(
                UserKeyword.keyword.has(Keyword.keyword == "brown")
            )
        )
        self._equivalent(via_proxy, spelled_out)

    def test_filter_contains_chained_any_to_has_to_eq(self):
        """``Keyword.singulars.contains(obj)`` through a chained proxy.

        Checks the compiled SQL (the object's ``id`` is the bound
        parameter), then compares against the explicit ``any()`` ->
        ``has()`` -> ``==`` spelling via ``_equivalent``.
        """
        User, Keyword, UserKeyword, Singular = self.classes(
            "User", "Keyword", "UserKeyword", "Singular"
        )

        # first Singular row by primary key
        singular = self.session.query(Singular).order_by(Singular.id).first()

        via_proxy = self.session.query(Keyword).filter(
            Keyword.singulars.contains(singular)
        )
        expected_sql = (
            "SELECT keywords.id AS keywords_id, "
            "keywords.keyword AS keywords_keyword, "
            "keywords.singular_id AS keywords_singular_id "
            "FROM keywords "
            "WHERE EXISTS (SELECT 1 "
            "FROM userkeywords "
            "WHERE keywords.id = userkeywords.keyword_id AND "
            "(EXISTS (SELECT 1 "
            "FROM users "
            "WHERE users.id = userkeywords.user_id AND "
            ":param_1 = users.singular_id)))"
        )
        self.assert_compile(
            via_proxy, expected_sql, checkparams={"param_1": singular.id}
        )

        spelled_out = self.session.query(Keyword).filter(
            Keyword.user_keywords.any(
                UserKeyword.user.has(User.singular == singular)
            )
        )
        self._equivalent(via_proxy, spelled_out)

    def test_has_criterion_nul(self):
        """``singular_value.has(<criterion>)`` raises ArgumentError."""
        # an empty has() is allowed, but not one carrying a criterion...
        User = self.classes.User
        self.classes.Singular

        # build the callable and its argument outside the expectation
        # block, in the same order the arguments were evaluated before
        has_ = User.singular_value.has
        criterion = User.singular_value == "singular4"

        with expect_raises_message(
            exc.ArgumentError, r"Non-empty has\(\) not allowed"
        ):
            has_(criterion)

    def test_has_kwargs_nul(self):
        """``singular_value.has(**kwargs)`` raises ArgumentError."""
        # ... nor one carrying keyword arguments
        User = self.classes.User
        self.classes.Singular

        has_ = User.singular_value.has
        with expect_raises_message(
            exc.ArgumentError,
            r"Can't apply keyword arguments to column-targeted",
        ):
            has_(singular_value="singular4")

    def test_filter_scalar_object_contains_fails_nul_nul(self):
        """``Keyword.user.contains()`` raises InvalidRequestError."""
        Keyword = self.classes.Keyword

        def contains_on_scalar():
            return Keyword.user.contains(self.u)

        assert_raises(exc.InvalidRequestError, contains_on_scalar)

    def test_filter_scalar_object_any_fails_nul_nul(self):
        """``Keyword.user.any()`` raises InvalidRequestError."""
        Keyword = self.classes.Keyword

        def any_on_scalar():
            return Keyword.user.any(name="user2")

        assert_raises(exc.InvalidRequestError, any_on_scalar)

    def test_filter_scalar_column_like(self):
        """Compare ``singular_value.like()`` with the ``has()`` form."""
        User, Singular = self.classes("User", "Singular")

        via_proxy = self.session.query(User).filter(
            User.singular_value.like("foo")
        )
        spelled_out = self.session.query(User).filter(
            User.singular.has(Singular.value.like("foo"))
        )

        self._equivalent(via_proxy, spelled_out)

    def test_filter_scalar_column_contains(self):
        """Compare ``singular_value.contains()`` with the ``has()`` form."""
        User, Singular = self.classes("User", "Singular")

        via_proxy = self.session.query(User).filter(
            User.singular_value.contains("foo")
        )
        spelled_out = self.session.query(User).filter(
            User.singular.has(Singular.value.contains("foo"))
        )

        self._equivalent(via_proxy, spelled_out)

    def test_filter_scalar_column_eq(self):
        """Compare ``singular_value == "foo"`` with the ``has()`` form."""
        User, Singular = self.classes("User", "Singular")

        via_proxy = self.session.query(User).filter(
            User.singular_value == "foo"
        )
        spelled_out = self.session.query(User).filter(
            User.singular.has(Singular.value == "foo")
        )

        self._equivalent(via_proxy, spelled_out)

    def test_filter_scalar_column_ne(self):
        """Compare ``singular_value != "foo"`` with the ``has()`` form."""
        User, Singular = self.classes("User", "Singular")

        via_proxy = self.session.query(User).filter(
            User.singular_value != "foo"
        )
        spelled_out = self.session.query(User).filter(
            User.singular.has(Singular.value != "foo")
        )

        self._equivalent(via_proxy, spelled_out)

    def test_filter_scalar_column_eq_nul(self):
        """Compare ``singular_value == None`` with the explicit OR form."""
        User, Singular = self.classes("User", "Singular")

        via_proxy = self.session.query(User).filter(
            User.singular_value == None  # noqa: E711
        )
        spelled_out = self.session.query(User).filter(
            or_(
                User.singular.has(Singular.value == None),  # noqa: E711
                User.singular == None,  # noqa: E711
            )
        )

        self._equivalent(via_proxy, spelled_out)

    def test_filter_collection_has_fails_ul_nul(self):
        """``User.keywords.has()`` raises InvalidRequestError."""
        User = self.classes.User

        def has_on_collection():
            return User.keywords.has(keyword="quick")

        assert_raises(exc.InvalidRequestError, has_on_collection)

    def test_filter_collection_eq_fails_ul_nul(self):
        """``User.keywords == obj`` raises InvalidRequestError."""
        User = self.classes.User

        def eq_on_collection():
            return User.keywords == self.kw

        assert_raises(exc.InvalidRequestError, eq_on_collection)

    def test_filter_collection_ne_fails_ul_nul(self):
        """``User.keywords != obj`` raises InvalidRequestError."""
        User = self.classes.User

        def ne_on_collection():
            return User.keywords != self.kw

        assert_raises(exc.InvalidRequestError, ne_on_collection)

    def test_join_separate_attr(self):
        """Join along ``local_attr`` then ``remote_attr`` of the proxy."""
        User = self.classes.User

        joined = (
            self.session.query(User)
            .join(User.keywords.local_attr)
            .join(User.keywords.remote_attr)
        )
        expected_sql = (
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users JOIN userkeywords ON users.id = "
            "userkeywords.user_id JOIN keywords ON keywords.id = "
            "userkeywords.keyword_id"
        )

        self.assert_compile(joined, expected_sql)

    def test_join_single_attr(self):
        """Join along both elements of the proxy's ``attr`` pair."""
        User = self.classes.User

        joined = (
            self.session.query(User)
            .join(User.keywords.attr[0])
            .join(User.keywords.attr[1])
        )
        expected_sql = (
            "SELECT users.id AS users_id, users.name AS users_name, "
            "users.singular_id AS users_singular_id "
            "FROM users JOIN userkeywords ON users.id = "
            "userkeywords.user_id JOIN keywords ON keywords.id = "
            "userkeywords.keyword_id"
        )

        self.assert_compile(joined, expected_sql)


class DictOfTupleUpdateTest(fixtures.MappedTest):
    """``update()`` on a dict-based association proxy keyed by tuples.

    ``A.elements`` proxies the ``elem`` attribute of ``B`` objects held
    in ``A.orig``, a relationship collection keyed on ``B.key``.
    """

    run_create_tables = None

    @classmethod
    def define_tables(cls, metadata):
        """Declare tables ``a`` and ``b``; ``b.aid`` references ``a.id``."""
        Table("a", metadata, Column("id", Integer, primary_key=True))

        b_columns = (
            Column("id", Integer, primary_key=True),
            Column("aid", Integer, ForeignKey("a.id")),
            Column("elem", String),
        )
        Table("b", metadata, *b_columns)

    @classmethod
    def setup_mappers(cls):
        """Map ``A`` and ``B`` imperatively onto the two tables."""
        a_table, b_table = cls.tables("a", "b")

        class B(cls.Basic):
            def __init__(self, key, elem):
                self.key = key
                self.elem = elem

        class A(cls.Basic):
            # new entries are built by calling B(key, elem)
            elements = association_proxy("orig", "elem", creator=B)

        registry = cls.mapper_registry
        keyed_by_key = relationship(
            B, collection_class=attribute_keyed_dict("key")
        )
        registry.map_imperatively(
            A, a_table, properties={"orig": keyed_by_key}
        )
        registry.map_imperatively(B, b_table)

    def test_update_one_elem_dict(self):
        """Update from a one-item dict."""
        a1 = self.classes.A()
        payload = {("B", 3): "elem2"}

        a1.elements.update(payload)

        eq_(a1.elements, {("B", 3): "elem2"})

    def test_update_multi_elem_dict(self):
        """Update from a two-item dict."""
        a1 = self.classes.A()
        payload = {("B", 3): "elem2", ("C", 4): "elem3"}

        a1.elements.update(payload)

        eq_(a1.elements, {("B", 3): "elem2", ("C", 4): "elem3"})

    def test_update_one_elem_list(self):
        """Update from a list holding one ``(key, value)`` pair."""
        a1 = self.classes.A()
        pairs = [(("B", 3), "elem2")]

        a1.elements.update(pairs)

        eq_(a1.elements, {("B", 3): "elem2"})

    def test_update_multi_elem_list(self):
        """Update from a list holding two ``(key, value)`` pairs."""
        a1 = self.classes.A()
        pairs = [(("B", 3), "elem2"), (("C", 4), "elem3")]

        a1.elements.update(pairs)

        eq_(a1.elements, {("B", 3): "elem2", ("C", 4): "elem3"})

    def test_update_one_elem_varg(self):
        """A bare ``(key, value)`` tuple is rejected with ValueError."""
        a1 = self.classes.A()

        # fetch the bound method first so that only the call itself runs
        # inside the expectation block
        update = a1.elements.update
        with expect_raises_message(
            ValueError,
            "dictionary update sequence element #1 has length 5; "
            "2 is required",
        ):
            update((("B", 3), "elem2"))

    def test_update_multi_elem_varg(self):
        """Two positional arguments are rejected with TypeError."""
        a1 = self.classes.A()

        update = a1.elements.update
        with expect_raises_message(
            TypeError, "update expected at most 1 arguments?, got 2"
        ):
            update((("B", 3), "elem2"), (("C", 4), "elem3"))


class CompositeAccessTest(fixtures.DeclarativeMappedTest):
    """Association proxy whose target attribute is a ``composite()``.

    ``Graph.points`` proxies ``PointData.point`` across the
    ``Graph.point_data`` relationship.
    """

    run_create_tables = None

    @classmethod
    def setup_classes(cls):
        """Define ``Point``, ``Graph`` and ``PointData``."""

        class Point(cls.Basic):
            # value object compared by x/y, hence explicitly unhashable
            __hash__ = None

            def __init__(self, x, y):
                self.x = x
                self.y = y

            def __composite_values__(self):
                return [self.x, self.y]

            def __eq__(self, other):
                if not isinstance(other, Point):
                    return False
                return other.x == self.x and other.y == self.y

            def __ne__(self, other):
                if not isinstance(other, Point):
                    return True
                return not self.__eq__(other)

        def make_point_data(point):
            # creator for the proxy; PointData is resolved at call time
            return PointData(point=point)

        class Graph(cls.DeclarativeBasic):
            __tablename__ = "graph"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            name = Column(String(30))

            point_data = relationship("PointData")

            points = association_proxy(
                "point_data", "point", creator=make_point_data
            )

        class PointData(ComparableEntity, cls.DeclarativeBasic):
            __tablename__ = "point"

            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            graph_id = Column(ForeignKey("graph.id"))

            x1 = Column(Integer)
            y1 = Column(Integer)

            point = composite(Point, x1, y1)

        return Point, Graph, PointData

    def test_append(self):
        """Appending a Point via the proxy creates a PointData row object."""
        Point, Graph, PointData = self.classes("Point", "Graph", "PointData")

        graph = Graph()
        graph.points.append(Point(3, 5))

        expected = [PointData(point=Point(3, 5))]
        eq_(graph.point_data, expected)

    def test_access(self):
        """PointData objects added directly are visible through the proxy."""
        Point, Graph, PointData = self.classes("Point", "Graph", "PointData")

        graph = Graph()
        graph.point_data.append(PointData(point=Point(3, 5)))
        graph.point_data.append(PointData(point=Point(10, 7)))

        expected = [Point(3, 5), Point(10, 7)]
        eq_(graph.points, expected)


class AttributeAccessTest(fixtures.TestBase):
    """Tests of ``owning_class`` resolution for association proxies.

    Each test declares its own classes on a fresh ``declarative_base()``
    and checks which class a class-level proxy reports as its
    ``owning_class`` (or that instance access works) for subclasses,
    mixins and aliased classes.
    """

    def teardown_test(self):
        # every test maps new classes; drop all mappers afterwards
        clear_mappers()

    def test_resolve_aliased_class(self):
        """``owning_class`` is ``B`` both via ``aliased(B)`` and ``B``."""
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            value = Column(String)

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            a_id = Column(Integer, ForeignKey(A.id))
            a = relationship(A)
            a_value = association_proxy("a", "value")

        # access through an alias of B ...
        spec = aliased(B).a_value

        is_(spec.owning_class, B)

        # ... and through B itself
        spec = B.a_value

        is_(spec.owning_class, B)

    def test_resolved_w_subclass(self):
        """Proxy from a ``declared_attr`` mixin still works on the parent."""
        # test for issue #4185, as well as several below

        Base = declarative_base()

        class Mixin:
            @declared_attr
            def children(cls):
                return association_proxy("_children", "value")

        # 1. build parent, Mixin.children gets invoked, we add
        # Parent.children
        class Parent(Mixin, Base):
            __tablename__ = "parent"
            id = Column(Integer, primary_key=True)

            _children = relationship("Child")

        class Child(Base):
            __tablename__ = "child"
            parent_id = Column(
                Integer, ForeignKey(Parent.id), primary_key=True
            )
            value = Column(String)

        # 2. declarative builds up SubParent, scans through all attributes
        # over all classes.  Hits Mixin, hits "children", accesses "children"
        # in terms of the class, e.g. SubParent.children.  SubParent isn't
        # mapped yet.  association proxy then sets up "owning_class"
        # as NoneType.
        class SubParent(Parent):
            __tablename__ = "subparent"
            id = Column(Integer, ForeignKey(Parent.id), primary_key=True)

        configure_mappers()

        # 3. which would break here.
        p1 = Parent()
        eq_(p1.children, [])

    def test_resolved_to_correct_class_one(self):
        """Proxy declared on the parent: each class reports itself."""
        Base = declarative_base()

        class Parent(Base):
            __tablename__ = "parent"
            id = Column(Integer, primary_key=True)
            _children = relationship("Child")
            children = association_proxy("_children", "value")

        class Child(Base):
            __tablename__ = "child"
            parent_id = Column(
                Integer, ForeignKey(Parent.id), primary_key=True
            )
            value = Column(String)

        class SubParent(Parent):
            __tablename__ = "subparent"
            id = Column(Integer, ForeignKey(Parent.id), primary_key=True)

        # the subclass is checked first, then the parent
        is_(SubParent.children.owning_class, SubParent)
        is_(Parent.children.owning_class, Parent)

    def test_resolved_to_correct_class_two(self):
        """Proxy declared on the subclass only reports the subclass."""
        Base = declarative_base()

        class Parent(Base):
            __tablename__ = "parent"
            id = Column(Integer, primary_key=True)
            _children = relationship("Child")

        class Child(Base):
            __tablename__ = "child"
            parent_id = Column(
                Integer, ForeignKey(Parent.id), primary_key=True
            )
            value = Column(String)

        class SubParent(Parent):
            __tablename__ = "subparent"
            id = Column(Integer, ForeignKey(Parent.id), primary_key=True)
            children = association_proxy("_children", "value")

        is_(SubParent.children.owning_class, SubParent)

    def test_resolved_to_correct_class_three(self):
        """Proxy declared mid-hierarchy: sub- and sub-subclass each report
        themselves."""
        Base = declarative_base()

        class Parent(Base):
            __tablename__ = "parent"
            id = Column(Integer, primary_key=True)
            _children = relationship("Child")

        class Child(Base):
            __tablename__ = "child"
            parent_id = Column(
                Integer, ForeignKey(Parent.id), primary_key=True
            )
            value = Column(String)

        class SubParent(Parent):
            __tablename__ = "subparent"
            id = Column(Integer, ForeignKey(Parent.id), primary_key=True)
            children = association_proxy("_children", "value")

        class SubSubParent(SubParent):
            __tablename__ = "subsubparent"
            id = Column(Integer, ForeignKey(SubParent.id), primary_key=True)

        is_(SubParent.children.owning_class, SubParent)
        is_(SubSubParent.children.owning_class, SubSubParent)

    def test_resolved_to_correct_class_four(self):
        """Same as "one", but after assigning through a subclass instance."""
        Base = declarative_base()

        class Parent(Base):
            __tablename__ = "parent"
            id = Column(Integer, primary_key=True)
            _children = relationship("Child")
            children = association_proxy(
                "_children", "value", creator=lambda value: Child(value=value)
            )

        class Child(Base):
            __tablename__ = "child"
            parent_id = Column(
                Integer, ForeignKey(Parent.id), primary_key=True
            )
            value = Column(String)

        class SubParent(Parent):
            __tablename__ = "subparent"
            id = Column(Integer, ForeignKey(Parent.id), primary_key=True)

        # instance-level set through the proxy happens before any
        # class-level access to the proxy
        sp = SubParent()
        sp.children = "c"
        is_(SubParent.children.owning_class, SubParent)
        is_(Parent.children.owning_class, Parent)

    def test_resolved_to_correct_class_five(self):
        """Proxy on a plain (non-``declared_attr``) mixin attribute."""
        Base = declarative_base()

        class Mixin:
            children = association_proxy("_children", "value")

        class Parent(Mixin, Base):
            __tablename__ = "parent"
            id = Column(Integer, primary_key=True)
            _children = relationship("Child")

        class Child(Base):
            __tablename__ = "child"
            parent_id = Column(
                Integer, ForeignKey(Parent.id), primary_key=True
            )
            value = Column(String)

        # this triggers the owning routine, doesn't fail
        Mixin.children

        p1 = Parent()

        c1 = Child(value="c1")
        p1._children.append(c1)
        is_(Parent.children.owning_class, Parent)
        eq_(p1.children, ["c1"])

    # NOTE(review): the leading underscore means the "test_" prefix is
    # missing - presumably this keeps the runner from collecting it;
    # confirm whether it is disabled on purpose.
    def _test_never_assign_nonetype(self):
        """``owning_class`` stays ``None`` until the class is mapped."""
        foo = association_proxy("x", "y")
        foo._calc_owner(None, None)
        is_(foo.owning_class, None)

        class Bat:
            foo = association_proxy("x", "y")

        # class-level access on an unmapped class
        Bat.foo
        is_(Bat.foo.owning_class, None)

        # instance-level access on an unmapped class raises
        b1 = Bat()
        assert_raises_message(
            exc.InvalidRequestError,
            "This association proxy has no mapped owning class; "
            "can't locate a mapped property",
            getattr,
            b1,
            "foo",
        )
        is_(Bat.foo.owning_class, None)

        # after all that, we can map it
        mapper(
            Bat,
            Table("bat", MetaData(), Column("x", Integer, primary_key=True)),
        )

        # answer is correct
        is_(Bat.foo.owning_class, Bat)


class ScalarRemoveTest:
    """Mixin with removal / deletion tests for the ``A.b`` proxy.

    Concrete test classes combine this with a declarative fixture and
    set the flags below:

    * ``useobject`` - proxy to a mapped ``B`` object (true) or to an
      integer column on ``AB`` (false)
    * ``uselist`` - passed as ``uselist`` to the ``A.ab`` relationship
    * ``cascade_scalar_deletes`` / ``create_on_none_assignment`` - passed
      straight through to ``association_proxy()``
    """

    useobject = None
    cascade_scalar_deletes = None
    uselist = None
    create_on_none_assignment = False

    @classmethod
    def setup_classes(cls):
        """Declare ``A``, ``AB`` and, when ``useobject`` is set, ``B``."""
        Base = cls.DeclarativeBasic

        def make_ab(b):
            # AB is defined further down; resolved when first called
            return AB(b=b)

        class A(Base):
            __tablename__ = "test_a"
            id = Column(Integer, primary_key=True)
            ab = relationship("AB", backref="a", uselist=cls.uselist)
            b = association_proxy(
                "ab",
                "b",
                creator=make_ab,
                cascade_scalar_deletes=cls.cascade_scalar_deletes,
                create_on_none_assignment=cls.create_on_none_assignment,
            )

        if not cls.useobject:
            # proxy target is a plain integer column on AB
            class AB(Base):
                __tablename__ = "test_ab"
                b = Column(Integer)
                a_id = Column(Integer, ForeignKey(A.id), primary_key=True)

        else:
            # proxy target is the "b" backref pointing at a mapped B
            class B(Base):
                __tablename__ = "test_b"
                id = Column(Integer, primary_key=True)
                ab = relationship("AB", backref="b")

            class AB(Base):
                __tablename__ = "test_ab"
                a_id = Column(Integer, ForeignKey(A.id), primary_key=True)
                b_id = Column(Integer, ForeignKey(B.id), primary_key=True)

    def _mapped_classes(self):
        """Return ``(A, AB, B)``; ``B`` is None unless ``useobject``."""
        if self.useobject:
            return self.classes("A", "AB", "B")
        A, AB = self.classes("A", "AB")
        return A, AB, None

    def test_set_nonnone_to_none(self):
        """Set a value through the proxy, then remove / None it out."""
        A, AB, B = self._mapped_classes()

        a1 = A()
        b1 = B() if self.useobject else 5

        if self.uselist:
            a1.b.append(b1)
            assert isinstance(a1.ab[0], AB)

            a1.b.remove(b1)
            eq_(a1.ab, [])
            return

        a1.b = b1
        assert isinstance(a1.ab, AB)

        a1.b = None
        if self.cascade_scalar_deletes:
            assert a1.ab is None
        else:
            # the AB object stays in place; only its target is cleared
            assert isinstance(a1.ab, AB)
            assert a1.ab.b is None

    def test_set_none_to_none(self):
        """Assign None through a scalar proxy that has no value yet."""
        if self.uselist:
            return

        A, AB, B = self._mapped_classes()

        a1 = A()
        a1.b = None

        if not self.create_on_none_assignment:
            assert a1.ab is None
            return

        assert isinstance(a1.ab, AB)
        assert a1.ab is not None
        eq_(a1.ab.b, None)

    def test_del_already_nonpresent(self):
        """``del a1.b`` when nothing was ever set."""
        A, AB, B = self._mapped_classes()

        a1 = A()

        if self.uselist:
            del a1.b

            eq_(a1.ab, [])
            return

        with expect_raises_message(
            AttributeError, "A.ab object does not have a value"
        ):
            del a1.b

    def test_del(self):
        """``del a1.b`` after a value was set through the proxy."""
        A, AB, B = self._mapped_classes()

        b1 = B() if self.useobject else 5
        a1 = A()

        if self.uselist:
            a1.b.append(b1)
            assert isinstance(a1.ab[0], AB)
        else:
            a1.b = b1
            assert isinstance(a1.ab, AB)

        del a1.b

        if self.uselist:
            eq_(a1.ab, [])
        else:
            assert a1.ab is None

    def test_del_no_proxy(self):
        """``del a1.ab`` on the underlying list after a proxied append."""
        if not self.uselist:
            return

        A, AB, B = self._mapped_classes()

        b1 = B() if self.useobject else 5
        a1 = A()
        a1.b.append(b1)

        del a1.ab

        # this is what it does for now, so maintain that w/ assoc proxy
        eq_(a1.ab, [])

    def test_del_already_nonpresent_no_proxy(self):
        """``del a1.ab`` on the underlying list when it was never set."""
        if not self.uselist:
            return

        A, AB, B = self._mapped_classes()

        a1 = A()

        del a1.ab

        # this is what it does for now, so maintain that w/ assoc proxy
        eq_(a1.ab, [])


class ScalarRemoveListObjectCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """List relationship, mapped ``B`` target, cascade_scalar_deletes on."""

    run_create_tables = None

    uselist = True
    useobject = True
    cascade_scalar_deletes = True


class ScalarRemoveScalarObjectCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """Scalar relationship, mapped ``B`` target, cascade_scalar_deletes on."""

    run_create_tables = None

    uselist = False
    useobject = True
    cascade_scalar_deletes = True


class ScalarRemoveListScalarCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """List relationship, integer column target, cascade_scalar_deletes on."""

    run_create_tables = None

    uselist = True
    useobject = False
    cascade_scalar_deletes = True


class ScalarRemoveScalarScalarCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """Scalar relationship, integer column target, cascade_scalar_deletes
    on."""

    run_create_tables = None

    uselist = False
    useobject = False
    cascade_scalar_deletes = True


class ScalarRemoveListObjectNoCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """List relationship, mapped ``B`` target, cascade_scalar_deletes off."""

    run_create_tables = None

    uselist = True
    useobject = True
    cascade_scalar_deletes = False


class ScalarRemoveScalarObjectNoCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """Scalar relationship, mapped ``B`` target, cascade_scalar_deletes
    off."""

    run_create_tables = None

    uselist = False
    useobject = True
    cascade_scalar_deletes = False


class ScalarRemoveScalarObjectNoCascadeNoneAssign(
    ScalarRemoveScalarObjectNoCascade
):
    """Scalar/object, no cascade, with ``create_on_none_assignment`` on."""

    create_on_none_assignment = True


class ScalarRemoveListScalarNoCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """List relationship, integer column target, cascade_scalar_deletes
    off."""

    run_create_tables = None

    uselist = True
    useobject = False
    cascade_scalar_deletes = False


class ScalarRemoveListScalarNoCascadeNoneAssign(
    ScalarRemoveListScalarNoCascade
):
    """List/scalar, no cascade, with ``create_on_none_assignment`` on.

    Derives from ``ScalarRemoveListScalarNoCascade`` so that the
    configuration matches the class name.  It used to derive from
    ``ScalarRemoveScalarObjectNoCascade``, which made it an exact
    duplicate of ``ScalarRemoveScalarObjectNoCascadeNoneAssign``.
    """

    create_on_none_assignment = True


class ScalarRemoveScalarScalarNoCascade(
    ScalarRemoveTest, fixtures.DeclarativeMappedTest
):
    """Scalar relationship, integer column target, cascade_scalar_deletes
    off."""

    run_create_tables = None

    uselist = False
    useobject = False
    cascade_scalar_deletes = False


class InfoTest(fixtures.TestBase):
    """The ``info`` dictionary of an association proxy."""

    def test_constructor(self):
        """``info=`` passed to the constructor is readable via ``.info``."""
        proxy = association_proxy(
            "a", "b", info={"some_assoc": "some_value"}
        )

        eq_(proxy.info, {"some_assoc": "some_value"})

    def test_empty(self):
        """Without ``info=``, ``.info`` compares equal to an empty dict."""
        proxy = association_proxy("a", "b")

        eq_(proxy.info, {})

    def test_via_cls(self):
        """``.info`` reached through a class attribute can be mutated."""

        class Foob:
            assoc = association_proxy("a", "b")

        eq_(Foob.assoc.info, {})

        # the mutation must be visible on the next class-level access
        Foob.assoc.info["foo"] = "bar"

        eq_(Foob.assoc.info, {"foo": "bar"})


class OnlyRelationshipTest(fixtures.DeclarativeMappedTest):
    """A proxy whose intermediary is a Column, not a relationship.

    Setting, getting and class-level access of ``Foo.bar`` are each
    expected to raise NotImplementedError.
    """

    run_define_tables = None
    run_create_tables = None
    run_inserts = None
    run_deletes = None

    @classmethod
    def setup_classes(cls):
        """Declare ``Foo`` with ``bar`` proxying through column ``foo``."""
        Base = cls.DeclarativeBasic

        class Foo(Base):
            __tablename__ = "foo"

            id = Column(Integer, primary_key=True)
            foo = Column(String)  # assume some composite datatype

            bar = association_proxy("foo", "attr")

    def test_setattr(self):
        """Instance-level assignment raises."""
        Foo = self.classes.Foo

        f1 = Foo()

        with expect_raises_message(
            NotImplementedError,
            "association proxy to a non-relationship "
            "intermediary is not supported",
        ):
            f1.bar = "asdf"

    def test_getattr(self):
        """Instance-level read raises."""
        Foo = self.classes.Foo

        f1 = Foo()

        with expect_raises_message(
            NotImplementedError,
            "association proxy to a non-relationship "
            "intermediary is not supported",
        ):
            f1.bar

    def test_get_class_attr(self):
        """Class-level read raises."""
        Foo = self.classes.Foo

        with expect_raises_message(
            NotImplementedError,
            "association proxy to a non-relationship "
            "intermediary is not supported",
        ):
            Foo.bar


class MultiOwnerTest(
    fixtures.DeclarativeMappedTest, testing.AssertsCompiledSQL
):
    """Association proxies shared across an inheritance hierarchy.

    Two situations are set up:

    * ``A.d_values`` is declared on the base class while the ``ds``
      relationship it proxies through is only defined on the subclasses
      ``B``, ``C``, ``C1`` and ``C2``.
    * ``D.c_data`` proxies to ``csub_only_data``, which does not exist on
      ``C`` itself; it is a relationship on ``C1`` and a column on ``C2``.

    """

    __dialect__ = "default"

    run_define_tables = "each"
    run_create_tables = None
    run_inserts = None
    run_deletes = None
    run_setup_classes = "each"
    run_setup_mappers = "each"

    @classmethod
    def setup_classes(cls):
        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            type = Column(String(5), nullable=False)

            # "ds" is not defined here; each subclass supplies its own
            d_values = association_proxy("ds", "value")

            __mapper_args__ = {"polymorphic_on": type}

        class B(A):
            __tablename__ = "b"
            id = Column(ForeignKey("a.id"), primary_key=True)

            c1_id = Column(ForeignKey("c1.id"))

            ds = relationship("D", primaryjoin="D.b_id == B.id")

            __mapper_args__ = {"polymorphic_identity": "b"}

        class C(A):
            __tablename__ = "c"
            id = Column(ForeignKey("a.id"), primary_key=True)

            ds = relationship(
                "D", primaryjoin="D.c_id == C.id", back_populates="c"
            )

            __mapper_args__ = {"polymorphic_identity": "c"}

        class C1(C):
            __tablename__ = "c1"
            id = Column(ForeignKey("c.id"), primary_key=True)

            csub_only_data = relationship("B")  # uselist=True relationship

            ds = relationship(
                "D", primaryjoin="D.c1_id == C1.id", back_populates="c"
            )

            __mapper_args__ = {"polymorphic_identity": "c1"}

        class C2(C):
            __tablename__ = "c2"
            id = Column(ForeignKey("c.id"), primary_key=True)

            csub_only_data = Column(String(50))  # scalar Column

            ds = relationship(
                "D", primaryjoin="D.c2_id == C2.id", back_populates="c"
            )

            __mapper_args__ = {"polymorphic_identity": "c2"}

        class D(Base):
            __tablename__ = "d"
            id = Column(Integer, primary_key=True)
            value = Column(String(50))
            b_id = Column(ForeignKey("b.id"))
            c_id = Column(ForeignKey("c.id"))
            c1_id = Column(ForeignKey("c1.id"))
            c2_id = Column(ForeignKey("c2.id"))

            c = relationship("C", primaryjoin="D.c_id == C.id")

            # target attribute is only present on subclasses of C
            c_data = association_proxy("c", "csub_only_data")

    def _assert_raises_ambiguous(self, fn, *arg, **kw):
        """Call ``fn`` expecting the "ambiguous proxy" AttributeError."""
        with expect_raises_message(
            AttributeError,
            "Association proxy D.c refers to an attribute 'csub_only_data'",
        ):
            fn(*arg, **kw)

    def _assert_raises_attribute(self, message, fn, *arg, **kw):
        """Call ``fn`` expecting an AttributeError matching ``message``."""
        with expect_raises_message(AttributeError, message):
            fn(*arg, **kw)

    def test_column_collection_expressions(self):
        """``contains()`` renders against the owning subclass' own join."""
        B, C, C2 = self.classes("B", "C", "C2")

        cases = [
            (
                B,
                "b1",
                "EXISTS (SELECT 1 FROM d, b WHERE d.b_id = b.id "
                "AND (d.value LIKE '%' || :value_1 || '%'))",
            ),
            (
                C2,
                "c2",
                "EXISTS (SELECT 1 FROM d, c2 WHERE d.c2_id = c2.id "
                "AND (d.value LIKE '%' || :value_1 || '%'))",
            ),
            (
                C,
                "c1",
                "EXISTS (SELECT 1 FROM d, c WHERE d.c_id = c.id "
                "AND (d.value LIKE '%' || :value_1 || '%'))",
            ),
        ]

        for owner_cls, value, expected_sql in cases:
            self.assert_compile(
                owner_cls.d_values.contains(value), expected_sql
            )

    def test_subclass_only_owner_none(self):
        """With no ``c`` assigned, the proxy reads as ``None``."""
        D = self.classes.D

        eq_(D().c_data, None)

    def test_subclass_only_owner_assign(self):
        """Setting through the proxy works when ``c`` is a ``C2``."""
        D, C2 = self.classes("D", "C2")

        owner = D(c=C2())
        owner.c_data = "some c2"
        eq_(owner.c_data, "some c2")

    def test_subclass_only_owner_get(self):
        """Reading through the proxy works when ``c`` is a ``C2``."""
        D, C2 = self.classes("D", "C2")

        owner = D(c=C2(csub_only_data="some c2"))
        eq_(owner.c_data, "some c2")

    def test_subclass_only_owner_none_raise(self):
        """With no ``c`` assigned, the proxy reads as ``None``.

        NOTE(review): same steps as ``test_subclass_only_owner_none``;
        the name suggests a raising variant was once intended.

        """
        D = self.classes.D

        owner = D()
        eq_(owner.c_data, None)

    def test_subclass_only_owner_delete(self):
        """Deleting through the proxy removes the attribute on the target."""
        D, C2 = self.classes("D", "C2")

        owner = D(c=C2(csub_only_data="some c2"))
        eq_(owner.c.csub_only_data, "some c2")

        del owner.c_data

        assert not hasattr(owner.c, "csub_only_data")

    def test_subclass_only_owner_assign_passes(self):
        """Setting through the proxy onto a plain ``C`` does not raise."""
        D, C = self.classes("D", "C")

        owner = D(c=C())
        owner.c_data = "some c1"

        # not mapped, but we set it
        eq_(owner.c.csub_only_data, "some c1")

    def test_subclass_only_owner_get_raises(self):
        """Reading through the proxy from a plain ``C`` raises."""
        D, C = self.classes("D", "C")

        owner = D(c=C())

        self._assert_raises_attribute(
            "'C' object has no attribute 'csub_only_data'",
            getattr,
            owner,
            "c_data",
        )

    def test_subclass_only_owner_delete_raises(self):
        """Deleting through the proxy raises once ``c`` is a plain ``C``."""
        D, C, C2 = self.classes("D", "C", "C2")

        owner = D(c=C2(csub_only_data="some c2"))
        eq_(owner.c_data, "some c2")

        # now switch
        owner.c = C()

        self._assert_raises_attribute(
            "csub_only_data", delattr, owner, "c_data"
        )

    def test_subclasses_conflicting_types(self):
        """Per-target-class proxy instances are created and cached."""
        B, D, C, C1, C2 = self.classes("B", "D", "C", "C1", "C2")

        related = [B(), B()]
        via_c1 = D(c=C1(csub_only_data=related))
        via_c2 = D(c=C2(csub_only_data="some c2"))

        proxy = inspect(D).all_orm_descriptors["c_data"]
        c1_inst = proxy.for_class(D, via_c1)
        c2_inst = proxy.for_class(D, via_c2)

        # C1.csub_only_data is a relationship, C2.csub_only_data a column
        eq_(c1_inst._target_is_object, True)
        eq_(c2_inst._target_is_object, False)

        # both instances are cached
        base_inst = proxy.for_class(D)
        eq_(base_inst._lookup_cache, {C1: c1_inst, C2: c2_inst})

        # cache works
        is_(proxy.for_class(D, via_c1), c1_inst)
        is_(proxy.for_class(D, via_c2), c2_inst)

    def test_col_expressions_not_available(self):
        """Column-style comparison on the class-level proxy raises."""
        D = self.classes.D

        self._assert_raises_ambiguous(lambda: D.c_data == 5)

    def test_rel_expressions_not_available(self):
        """Relationship-style ``any()`` on the class-level proxy raises."""
        B, D = self.classes("B", "D")

        self._assert_raises_ambiguous(lambda: D.c_data.any(B.id == 5))


class ProxyOfSynonymTest(AssertsCompiledSQL, fixtures.DeclarativeMappedTest):
    """Association proxies whose target attribute is a ``synonym()``."""

    __dialect__ = "default"

    run_create_tables = None

    @classmethod
    def setup_classes(cls):
        from sqlalchemy.orm import synonym

        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"

            id = Column(Integer, primary_key=True)
            data = Column(String)
            bs = relationship("B", backref="a")
            data_syn = synonym("data")

            # one-to-many: proxies to the synonym defined on B
            b_data = association_proxy("bs", "data_syn")

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            a_id = Column(ForeignKey("a.id"))
            data = Column(String)
            data_syn = synonym("data")

            # many-to-one: proxies to the synonym defined on A
            a_data = association_proxy("a", "data_syn")

    def test_o2m_instance_getter(self):
        """The collection proxy yields the ``data`` of each related B."""
        A, B = self.classes("A", "B")

        parent = A(bs=[B(data="bdata1"), B(data="bdata2")])

        eq_(parent.b_data, ["bdata1", "bdata2"])

    def test_m2o_instance_getter(self):
        """The scalar proxy yields the ``data`` of the related A."""
        A, B = self.classes("A", "B")

        child = B(a=A(data="adata"))

        eq_(child.a_data, "adata")

    def test_o2m_expr(self):
        """A class-level comparison renders against the underlying column."""
        A = self.classes.A

        expected_sql = (
            "EXISTS (SELECT 1 FROM b, a WHERE a.id = b.a_id "
            "AND b.data = :data_1)"
        )
        self.assert_compile(A.b_data == "foo", expected_sql)


class SynonymOfProxyTest(AssertsCompiledSQL, fixtures.DeclarativeMappedTest):
    """A ``synonym()`` that points at an association proxy."""

    __dialect__ = "default"

    run_create_tables = None

    @classmethod
    def setup_classes(cls):
        from sqlalchemy.orm import synonym

        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"

            id = Column(Integer, primary_key=True)
            data = Column(String)
            bs = relationship("B", backref="a")

            b_data = association_proxy("bs", "data")

            # the synonym under test refers to the proxy above
            b_data_syn = synonym("b_data")

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            a_id = Column(ForeignKey("a.id"))
            data = Column(String)

    def test_hasattr(self):
        """``hasattr()`` for a missing name on the synonym is False."""
        A = self.classes.A

        is_false(hasattr(A.b_data_syn, "nonexistent"))

    def test_o2m_instance_getter(self):
        """Instance access via the synonym yields the proxied values."""
        A, B = self.classes("A", "B")

        parent = A(bs=[B(data="bdata1"), B(data="bdata2")])

        eq_(parent.b_data_syn, ["bdata1", "bdata2"])

    def test_o2m_expr(self):
        """Class-level comparison via the synonym renders the EXISTS form."""
        A = self.classes.A

        expected_sql = (
            "EXISTS (SELECT 1 FROM b, a WHERE a.id = b.a_id "
            "AND b.data = :data_1)"
        )
        self.assert_compile(A.b_data_syn == "foo", expected_sql)


class ProxyHybridTest(fixtures.DeclarativeMappedTest, AssertsCompiledSQL):
    """Association proxies whose target attribute is a hybrid property.

    ``B`` defines several hybrids over its ``data`` column:

    * ``well_behaved_value`` - getter and setter only.
    * ``value`` - getter, setter and a custom comparator class that gives
      no access to the underlying attribute (the "ambiguous" case).
    * ``well_behaved_w_expr`` - getter, setter and an explicit SQL
      expression.
    * ``fails_on_class_access`` - getter calls ``len(self.data)``, which
      the tests expect to fail when invoked against the class.

    """

    __dialect__ = "default"

    run_create_tables = None

    @classmethod
    def setup_classes(cls):
        from sqlalchemy.ext.hybrid import hybrid_property
        from sqlalchemy.orm.interfaces import PropComparator

        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"

            id = Column(Integer, primary_key=True)
            bs = relationship("B")

            b_data = association_proxy("bs", "value")
            well_behaved_b_data = association_proxy("bs", "well_behaved_value")

            fails_on_class_access = association_proxy(
                "bs", "fails_on_class_access"
            )

        class B(Base):
            __tablename__ = "b"

            id = Column(Integer, primary_key=True)
            aid = Column(ForeignKey("a.id"))
            data = Column(String(50))

            @hybrid_property
            def well_behaved_value(self):
                return self.data

            @well_behaved_value.setter
            def well_behaved_value(self, value):
                self.data = value

            @hybrid_property
            def value(self):
                return self.data

            @value.setter
            def value(self, value):
                self.data = value

            @value.comparator
            class value(PropComparator):
                # comparator has no proxy __getattr__, so we can't
                # get to impl to see what we are proxying towards.
                # as of #4690 we assume column-oriented proxying
                def __init__(self, cls):
                    self.cls = cls

            @hybrid_property
            def well_behaved_w_expr(self):
                return self.data

            @well_behaved_w_expr.setter
            def well_behaved_w_expr(self, value):
                self.data = value

            @well_behaved_w_expr.expression
            def well_behaved_w_expr(cls):
                return cast(cls.data, Integer)

            @hybrid_property
            def fails_on_class_access(self):
                return len(self.data)

        class C(Base):
            __tablename__ = "c"

            id = Column(Integer, primary_key=True)
            b_id = Column(ForeignKey("b.id"))
            _b = relationship("B")
            attr = association_proxy("_b", "well_behaved_w_expr")

    def test_msg_fails_on_cls_access(self):
        """A hybrid that errors on class access gives a descriptive error."""
        A, B = self.classes("A", "B")

        a1 = A(bs=[B(data="b1")])

        # the expected text is a regular expression matched against the
        # raised message, so it is kept exactly as the library emits it
        with expect_raises_message(
            exc.InvalidRequestError,
            "Association proxy received an unexpected error when trying to "
            'retreive attribute "B.fails_on_class_access" from '
            r'class "B": .* no len\(\)',
        ):
            a1.fails_on_class_access

    def test_get_ambiguous(self):
        """Instance-level get works through the custom-comparator hybrid."""
        A, B = self.classes("A", "B")

        a1 = A(bs=[B(data="b1")])
        eq_(a1.b_data[0], "b1")

    def test_get_nonambiguous(self):
        """Instance-level get works through the plain hybrid."""
        A, B = self.classes("A", "B")

        a1 = A(bs=[B(data="b1")])
        eq_(a1.well_behaved_b_data[0], "b1")

    def test_set_ambiguous(self):
        """Instance-level set works through the custom-comparator hybrid."""
        A, B = self.classes("A", "B")

        a1 = A(bs=[B()])

        a1.b_data[0] = "b1"
        eq_(a1.b_data[0], "b1")

    def test_set_nonambiguous(self):
        """Instance-level set works through the plain hybrid."""
        A, B = self.classes("A", "B")

        a1 = A(bs=[B()])

        # assign through the non-ambiguous proxy itself; assigning via
        # ``b_data`` here would only repeat test_set_ambiguous and leave
        # the setter of ``well_behaved_value`` unexercised
        a1.well_behaved_b_data[0] = "b1"
        eq_(a1.well_behaved_b_data[0], "b1")

    def test_expr_nonambiguous(self):
        """Class-level comparison on the plain hybrid renders an EXISTS."""
        A, B = self.classes("A", "B")

        eq_(
            str(A.well_behaved_b_data == 5),
            "EXISTS (SELECT 1 \nFROM b, a \nWHERE "
            "a.id = b.aid AND b.data = :data_1)",
        )

    def test_get_classlevel_ambiguous(self):
        """The custom-comparator hybrid is treated as column-oriented."""
        A, B = self.classes("A", "B")

        eq_(
            str(A.b_data),
            "ColumnAssociationProxyInstance"
            "(AssociationProxy('bs', 'value'))",
        )

    def test_comparator_ambiguous(self):
        """``any()`` with no criteria renders a bare EXISTS on the join."""
        A, B = self.classes("A", "B")

        s = fixture_session()
        self.assert_compile(
            s.query(A).filter(A.b_data.any()),
            "SELECT a.id AS a_id FROM a WHERE EXISTS "
            "(SELECT 1 FROM b WHERE a.id = b.aid)",
        )

    def test_explicit_expr(self):
        """The hybrid's explicit SQL expression is used by ``filter_by``."""
        (C,) = self.classes("C")

        s = fixture_session()
        self.assert_compile(
            s.query(C).filter_by(attr=5),
            "SELECT c.id AS c_id, c.b_id AS c_b_id FROM c WHERE EXISTS "
            "(SELECT 1 FROM b WHERE b.id = c.b_id AND "
            "CAST(b.data AS INTEGER) = :param_1)",
        )


class ProxyPlainPropertyTest(fixtures.DeclarativeMappedTest):
    """Association proxy whose target is an ordinary Python ``property``.

    ``B.value`` is a plain ``@property`` wrapping the ``data`` column, so
    instance-level get/set work while class-level SQL expressions are
    expected to raise.

    """

    run_create_tables = None

    @classmethod
    def setup_classes(cls):
        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"

            id = Column(Integer, primary_key=True)
            bs = relationship("B")

            b_data = association_proxy("bs", "value")

        class B(Base):
            __tablename__ = "b"

            id = Column(Integer, primary_key=True)
            aid = Column(ForeignKey("a.id"))
            data = Column(String(50))

            @property
            def value(self):
                return self.data

            @value.setter
            def value(self, value):
                self.data = value

    def test_get_ambiguous(self):
        """Reading an element goes through the property getter."""
        A, B = self.classes("A", "B")

        parent = A(bs=[B(data="b1")])

        eq_(parent.b_data[0], "b1")

    def test_set_ambiguous(self):
        """Assigning an element goes through the property setter."""
        A, B = self.classes("A", "B")

        parent = A(bs=[B()])
        parent.b_data[0] = "b1"

        eq_(parent.b_data[0], "b1")

    def test_get_classlevel_ambiguous(self):
        """Class-level access produces the "ambiguous" proxy instance."""
        A = self.classes.A

        expected = (
            "AmbiguousAssociationProxyInstance"
            "(AssociationProxy('bs', 'value'))"
        )
        eq_(str(A.b_data), expected)

    def test_expr_ambiguous(self):
        """Building a SQL comparison from the class-level proxy raises."""
        A = self.classes.A

        with expect_raises_message(
            AttributeError,
            "Association proxy A.bs refers to an attribute "
            "'value' that is not directly mapped",
        ):
            A.b_data == 5  # noqa: B015


class ScopeBehaviorTest(fixtures.DeclarativeMappedTest):
    """Garbage-collection behavior of collections obtained from a parent.

    Each test loads an ``A``, takes one of its collection-like attributes,
    drops the local reference to the parent and forces a GC pass.  The
    ``*_gc`` tests then check whether the parent is still present in the
    session's identity map; the ``*_iterate`` tests check that the
    collection obtained beforehand still reports two members.

    """

    # test some GC scenarios, including issue #4268

    @classmethod
    def setup_classes(cls):
        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"

            id = Column(Integer, primary_key=True)
            data = Column(String(50))
            bs = relationship("B")

            # read-only "dynamic" flavor of the same relationship
            b_dyn = relationship("B", lazy="dynamic", viewonly=True)

            b_data = association_proxy("bs", "data")

            # NOTE(review): despite its name this proxies through "bs",
            # not "b_dyn", making it identical to b_data - confirm that
            # this is intended
            b_dynamic_data = association_proxy("bs", "data")

        class B(Base):
            __tablename__ = "b"

            id = Column(Integer, primary_key=True)
            aid = Column(ForeignKey("a.id"))
            data = Column(String(50))

    @classmethod
    def insert_data(cls, connection):
        """Insert two ``A`` rows (ids 1 and 2), each with two ``B`` rows."""
        A, B = cls.classes("A", "B")

        s = Session(connection)
        s.add_all(
            [
                A(id=1, bs=[B(data="b1"), B(data="b2")]),
                A(id=2, bs=[B(data="b3"), B(data="b4")]),
            ]
        )
        s.commit()
        s.close()

    def test_plain_collection_gc(self):
        """Holding only the plain collection lets the parent be collected."""
        A, B = self.classes("A", "B")

        s = Session(testing.db)
        a1 = s.query(A).filter_by(id=1).one()

        # keep a reference to the collection only, then drop the parent
        a1bs = a1.bs  # noqa

        del a1

        gc_collect()

        assert (A, (1,), None) not in s.identity_map

    # marked as an expected failure; the reason string is given below
    @testing.fails("dynamic relationship strong references parent")
    def test_dynamic_collection_gc(self):
        """Same check as above, using the dynamic relationship."""
        A, B = self.classes("A", "B")

        s = Session(testing.db)

        a1 = s.query(A).filter_by(id=1).one()

        a1bs = a1.b_dyn  # noqa

        del a1

        gc_collect()

        # also fails, AppenderQuery holds onto parent
        assert (A, (1,), None) not in s.identity_map

    # marked as an expected failure; the reason string is given below
    @testing.fails("association proxy strong references parent")
    def test_associated_collection_gc(self):
        """Same check as above, using the ``b_data`` association proxy."""
        A, B = self.classes("A", "B")

        s = Session(testing.db)

        a1 = s.query(A).filter_by(id=1).one()

        a1bs = a1.b_data  # noqa

        del a1

        gc_collect()

        assert (A, (1,), None) not in s.identity_map

    # marked as an expected failure; the reason string is given below
    @testing.fails("association proxy strong references parent")
    def test_associated_dynamic_gc(self):
        """Same check as above, using the ``b_dynamic_data`` proxy."""
        A, B = self.classes("A", "B")

        s = Session(testing.db)

        a1 = s.query(A).filter_by(id=1).one()

        a1bs = a1.b_dynamic_data  # noqa

        del a1

        gc_collect()

        assert (A, (1,), None) not in s.identity_map

    def test_plain_collection_iterate(self):
        """The plain collection is still usable after the parent is gone."""
        A, B = self.classes("A", "B")

        s = Session(testing.db)

        a1 = s.query(A).filter_by(id=1).one()

        a1bs = a1.bs

        del a1

        gc_collect()

        assert len(a1bs) == 2

    def test_dynamic_collection_iterate(self):
        """The dynamic collection can still be iterated afterwards."""
        A, B = self.classes("A", "B")

        s = Session(testing.db)

        a1 = s.query(A).filter_by(id=1).one()

        a1bs = a1.b_dyn  # noqa

        del a1

        gc_collect()

        # the dynamic collection is materialized with list() before len()
        assert len(list(a1bs)) == 2

    def test_associated_collection_iterate(self):
        """The ``b_data`` proxy collection is still usable afterwards."""
        A, B = self.classes("A", "B")

        s = Session(testing.db)

        a1 = s.query(A).filter_by(id=1).one()

        a1bs = a1.b_data

        del a1

        gc_collect()

        assert len(a1bs) == 2

    def test_associated_dynamic_iterate(self):
        """The ``b_dynamic_data`` proxy collection is still usable."""
        A, B = self.classes("A", "B")

        s = Session(testing.db)

        a1 = s.query(A).filter_by(id=1).one()

        a1bs = a1.b_dynamic_data

        del a1

        gc_collect()

        assert len(a1bs) == 2


class DeclOrmForms(fixtures.TestBase):
    """test issues related to #8880, #8878, #8876"""

    def test_straight_decl_usage(self, decl_base):
        """test use of assoc prox as an annotated attribute of a
        declarative class built from the ``decl_base`` fixture, where the
        related classes define explicit ``__init__`` methods.

        """

        class User(decl_base):
            __allow_unmapped__ = True

            __tablename__ = "user"

            id: Mapped[int] = mapped_column(primary_key=True)

            user_keyword_associations: Mapped[List[UserKeywordAssociation]] = (
                relationship(
                    back_populates="user",
                    cascade="all, delete-orphan",
                )
            )

            keywords: AssociationProxy[list[str]] = association_proxy(
                "user_keyword_associations", "keyword"
            )

        UserKeywordAssociation, Keyword = self._keyword_mapping(
            User, decl_base
        )

        self._assert_keyword_assoc_mapping(
            User, UserKeywordAssociation, Keyword, init=True
        )

    @testing.variation("embed_in_field", [True, False])
    @testing.combinations(
        {},
        {"repr": False},
        {"repr": True},
        ({"kw_only": True}, testing.requires.python310),
        {"init": False},
        {"default_factory": True},
        argnames="field_kw",
    )
    def test_dc_decl_usage(self, dc_decl_base, embed_in_field, field_kw):
        """test use of assoc prox as the default descriptor for a
        dataclasses.field.

        This exercises #8880

        """

        # the combination dict is the same object for every invocation
        # that uses it (i.e. both ``embed_in_field`` variations).  work on
        # a private copy so that the pop / assignment below cannot leak
        # into another invocation and silently disable the
        # default_factory case.
        field_kw = dict(field_kw)

        if field_kw.pop("default_factory", False) and not embed_in_field:
            has_default_factory = True
            # ``Keyword`` is assigned further below; the lambda looks it
            # up only when it is called
            field_kw["default_factory"] = lambda: [
                Keyword("l1"),
                Keyword("l2"),
                Keyword("l3"),
            ]
        else:
            has_default_factory = False

        class User(dc_decl_base):
            __allow_unmapped__ = True

            __tablename__ = "user"

            id: Mapped[int] = mapped_column(
                primary_key=True, repr=True, init=False
            )

            user_keyword_associations: Mapped[List[UserKeywordAssociation]] = (
                relationship(
                    back_populates="user",
                    cascade="all, delete-orphan",
                    init=False,
                )
            )

            if embed_in_field:
                # this is an incorrect form to use with
                # MappedAsDataclass.  However, we want to make sure it
                # works as kind of a test to ensure we are being as well
                # behaved as possible with an explicit dataclasses.field(),
                # by testing that it uses its normal descriptor-as-default
                # behavior
                keywords: AssociationProxy[list[str]] = dataclasses.field(
                    default=association_proxy(
                        "user_keyword_associations", "keyword"
                    ),
                    **field_kw,
                )
            else:
                keywords: AssociationProxy[list[str]] = association_proxy(
                    "user_keyword_associations", "keyword", **field_kw
                )

        UserKeywordAssociation, Keyword = self._dc_keyword_mapping(
            User, dc_decl_base
        )

        # simplify __qualname__ so we can test repr() more easily
        User.__qualname__ = "mod.User"
        UserKeywordAssociation.__qualname__ = "mod.UserKeywordAssociation"
        Keyword.__qualname__ = "mod.Keyword"

        init = field_kw.get("init", True)

        u1 = self._assert_keyword_assoc_mapping(
            User,
            UserKeywordAssociation,
            Keyword,
            init=init,
            has_default_factory=has_default_factory,
        )

        # with repr enabled (the default) the ``keywords`` field is part
        # of the generated repr; otherwise it is left out
        if field_kw.get("repr", True):
            eq_(
                repr(u1),
                "mod.User(id=None, user_keyword_associations=["
                "mod.UserKeywordAssociation(user_id=None, keyword_id=None, "
                "keyword=mod.Keyword(id=None, keyword='k1'), user=...), "
                "mod.UserKeywordAssociation(user_id=None, keyword_id=None, "
                "keyword=mod.Keyword(id=None, keyword='k2'), user=...), "
                "mod.UserKeywordAssociation(user_id=None, keyword_id=None, "
                "keyword=mod.Keyword(id=None, keyword='k3'), user=...)], "
                "keywords=[mod.Keyword(id=None, keyword='k1'), "
                "mod.Keyword(id=None, keyword='k2'), "
                "mod.Keyword(id=None, keyword='k3')])",
            )
        else:
            eq_(
                repr(u1),
                "mod.User(id=None, user_keyword_associations=["
                "mod.UserKeywordAssociation(user_id=None, keyword_id=None, "
                "keyword=mod.Keyword(id=None, keyword='k1'), user=...), "
                "mod.UserKeywordAssociation(user_id=None, keyword_id=None, "
                "keyword=mod.Keyword(id=None, keyword='k2'), user=...), "
                "mod.UserKeywordAssociation(user_id=None, keyword_id=None, "
                "keyword=mod.Keyword(id=None, keyword='k3'), user=...)])",
            )

    def _assert_keyword_assoc_mapping(
        self,
        User,
        UserKeywordAssociation,
        Keyword,
        *,
        init,
        has_default_factory=False,
    ):
        """Exercise the ``keywords`` proxy on ``User`` and return a User.

        :param init: whether ``keywords`` is accepted by ``User()``.  When
         false, passing it is asserted to raise ``TypeError`` and the
         attribute is assigned after construction instead.
        :param has_default_factory: when true, a ``User()`` constructed
         without arguments is asserted to carry keywords "l1", "l2", "l3".
        :return: a ``User`` whose keywords are "k1", "k2", "k3".

        """
        if not init:
            with expect_raises_message(
                TypeError, r"got an unexpected keyword argument 'keywords'"
            ):
                User(keywords=[Keyword("k1"), Keyword("k2"), Keyword("k3")])

        if has_default_factory:
            u1 = User()
            eq_(u1.keywords, [Keyword("l1"), Keyword("l2"), Keyword("l3")])

            eq_(
                [ka.keyword.keyword for ka in u1.user_keyword_associations],
                ["l1", "l2", "l3"],
            )

        if init:
            u1 = User(keywords=[Keyword("k1"), Keyword("k2"), Keyword("k3")])
        else:
            u1 = User()
            u1.keywords = [Keyword("k1"), Keyword("k2"), Keyword("k3")]

        eq_(u1.keywords, [Keyword("k1"), Keyword("k2"), Keyword("k3")])

        # the association objects must have been created along the way
        eq_(
            [ka.keyword.keyword for ka in u1.user_keyword_associations],
            ["k1", "k2", "k3"],
        )

        return u1

    def _keyword_mapping(self, User, decl_base):
        """Define the association and keyword classes on ``decl_base``.

        Both classes define an explicit ``__init__``.  Returns the tuple
        ``(UserKeywordAssociation, Keyword)``.

        """

        class UserKeywordAssociation(decl_base):
            __tablename__ = "user_keyword"
            user_id: Mapped[int] = mapped_column(
                ForeignKey("user.id"), primary_key=True
            )
            keyword_id: Mapped[int] = mapped_column(
                ForeignKey("keyword.id"), primary_key=True
            )

            user: Mapped[User] = relationship(
                back_populates="user_keyword_associations",
            )

            keyword: Mapped[Keyword] = relationship()

            def __init__(self, keyword=None, user=None):
                self.user = user
                self.keyword = keyword

        class Keyword(ComparableMixin, decl_base):
            __tablename__ = "keyword"
            id: Mapped[int] = mapped_column(primary_key=True)
            keyword: Mapped[str] = mapped_column()

            def __init__(self, keyword):
                self.keyword = keyword

        return UserKeywordAssociation, Keyword

    def _dc_keyword_mapping(self, User, dc_decl_base):
        """Define the association and keyword classes on ``dc_decl_base``.

        No ``__init__`` is written by hand here; constructor behavior is
        configured through the ``init`` / ``default`` arguments instead.
        Returns the tuple ``(UserKeywordAssociation, Keyword)``.

        """

        class UserKeywordAssociation(dc_decl_base):
            __tablename__ = "user_keyword"
            user_id: Mapped[int] = mapped_column(
                ForeignKey("user.id"), primary_key=True, init=False
            )
            keyword_id: Mapped[int] = mapped_column(
                ForeignKey("keyword.id"), primary_key=True, init=False
            )

            keyword: Mapped[Keyword] = relationship(default=None)

            user: Mapped[User] = relationship(
                back_populates="user_keyword_associations", default=None
            )

        class Keyword(dc_decl_base):
            __tablename__ = "keyword"
            id: Mapped[int] = mapped_column(primary_key=True, init=False)
            keyword: Mapped[str] = mapped_column(init=True)

        return UserKeywordAssociation, Keyword
