from decimal import Decimal

from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
from sqlalchemy import literal_column
from sqlalchemy import Numeric
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy.ext import hybrid
from sqlalchemy.orm import aliased
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.orm import synonym
from sqlalchemy.sql import update
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
from sqlalchemy.testing.fixtures import fixture_session
from sqlalchemy.testing.schema import Column


class PropertyComparatorTest(fixtures.TestBase, AssertsCompiledSQL):
    __dialect__ = "default"

    def _fixture(self):
        Base = declarative_base()

        class UCComparator(hybrid.Comparator):
            def __eq__(self, other):
                if other is None:
                    return self.expression is None
                else:
                    return func.upper(self.expression) == func.upper(other)

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)

            _value = Column("value", String)

            @hybrid.hybrid_property
            def value(self):
                "This is a docstring"
                return self._value - 5

            @value.comparator
            def value(cls):
                return UCComparator(cls._value)

            @value.setter
            def value(self, v):
                self._value = v + 5

        return A

    def test_set_get(self):
        A = self._fixture()
        a1 = A(value=5)
        eq_(a1._value, 10)
        eq_(a1.value, 5)

    def test_value(self):
        A = self._fixture()
        eq_(str(A.value == 5), "upper(a.value) = upper(:upper_1)")

    def test_aliased_value(self):
        A = self._fixture()
        eq_(str(aliased(A).value == 5), "upper(a_1.value) = upper(:upper_1)")

    def test_query(self):
        A = self._fixture()
        sess = fixture_session()
        self.assert_compile(
            sess.query(A.value), "SELECT a.value AS a_value FROM a"
        )

    def test_aliased_query(self):
        A = self._fixture()
        sess = fixture_session()
        self.assert_compile(
            sess.query(aliased(A).value),
            "SELECT a_1.value AS a_1_value FROM a AS a_1",
        )

    def test_aliased_filter(self):
        A = self._fixture()
        sess = fixture_session()
        self.assert_compile(
            sess.query(aliased(A)).filter_by(value="foo"),
            "SELECT a_1.value AS a_1_value, a_1.id AS a_1_id "
            "FROM a AS a_1 WHERE upper(a_1.value) = upper(:upper_1)",
        )

    def test_docstring(self):
        A = self._fixture()
        eq_(A.value.__doc__, "This is a docstring")

    def test_no_name_one(self):
        """test :ticket:`6215`"""

        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            name = Column(String(50))

            @hybrid.hybrid_property
            def same_name(self):
                return self.id

            def name1(self):
                return self.id

            different_name = hybrid.hybrid_property(name1)

            no_name = hybrid.hybrid_property(lambda self: self.name)

        stmt = select(A.same_name, A.different_name, A.no_name)
        compiled = stmt.compile()

        eq_(
            [ent._label_name for ent in compiled.compile_state._entities],
            ["same_name", "id", "name"],
        )

    def test_no_name_two(self):
        """test :ticket:`6215`"""
        Base = declarative_base()

        class SomeMixin(object):
            @hybrid.hybrid_property
            def same_name(self):
                return self.id

            def name1(self):
                return self.id

            different_name = hybrid.hybrid_property(name1)

            no_name = hybrid.hybrid_property(lambda self: self.name)

        class A(SomeMixin, Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            name = Column(String(50))

        stmt = select(A.same_name, A.different_name, A.no_name)
        compiled = stmt.compile()

        eq_(
            [ent._label_name for ent in compiled.compile_state._entities],
            ["same_name", "id", "name"],
        )


class PropertyExpressionTest(fixtures.TestBase, AssertsCompiledSQL):
    __dialect__ = "default"

    def _fixture(self):
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            _value = Column("value", String)

            @hybrid.hybrid_property
            def value(self):
                "This is an instance-level docstring"
                return int(self._value) - 5

            @value.expression
            def value(cls):
                "This is a class-level docstring"
                return func.foo(cls._value) + cls.bar_value

            @value.setter
            def value(self, v):
                self._value = v + 5

            @hybrid.hybrid_property
            def bar_value(cls):
                return func.bar(cls._value)

        return A

    def _wrong_expr_fixture(self):
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            _value = Column("value", String)

            @hybrid.hybrid_property
            def value(self):
                return self._value is not None

            @value.expression
            def value(cls):
                return cls._value is not None

        return A

    def _relationship_fixture(self):
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            b_id = Column("bid", Integer, ForeignKey("b.id"))
            _value = Column("value", String)

            @hybrid.hybrid_property
            def value(self):
                return int(self._value) - 5

            @value.expression
            def value(cls):
                return func.foo(cls._value) + cls.bar_value

            @value.setter
            def value(self, v):
                self._value = v + 5

            @hybrid.hybrid_property
            def bar_value(cls):
                return func.bar(cls._value)

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)

            as_ = relationship("A")

        return A, B

    @testing.fixture
    def _related_polymorphic_attr_fixture(self):
        """test for #7425"""

        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)

            bs = relationship("B", back_populates="a", lazy="joined")

        class B(Base):
            __tablename__ = "poly"
            __mapper_args__ = {
                "polymorphic_on": "type",
                # if with_polymorphic is removed, issue does not occur
                "with_polymorphic": "*",
            }
            name = Column(String, primary_key=True)
            type = Column(String)
            a_id = Column(ForeignKey(A.id))

            a = relationship(A, back_populates="bs")

            @hybrid.hybrid_property
            def is_foo(self):
                return self.name == "foo"

        return A, B

    def test_cloning_in_polymorphic_any(
        self, _related_polymorphic_attr_fixture
    ):
        A, B = _related_polymorphic_attr_fixture

        session = fixture_session()

        # in the polymorphic case, A.bs.any() does a traverse() / clone()
        # on the expression.  so the proxedattribute coming from the hybrid
        # has to support this.

        self.assert_compile(
            session.query(A).filter(A.bs.any(B.name == "foo")),
            "SELECT a.id AS a_id, poly_1.name AS poly_1_name, poly_1.type "
            "AS poly_1_type, poly_1.a_id AS poly_1_a_id FROM a "
            "LEFT OUTER JOIN poly AS poly_1 ON a.id = poly_1.a_id "
            "WHERE EXISTS (SELECT 1 FROM poly WHERE a.id = poly.a_id "
            "AND poly.name = :name_1)",
        )

        # SQL should be identical
        self.assert_compile(
            session.query(A).filter(A.bs.any(B.is_foo)),
            "SELECT a.id AS a_id, poly_1.name AS poly_1_name, poly_1.type "
            "AS poly_1_type, poly_1.a_id AS poly_1_a_id FROM a "
            "LEFT OUTER JOIN poly AS poly_1 ON a.id = poly_1.a_id "
            "WHERE EXISTS (SELECT 1 FROM poly WHERE a.id = poly.a_id "
            "AND poly.name = :name_1)",
        )

    @testing.fixture
    def _unnamed_expr_fixture(self):
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            firstname = Column(String)
            lastname = Column(String)

            @hybrid.hybrid_property
            def name(self):
                return self.firstname + " " + self.lastname

        return A

    def test_labeling_for_unnamed(self, _unnamed_expr_fixture):
        A = _unnamed_expr_fixture

        stmt = select(A.id, A.name)
        self.assert_compile(
            stmt,
            "SELECT a.id, a.firstname || :firstname_1 || a.lastname AS name "
            "FROM a",
        )

        eq_(stmt.subquery().c.keys(), ["id", "name"])

        self.assert_compile(
            select(stmt.subquery()),
            "SELECT anon_1.id, anon_1.name "
            "FROM (SELECT a.id AS id, a.firstname || :firstname_1 || "
            "a.lastname AS name FROM a) AS anon_1",
        )

    def test_labeling_for_unnamed_tablename_plus_col(
        self, _unnamed_expr_fixture
    ):
        A = _unnamed_expr_fixture

        stmt = select(A.id, A.name).set_label_style(
            LABEL_STYLE_TABLENAME_PLUS_COL
        )
        # looks like legacy query
        self.assert_compile(
            stmt,
            "SELECT a.id AS a_id, a.firstname || :firstname_1 || "
            "a.lastname AS name FROM a",
        )

        eq_(stmt.subquery().c.keys(), ["a_id", "name"])

        self.assert_compile(
            select(stmt.subquery()),
            "SELECT anon_1.a_id, anon_1.name FROM (SELECT a.id AS a_id, "
            "a.firstname || :firstname_1 || a.lastname AS name FROM a) "
            "AS anon_1",
        )

    def test_labeling_for_unnamed_legacy(self, _unnamed_expr_fixture):
        A = _unnamed_expr_fixture

        sess = fixture_session()

        stmt = sess.query(A.id, A.name)

        self.assert_compile(
            stmt,
            "SELECT a.id AS a_id, a.firstname || "
            ":firstname_1 || a.lastname AS name FROM a",
        )

        # for the subquery, we lose the "ORM-ness" from the subquery
        # so we have to carry it over using _proxy_key
        eq_(stmt.subquery().c.keys(), ["id", "name"])

        self.assert_compile(
            sess.query(stmt.subquery()),
            "SELECT anon_1.id AS anon_1_id, anon_1.name AS anon_1_name "
            "FROM (SELECT a.id AS id, a.firstname || :firstname_1 || "
            "a.lastname AS name FROM a) AS anon_1",
        )

    def test_info(self):
        A = self._fixture()
        inspect(A).all_orm_descriptors.value.info["some key"] = "some value"
        eq_(
            inspect(A).all_orm_descriptors.value.info,
            {"some key": "some value"},
        )

    def test_set_get(self):
        A = self._fixture()
        a1 = A(value=5)
        eq_(a1._value, 10)
        eq_(a1.value, 5)

    def test_expression(self):
        A = self._fixture()
        self.assert_compile(
            A.value.__clause_element__(), "foo(a.value) + bar(a.value)"
        )

    def test_expression_isnt_clause_element(self):
        A = self._wrong_expr_fixture()

        from sqlalchemy.sql import coercions, roles

        with testing.expect_raises_message(
            exc.InvalidRequestError,
            'When interpreting attribute "A.value" as a SQL expression, '
            r"expected __clause_element__\(\) to return a "
            "ClauseElement object, got: True",
        ):
            coercions.expect(roles.ExpressionElementRole, A.value)

    def test_any(self):
        A, B = self._relationship_fixture()
        sess = fixture_session()
        self.assert_compile(
            sess.query(B).filter(B.as_.any(value=5)),
            "SELECT b.id AS b_id FROM b WHERE EXISTS "
            "(SELECT 1 FROM a WHERE b.id = a.bid "
            "AND foo(a.value) + bar(a.value) = :param_1)",
        )

    def test_aliased_expression(self):
        A = self._fixture()
        self.assert_compile(
            aliased(A).value.__clause_element__(),
            "foo(a_1.value) + bar(a_1.value)",
        )

    def test_query(self):
        A = self._fixture()
        sess = fixture_session()
        self.assert_compile(
            sess.query(A).filter_by(value="foo"),
            "SELECT a.value AS a_value, a.id AS a_id "
            "FROM a WHERE foo(a.value) + bar(a.value) = :param_1",
        )

    def test_aliased_query(self):
        A = self._fixture()
        sess = fixture_session()
        self.assert_compile(
            sess.query(aliased(A)).filter_by(value="foo"),
            "SELECT a_1.value AS a_1_value, a_1.id AS a_1_id "
            "FROM a AS a_1 WHERE foo(a_1.value) + bar(a_1.value) = :param_1",
        )

    def test_docstring(self):
        A = self._fixture()
        eq_(A.value.__doc__, "This is a class-level docstring")

        # no docstring here since we get a literal
        a1 = A(_value=10)
        eq_(a1.value, 5)


class PropertyValueTest(fixtures.TestBase, AssertsCompiledSQL):
    __dialect__ = "default"

    def _fixture(self, assignable):
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            _value = Column("value", String)

            @hybrid.hybrid_property
            def value(self):
                return self._value - 5

            if assignable:

                @value.setter
                def value(self, v):
                    self._value = v + 5

        return A

    def test_nonassignable(self):
        A = self._fixture(False)
        a1 = A(_value=5)
        assert_raises_message(
            AttributeError, "can't set attribute", setattr, a1, "value", 10
        )

    def test_nondeletable(self):
        A = self._fixture(False)
        a1 = A(_value=5)
        assert_raises_message(
            AttributeError, "can't delete attribute", delattr, a1, "value"
        )

    def test_set_get(self):
        A = self._fixture(True)
        a1 = A(value=5)
        eq_(a1.value, 5)
        eq_(a1._value, 10)


class PropertyOverrideTest(fixtures.TestBase, AssertsCompiledSQL):
    __dialect__ = "default"

    def _fixture(self):
        Base = declarative_base()

        class Person(Base):
            __tablename__ = "person"
            id = Column(Integer, primary_key=True)
            _name = Column(String)

            @hybrid.hybrid_property
            def name(self):
                return self._name

            @name.setter
            def name(self, value):
                self._name = value.title()

        class OverrideSetter(Person):
            __tablename__ = "override_setter"
            id = Column(Integer, ForeignKey("person.id"), primary_key=True)
            other = Column(String)

            @Person.name.setter
            def name(self, value):
                self._name = value.upper()

        class OverrideGetter(Person):
            __tablename__ = "override_getter"
            id = Column(Integer, ForeignKey("person.id"), primary_key=True)
            other = Column(String)

            @Person.name.getter
            def name(self):
                return "Hello " + self._name

        class OverrideExpr(Person):
            __tablename__ = "override_expr"
            id = Column(Integer, ForeignKey("person.id"), primary_key=True)
            other = Column(String)

            @Person.name.overrides.expression
            def name(self):
                return func.concat("Hello", self._name)

        class FooComparator(hybrid.Comparator):
            def __clause_element__(self):
                return func.concat("Hello", self.expression._name)

        class OverrideComparator(Person):
            __tablename__ = "override_comp"
            id = Column(Integer, ForeignKey("person.id"), primary_key=True)
            other = Column(String)

            @Person.name.overrides.comparator
            def name(self):
                return FooComparator(self)

        return (
            Person,
            OverrideSetter,
            OverrideGetter,
            OverrideExpr,
            OverrideComparator,
        )

    def test_property(self):
        Person, _, _, _, _ = self._fixture()
        p1 = Person()
        p1.name = "mike"
        eq_(p1._name, "Mike")
        eq_(p1.name, "Mike")

    def test_override_setter(self):
        _, OverrideSetter, _, _, _ = self._fixture()
        p1 = OverrideSetter()
        p1.name = "mike"
        eq_(p1._name, "MIKE")
        eq_(p1.name, "MIKE")

    def test_override_getter(self):
        _, _, OverrideGetter, _, _ = self._fixture()
        p1 = OverrideGetter()
        p1.name = "mike"
        eq_(p1._name, "Mike")
        eq_(p1.name, "Hello Mike")

    def test_override_expr(self):
        Person, _, _, OverrideExpr, _ = self._fixture()

        self.assert_compile(Person.name.__clause_element__(), "person._name")

        self.assert_compile(
            OverrideExpr.name.__clause_element__(),
            "concat(:concat_1, person._name)",
        )

    def test_override_comparator(self):
        Person, _, _, _, OverrideComparator = self._fixture()

        self.assert_compile(Person.name.__clause_element__(), "person._name")

        self.assert_compile(
            OverrideComparator.name.__clause_element__(),
            "concat(:concat_1, person._name)",
        )


class PropertyMirrorTest(fixtures.TestBase, AssertsCompiledSQL):
    __dialect__ = "default"

    def _fixture(self):
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            _value = Column("value", String)

            @hybrid.hybrid_property
            def value(self):
                "This is an instance-level docstring"
                return self._value

        return A

    @testing.fixture
    def _function_fixture(self):
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            value = Column(Integer)

            @hybrid.hybrid_property
            def foo_value(self):
                return func.foo(self.value)

        return A

    @testing.fixture
    def _name_mismatch_fixture(self):
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            addresses = relationship("B")

            @hybrid.hybrid_property
            def some_email(self):
                if self.addresses:
                    return self.addresses[0].email_address
                else:
                    return None

            @some_email.expression
            def some_email(cls):
                return B.email_address

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            aid = Column(ForeignKey("a.id"))
            email_address = Column(String)

        return A, B

    def test_dont_assume_attr_key_is_present(self, _name_mismatch_fixture):
        A, B = _name_mismatch_fixture
        self.assert_compile(
            select(A, A.some_email).join(A.addresses),
            "SELECT a.id, b.email_address FROM a JOIN b ON a.id = b.aid",
        )

    def test_dont_assume_attr_key_is_present_ac(self, _name_mismatch_fixture):
        A, B = _name_mismatch_fixture

        ac = aliased(A)
        self.assert_compile(
            select(ac, ac.some_email).join(ac.addresses),
            "SELECT a_1.id, b.email_address "
            "FROM a AS a_1 JOIN b ON a_1.id = b.aid",
        )

    def test_c_collection_func_element(self, _function_fixture):
        A = _function_fixture

        stmt = select(A.id, A.foo_value)
        eq_(stmt.subquery().c.keys(), ["id", "foo_value"])

    def test_filter_by_mismatched_col(self, _name_mismatch_fixture):
        A, B = _name_mismatch_fixture
        self.assert_compile(
            select(A).filter_by(some_email="foo").join(A.addresses),
            "SELECT a.id FROM a JOIN b ON a.id = b.aid "
            "WHERE b.email_address = :email_address_1",
        )

    def test_aliased_mismatched_col(self, _name_mismatch_fixture):
        A, B = _name_mismatch_fixture
        sess = fixture_session()

        # so what should this do ?   it's just a weird hybrid case
        self.assert_compile(
            sess.query(aliased(A).some_email),
            "SELECT b.email_address AS b_email_address FROM b",
        )

    def test_property(self):
        A = self._fixture()

        is_(A.value.property, A._value.property)

    def test_key(self):
        A = self._fixture()
        eq_(A.value.key, "value")
        eq_(A._value.key, "_value")

    def test_class(self):
        A = self._fixture()
        is_(A.value.class_, A._value.class_)

    def test_get_history(self):
        A = self._fixture()
        inst = A(_value=5)
        eq_(A.value.get_history(inst), A._value.get_history(inst))

    def test_info_not_mirrored(self):
        A = self._fixture()
        A._value.info["foo"] = "bar"
        A.value.info["bar"] = "hoho"

        eq_(A._value.info, {"foo": "bar"})
        eq_(A.value.info, {"bar": "hoho"})

    def test_info_from_hybrid(self):
        A = self._fixture()
        A._value.info["foo"] = "bar"
        A.value.info["bar"] = "hoho"

        insp = inspect(A)
        is_(insp.all_orm_descriptors["value"].info, A.value.info)


class SynonymOfPropertyTest(fixtures.TestBase, AssertsCompiledSQL):
    __dialect__ = "default"

    def _fixture(self):
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            _value = Column("value", String)

            @hybrid.hybrid_property
            def value(self):
                return self._value

            value_syn = synonym("value")

            @hybrid.hybrid_property
            def string_value(self):
                return "foo"

            string_value_syn = synonym("string_value")

            @hybrid.hybrid_property
            def string_expr_value(self):
                return "foo"

            @string_expr_value.expression
            def string_expr_value(cls):
                return literal_column("'foo'")

            string_expr_value_syn = synonym("string_expr_value")

        return A

    def test_hasattr(self):
        A = self._fixture()

        is_false(hasattr(A.value_syn, "nonexistent"))

        is_false(hasattr(A.string_value_syn, "nonexistent"))

        is_false(hasattr(A.string_expr_value_syn, "nonexistent"))

    def test_instance_access(self):
        A = self._fixture()

        a1 = A(_value="hi")

        eq_(a1.value_syn, "hi")

        eq_(a1.string_value_syn, "foo")

        eq_(a1.string_expr_value_syn, "foo")

    def test_expression_property(self):
        A = self._fixture()

        self.assert_compile(
            select(A.id, A.value_syn).where(A.value_syn == "value"),
            "SELECT a.id, a.value FROM a WHERE a.value = :value_1",
        )

    def test_expression_expr(self):
        A = self._fixture()

        self.assert_compile(
            select(A.id, A.string_expr_value_syn).where(
                A.string_expr_value_syn == "value"
            ),
            "SELECT a.id, 'foo' FROM a WHERE 'foo' = :'foo'_1",
        )


class MethodExpressionTest(fixtures.TestBase, AssertsCompiledSQL):
    __dialect__ = "default"

    def _fixture(self):
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            _value = Column("value", String)

            @hybrid.hybrid_method
            def value(self, x):
                "This is an instance-level docstring"
                return int(self._value) + x

            @value.expression
            def value(cls, value):
                "This is a class-level docstring"
                return func.foo(cls._value, value) + value

            @hybrid.hybrid_method
            def other_value(self, x):
                "This is an instance-level docstring"
                return int(self._value) + x

            @other_value.expression
            def other_value(cls, value):
                return func.foo(cls._value, value) + value

        return A

    def test_call(self):
        A = self._fixture()
        a1 = A(_value=10)
        eq_(a1.value(7), 17)

    def test_expression(self):
        A = self._fixture()
        self.assert_compile(A.value(5), "foo(a.value, :foo_1) + :foo_2")

    def test_info(self):
        A = self._fixture()
        inspect(A).all_orm_descriptors.value.info["some key"] = "some value"
        eq_(
            inspect(A).all_orm_descriptors.value.info,
            {"some key": "some value"},
        )

    def test_aliased_expression(self):
        A = self._fixture()
        self.assert_compile(
            aliased(A).value(5), "foo(a_1.value, :foo_1) + :foo_2"
        )

    def test_query(self):
        A = self._fixture()
        sess = fixture_session()
        self.assert_compile(
            sess.query(A).filter(A.value(5) == "foo"),
            "SELECT a.value AS a_value, a.id AS a_id "
            "FROM a WHERE foo(a.value, :foo_1) + :foo_2 = :param_1",
        )

    def test_aliased_query(self):
        A = self._fixture()
        sess = fixture_session()
        a1 = aliased(A)
        self.assert_compile(
            sess.query(a1).filter(a1.value(5) == "foo"),
            "SELECT a_1.value AS a_1_value, a_1.id AS a_1_id "
            "FROM a AS a_1 WHERE foo(a_1.value, :foo_1) + :foo_2 = :param_1",
        )

    def test_query_col(self):
        A = self._fixture()
        sess = fixture_session()
        self.assert_compile(
            sess.query(A.value(5)),
            "SELECT foo(a.value, :foo_1) + :foo_2 AS anon_1 FROM a",
        )

    def test_aliased_query_col(self):
        A = self._fixture()
        sess = fixture_session()
        self.assert_compile(
            sess.query(aliased(A).value(5)),
            "SELECT foo(a_1.value, :foo_1) + :foo_2 AS anon_1 FROM a AS a_1",
        )

    def test_docstring(self):
        A = self._fixture()
        eq_(A.value.__doc__, "This is a class-level docstring")
        eq_(A.other_value.__doc__, "This is an instance-level docstring")
        a1 = A(_value=10)

        # a1.value is still a method, so it has a
        # docstring
        eq_(a1.value.__doc__, "This is an instance-level docstring")

        eq_(a1.other_value.__doc__, "This is an instance-level docstring")


class BulkUpdateTest(fixtures.DeclarativeMappedTest, AssertsCompiledSQL):
    __dialect__ = "default"

    @classmethod
    def setup_classes(cls):
        Base = cls.DeclarativeBasic

        class Person(Base):
            __tablename__ = "person"

            id = Column(Integer, primary_key=True)
            first_name = Column(String(10))
            last_name = Column(String(10))

            @hybrid.hybrid_property
            def name(self):
                return self.first_name + " " + self.last_name

            @name.setter
            def name(self, value):
                self.first_name, self.last_name = value.split(" ", 1)

            @name.expression
            def name(cls):
                return func.concat(cls.first_name, " ", cls.last_name)

            @name.update_expression
            def name(cls, value):
                f, l = value.split(" ", 1)
                return [(cls.first_name, f), (cls.last_name, l)]

            @hybrid.hybrid_property
            def uname(self):
                return self.name

            @hybrid.hybrid_property
            def fname(self):
                return self.first_name

            @hybrid.hybrid_property
            def fname2(self):
                return self.fname

    @classmethod
    def insert_data(cls, connection):
        s = Session(connection)
        jill = cls.classes.Person(id=3, first_name="jill")
        s.add(jill)
        s.commit()

    def test_update_plain(self):
        Person = self.classes.Person

        statement = update(Person).values({Person.fname: "Dr."})

        self.assert_compile(
            statement,
            "UPDATE person SET first_name=:first_name",
            params={"first_name": "Dr."},
        )

    def test_update_expr(self):
        Person = self.classes.Person

        statement = update(Person).values({Person.name: "Dr. No"})

        self.assert_compile(
            statement,
            "UPDATE person SET first_name=:first_name, last_name=:last_name",
            params={"first_name": "Dr.", "last_name": "No"},
        )

    # these tests all run two UPDATES to assert that caching is not
    # interfering.  this is #7209

    def test_evaluate_non_hybrid_attr(self):
        # this is a control case
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.first_name: "moonbeam"}, synchronize_session="evaluate"
        )
        eq_(jill.first_name, "moonbeam")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.first_name: "sunshine"}, synchronize_session="evaluate"
        )
        eq_(jill.first_name, "sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "sunshine",
        )

    def test_evaluate_hybrid_attr_indirect(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.fname2: "moonbeam"}, synchronize_session="evaluate"
        )
        eq_(jill.fname2, "moonbeam")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.fname2: "sunshine"}, synchronize_session="evaluate"
        )
        eq_(jill.fname2, "sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "sunshine",
        )

    def test_evaluate_hybrid_attr_plain(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.fname: "moonbeam"}, synchronize_session="evaluate"
        )
        eq_(jill.fname, "moonbeam")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.fname: "sunshine"}, synchronize_session="evaluate"
        )
        eq_(jill.fname, "sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "sunshine",
        )

    def test_fetch_hybrid_attr_indirect(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.fname2: "moonbeam"}, synchronize_session="fetch"
        )
        eq_(jill.fname2, "moonbeam")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.fname2: "sunshine"}, synchronize_session="fetch"
        )
        eq_(jill.fname2, "sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "sunshine",
        )

    def test_fetch_hybrid_attr_plain(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.fname: "moonbeam"}, synchronize_session="fetch"
        )
        eq_(jill.fname, "moonbeam")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.fname: "sunshine"}, synchronize_session="fetch"
        )
        eq_(jill.fname, "sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "sunshine",
        )

    def test_evaluate_hybrid_attr_w_update_expr(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.name: "moonbeam sunshine"}, synchronize_session="evaluate"
        )
        eq_(jill.name, "moonbeam sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.name: "first last"}, synchronize_session="evaluate"
        )
        eq_(jill.name, "first last")
        eq_(s.scalar(select(Person.first_name).where(Person.id == 3)), "first")

    def test_fetch_hybrid_attr_w_update_expr(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.name: "moonbeam sunshine"}, synchronize_session="fetch"
        )
        eq_(jill.name, "moonbeam sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.name: "first last"}, synchronize_session="fetch"
        )
        eq_(jill.name, "first last")
        eq_(s.scalar(select(Person.first_name).where(Person.id == 3)), "first")

    def test_evaluate_hybrid_attr_indirect_w_update_expr(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.uname: "moonbeam sunshine"}, synchronize_session="evaluate"
        )
        eq_(jill.uname, "moonbeam sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.uname: "first last"}, synchronize_session="evaluate"
        )
        eq_(jill.uname, "first last")
        eq_(s.scalar(select(Person.first_name).where(Person.id == 3)), "first")


class SpecialObjectTest(fixtures.TestBase, AssertsCompiledSQL):
    """tests against hybrids that return a non-ClauseElement.

    use cases derived from the example at
    https://techspot.zzzeek.org/2011/10/21/hybrids-and-value-agnostic-types/

    """

    __dialect__ = "default"

    @classmethod
    def setup_test_class(cls):
        from sqlalchemy import literal

        symbols = ("usd", "gbp", "cad", "eur", "aud")
        currency_lookup = dict(
            ((currency_from, currency_to), Decimal(str(rate)))
            for currency_to, values in zip(
                symbols,
                [
                    (1, 1.59009, 0.988611, 1.37979, 1.02962),
                    (0.628895, 1, 0.621732, 0.867748, 0.647525),
                    (1.01152, 1.6084, 1, 1.39569, 1.04148),
                    (0.724743, 1.1524, 0.716489, 1, 0.746213),
                    (0.971228, 1.54434, 0.960166, 1.34009, 1),
                ],
            )
            for currency_from, rate in zip(symbols, values)
        )

        class Amount(object):
            def __init__(self, amount, currency):
                self.currency = currency
                self.amount = amount

            def __add__(self, other):
                return Amount(
                    self.amount + other.as_currency(self.currency).amount,
                    self.currency,
                )

            def __sub__(self, other):
                return Amount(
                    self.amount - other.as_currency(self.currency).amount,
                    self.currency,
                )

            def __lt__(self, other):
                return self.amount < other.as_currency(self.currency).amount

            def __gt__(self, other):
                return self.amount > other.as_currency(self.currency).amount

            def __eq__(self, other):
                return self.amount == other.as_currency(self.currency).amount

            def as_currency(self, other_currency):
                return Amount(
                    currency_lookup[(self.currency, other_currency)]
                    * self.amount,
                    other_currency,
                )

            def __clause_element__(self):
                # helper method for SQLAlchemy to interpret
                # the Amount object as a SQL element
                if isinstance(self.amount, (float, int, Decimal)):
                    return literal(self.amount)
                else:
                    return self.amount

            def __str__(self):
                return "%2.4f %s" % (self.amount, self.currency)

            def __repr__(self):
                return "Amount(%r, %r)" % (self.amount, self.currency)

        Base = declarative_base()

        class BankAccount(Base):
            __tablename__ = "bank_account"
            id = Column(Integer, primary_key=True)

            _balance = Column("balance", Numeric)

            @hybrid.hybrid_property
            def balance(self):
                """Return an Amount view of the current balance."""
                return Amount(self._balance, "usd")

            @balance.setter
            def balance(self, value):
                self._balance = value.as_currency("usd").amount

        cls.Amount = Amount
        cls.BankAccount = BankAccount

    def test_instance_one(self):
        BankAccount, Amount = self.BankAccount, self.Amount
        account = BankAccount(balance=Amount(4000, "usd"))

        # 3b. print balance in usd
        eq_(account.balance.amount, 4000)

    def test_instance_two(self):
        BankAccount, Amount = self.BankAccount, self.Amount
        account = BankAccount(balance=Amount(4000, "usd"))

        # 3c. print balance in gbp
        eq_(account.balance.as_currency("gbp").amount, Decimal("2515.58"))

    def test_instance_three(self):
        BankAccount, Amount = self.BankAccount, self.Amount
        account = BankAccount(balance=Amount(4000, "usd"))

        # 3d. perform currency-agnostic comparisons, math
        is_(account.balance > Amount(500, "cad"), True)

    def test_instance_four(self):
        BankAccount, Amount = self.BankAccount, self.Amount
        account = BankAccount(balance=Amount(4000, "usd"))
        eq_(
            account.balance + Amount(500, "cad") - Amount(50, "eur"),
            Amount(Decimal("4425.316"), "usd"),
        )

    def test_query_one(self):
        BankAccount, Amount = self.BankAccount, self.Amount
        session = fixture_session()

        query = session.query(BankAccount).filter(
            BankAccount.balance == Amount(10000, "cad")
        )

        self.assert_compile(
            query,
            "SELECT bank_account.balance AS bank_account_balance, "
            "bank_account.id AS bank_account_id FROM bank_account "
            "WHERE bank_account.balance = :balance_1",
            checkparams={"balance_1": Decimal("9886.110000")},
        )

    def test_query_two(self):
        BankAccount, Amount = self.BankAccount, self.Amount
        session = fixture_session()

        # alternatively we can do the calc on the DB side.
        query = (
            session.query(BankAccount)
            .filter(
                BankAccount.balance.as_currency("cad") > Amount(9999, "cad")
            )
            .filter(
                BankAccount.balance.as_currency("cad") < Amount(10001, "cad")
            )
        )
        self.assert_compile(
            query,
            "SELECT bank_account.balance AS bank_account_balance, "
            "bank_account.id AS bank_account_id "
            "FROM bank_account "
            "WHERE :balance_1 * bank_account.balance > :param_1 "
            "AND :balance_2 * bank_account.balance < :param_2",
            checkparams={
                "balance_1": Decimal("1.01152"),
                "balance_2": Decimal("1.01152"),
                "param_1": Decimal("9999"),
                "param_2": Decimal("10001"),
            },
        )

    def test_query_three(self):
        BankAccount = self.BankAccount
        session = fixture_session()

        query = session.query(BankAccount).filter(
            BankAccount.balance.as_currency("cad")
            > BankAccount.balance.as_currency("eur")
        )
        self.assert_compile(
            query,
            "SELECT bank_account.balance AS bank_account_balance, "
            "bank_account.id AS bank_account_id FROM bank_account "
            "WHERE :balance_1 * bank_account.balance > "
            ":param_1 * :balance_2 * bank_account.balance",
            checkparams={
                "balance_1": Decimal("1.01152"),
                "balance_2": Decimal("0.724743"),
                "param_1": Decimal("1.39569"),
            },
        )

    def test_query_four(self):
        BankAccount = self.BankAccount
        session = fixture_session()

        # 4c. query all amounts, converting to "CAD" on the DB side
        query = session.query(BankAccount.balance.as_currency("cad").amount)
        self.assert_compile(
            query,
            "SELECT :balance_1 * bank_account.balance AS anon_1 "
            "FROM bank_account",
            checkparams={"balance_1": Decimal("1.01152")},
        )

    def test_query_five(self):
        BankAccount = self.BankAccount
        session = fixture_session()

        # 4d. average balance in EUR
        query = session.query(func.avg(BankAccount.balance.as_currency("eur")))
        self.assert_compile(
            query,
            "SELECT avg(:balance_1 * bank_account.balance) AS avg_1 "
            "FROM bank_account",
            checkparams={"balance_1": Decimal("0.724743")},
        )

    def test_docstring(self):
        BankAccount = self.BankAccount
        eq_(
            BankAccount.balance.__doc__,
            "Return an Amount view of the current balance.",
        )
