# coding: utf-8
"""Tests unitofwork operations."""

from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
import datetime
from sqlalchemy.orm import mapper as orm_mapper

import sqlalchemy as sa
from sqlalchemy.util import u, ue, b
from sqlalchemy import Integer, String, ForeignKey, \
    literal_column, event, Boolean
from sqlalchemy.testing import engines
from sqlalchemy import testing
from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.schema import Column
from sqlalchemy.orm import mapper, relationship, create_session, \
    column_property, Session, exc as orm_exc
from sqlalchemy.testing import fixtures
from test.orm import _fixtures
from sqlalchemy.testing.assertsql import AllOf, CompiledSQL


class UnitOfWorkTest(object):
    pass

class HistoryTest(_fixtures.FixtureTest):
    run_inserts = None

    @classmethod
    def setup_classes(cls):
        class User(cls.Comparable):
            pass
        class Address(cls.Comparable):
            pass

    def test_backref(self):
        Address, addresses, users, User = (self.classes.Address,
                                self.tables.addresses,
                                self.tables.users,
                                self.classes.User)

        am = mapper(Address, addresses)
        m = mapper(User, users, properties=dict(
            addresses = relationship(am, backref='user', lazy='joined')))

        session = create_session(autocommit=False)

        u = User(name='u1')
        a = Address(email_address='u1@e')
        a.user = u
        session.add(u)

        eq_(u.addresses, [a])
        session.commit()
        session.expunge_all()

        u = session.query(m).one()
        assert u.addresses[0].user == u
        session.close()

class UnicodeTest(fixtures.MappedTest):
    __requires__ = ('unicode_connections',)

    @classmethod
    def define_tables(cls, metadata):
        uni_type = sa.Unicode(50).with_variant(
            sa.Unicode(50, collation="utf8_unicode_ci"), "mysql")

        Table('uni_t1', metadata,
            Column('id',  Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('txt', uni_type, unique=True))
        Table('uni_t2', metadata,
            Column('id',  Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('txt', uni_type, ForeignKey('uni_t1')))

    @classmethod
    def setup_classes(cls):
        class Test(cls.Basic):
            pass
        class Test2(cls.Basic):
            pass

    def test_basic(self):
        Test, uni_t1 = self.classes.Test, self.tables.uni_t1

        mapper(Test, uni_t1)

        txt = ue("\u0160\u0110\u0106\u010c\u017d")
        t1 = Test(id=1, txt=txt)
        self.assert_(t1.txt == txt)

        session = create_session(autocommit=False)
        session.add(t1)
        session.commit()

        self.assert_(t1.txt == txt)

    def test_relationship(self):
        Test, uni_t2, uni_t1, Test2 = (self.classes.Test,
                                self.tables.uni_t2,
                                self.tables.uni_t1,
                                self.classes.Test2)

        mapper(Test, uni_t1, properties={
            't2s': relationship(Test2)})
        mapper(Test2, uni_t2)

        txt = ue("\u0160\u0110\u0106\u010c\u017d")
        t1 = Test(txt=txt)
        t1.t2s.append(Test2())
        t1.t2s.append(Test2())
        session = create_session(autocommit=False)
        session.add(t1)
        session.commit()
        session.close()

        session = create_session()
        t1 = session.query(Test).filter_by(id=t1.id).one()
        assert len(t1.t2s) == 2

class UnicodeSchemaTest(fixtures.MappedTest):
    __requires__ = ('unicode_connections', 'unicode_ddl',)

    run_dispose_bind = 'once'

    @classmethod
    def define_tables(cls, metadata):
        t1 = Table('unitable1', metadata,
              Column(u('méil'), Integer, primary_key=True, key='a', test_needs_autoincrement=True),
              Column(ue('\u6e2c\u8a66'), Integer, key='b'),
              Column('type',  String(20)),
              test_needs_fk=True,
              test_needs_autoincrement=True)
        t2 = Table(u('Unitéble2'), metadata,
              Column(u('méil'), Integer, primary_key=True, key="cc", test_needs_autoincrement=True),
              Column(ue('\u6e2c\u8a66'), Integer,
                     ForeignKey('unitable1.a'), key="d"),
              Column(ue('\u6e2c\u8a66_2'), Integer, key="e"),
              test_needs_fk=True,
              test_needs_autoincrement=True)

        cls.tables['t1'] = t1
        cls.tables['t2'] = t2

    @classmethod
    def setup_class(cls):
        super(UnicodeSchemaTest, cls).setup_class()

    @classmethod
    def teardown_class(cls):
        super(UnicodeSchemaTest, cls).teardown_class()

    @testing.fails_on('mssql+pyodbc',
                      'pyodbc returns a non unicode encoding of the results description.')
    def test_mapping(self):
        t2, t1 = self.tables.t2, self.tables.t1

        class A(fixtures.ComparableEntity):
            pass
        class B(fixtures.ComparableEntity):
            pass

        mapper(A, t1, properties={
            't2s':relationship(B)})
        mapper(B, t2)

        a1 = A()
        b1 = B()
        a1.t2s.append(b1)

        session = create_session()
        session.add(a1)
        session.flush()
        session.expunge_all()

        new_a1 = session.query(A).filter(t1.c.a == a1.a).one()
        assert new_a1.a == a1.a
        assert new_a1.t2s[0].d == b1.d
        session.expunge_all()

        new_a1 = (session.query(A).options(sa.orm.joinedload('t2s')).
                  filter(t1.c.a == a1.a)).one()
        assert new_a1.a == a1.a
        assert new_a1.t2s[0].d == b1.d
        session.expunge_all()

        new_a1 = session.query(A).filter(A.a == a1.a).one()
        assert new_a1.a == a1.a
        assert new_a1.t2s[0].d == b1.d
        session.expunge_all()

    @testing.fails_on('mssql+pyodbc',
                      'pyodbc returns a non unicode encoding of the results description.')
    def test_inheritance_mapping(self):
        t2, t1 = self.tables.t2, self.tables.t1

        class A(fixtures.ComparableEntity):
            pass
        class B(A):
            pass

        mapper(A, t1,
               polymorphic_on=t1.c.type,
               polymorphic_identity='a')
        mapper(B, t2,
               inherits=A,
               polymorphic_identity='b')
        a1 = A(b=5)
        b1 = B(e=7)

        session = create_session()
        session.add_all((a1, b1))
        session.flush()
        session.expunge_all()

        eq_([A(b=5), B(e=7)], session.query(A).all())

class BinaryHistTest(fixtures.MappedTest, testing.AssertsExecutionResults):
    @classmethod
    def define_tables(cls, metadata):
        Table('t1', metadata,
            Column('id', sa.Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', sa.LargeBinary),
        )

    @classmethod
    def setup_classes(cls):
        class Foo(cls.Basic):
            pass

    def test_binary_equality(self):
        Foo, t1 = self.classes.Foo, self.tables.t1

        data = b("this is some data")

        mapper(Foo, t1)

        s = create_session()

        f1 = Foo(data=data)
        s.add(f1)
        s.flush()
        s.expire_all()
        f1 = s.query(Foo).first()
        assert f1.data == data
        f1.data = data
        eq_(
            sa.orm.attributes.get_history(f1, "data"),
            ((), [data], ())
        )
        def go():
            s.flush()
        self.assert_sql_count(testing.db, go, 0)

class PKTest(fixtures.MappedTest):

    @classmethod
    def define_tables(cls, metadata):
        Table('multipk1', metadata,
              Column('multi_id', Integer, primary_key=True,
                     test_needs_autoincrement=True),
              Column('multi_rev', Integer, primary_key=True),
              Column('name', String(50), nullable=False),
              Column('value', String(100)))

        Table('multipk2', metadata,
              Column('pk_col_1', String(30), primary_key=True),
              Column('pk_col_2', String(30), primary_key=True),
              Column('data', String(30)))
        Table('multipk3', metadata,
              Column('pri_code', String(30), key='primary', primary_key=True),
              Column('sec_code', String(30), key='secondary', primary_key=True),
              Column('date_assigned', sa.Date, key='assigned', primary_key=True),
              Column('data', String(30)))

    @classmethod
    def setup_classes(cls):
        class Entry(cls.Basic):
            pass

    # not supported on sqlite since sqlite's auto-pk generation only works with
    # single column primary keys
    @testing.fails_on('sqlite', 'FIXME: unknown')
    def test_primary_key(self):
        Entry, multipk1 = self.classes.Entry, self.tables.multipk1

        mapper(Entry, multipk1)

        e = Entry(name='entry1', value='this is entry 1', multi_rev=2)

        session = create_session()
        session.add(e)
        session.flush()
        session.expunge_all()

        e2 = session.query(Entry).get((e.multi_id, 2))
        self.assert_(e is not e2)
        state = sa.orm.attributes.instance_state(e)
        state2 = sa.orm.attributes.instance_state(e2)
        eq_(state.key, state2.key)

    # this one works with sqlite since we are manually setting up pk values
    def test_manual_pk(self):
        Entry, multipk2 = self.classes.Entry, self.tables.multipk2

        mapper(Entry, multipk2)

        e = Entry(pk_col_1='pk1', pk_col_2='pk1_related', data='im the data')

        session = create_session()
        session.add(e)
        session.flush()

    def test_key_pks(self):
        Entry, multipk3 = self.classes.Entry, self.tables.multipk3

        mapper(Entry, multipk3)

        e = Entry(primary= 'pk1', secondary='pk2',
                   assigned=datetime.date.today(), data='some more data')

        session = create_session()
        session.add(e)
        session.flush()


class ForeignPKTest(fixtures.MappedTest):
    """Detection of the relationship direction on PK joins."""

    @classmethod
    def define_tables(cls, metadata):
        Table("people", metadata,
              Column('person', String(10), primary_key=True),
              Column('firstname', String(10)),
              Column('lastname', String(10)))

        Table("peoplesites", metadata,
              Column('person', String(10), ForeignKey("people.person"),
                     primary_key=True),
              Column('site', String(10)))

    @classmethod
    def setup_classes(cls):
        class Person(cls.Basic):
            pass
        class PersonSite(cls.Basic):
            pass

    def test_basic(self):
        peoplesites, PersonSite, Person, people = (self.tables.peoplesites,
                                self.classes.PersonSite,
                                self.classes.Person,
                                self.tables.people)

        m1 = mapper(PersonSite, peoplesites)
        m2 = mapper(Person, people, properties={
            'sites' : relationship(PersonSite)})

        sa.orm.configure_mappers()
        eq_(list(m2.get_property('sites').synchronize_pairs),
            [(people.c.person, peoplesites.c.person)])

        p = Person(person='im the key', firstname='asdf')
        ps = PersonSite(site='asdf')
        p.sites.append(ps)

        session = create_session()
        session.add(p)
        session.flush()

        p_count = people.count(people.c.person=='im the key').scalar()
        eq_(p_count, 1)
        eq_(peoplesites.count(peoplesites.c.person=='im the key').scalar(), 1)


class ClauseAttributesTest(fixtures.MappedTest):

    @classmethod
    def define_tables(cls, metadata):
        Table('users_t', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('name', String(30)),
            Column('counter', Integer, default=1))

        Table('boolean_t', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('value', Boolean),
            )

    @classmethod
    def setup_classes(cls):
        class User(cls.Comparable):
            pass

        class HasBoolean(cls.Comparable):
            pass

    @classmethod
    def setup_mappers(cls):
        User, users_t = cls.classes.User, cls.tables.users_t
        HasBoolean, boolean_t = cls.classes.HasBoolean, cls.tables.boolean_t
        mapper(User, users_t)
        mapper(HasBoolean, boolean_t)

    def test_update(self):
        User = self.classes.User

        u = User(name='test')

        session = create_session()
        session.add(u)
        session.flush()

        eq_(u.counter, 1)
        u.counter = User.counter + 1
        session.flush()

        def go():
            assert (u.counter == 2) is True  # ensure its not a ClauseElement
        self.sql_count_(1, go)

    def test_multi_update(self):
        User = self.classes.User

        u = User(name='test')

        session = create_session()
        session.add(u)
        session.flush()

        eq_(u.counter, 1)
        u.name = 'test2'
        u.counter = User.counter + 1
        session.flush()

        def go():
            eq_(u.name, 'test2')
            assert (u.counter == 2) is True
        self.sql_count_(1, go)

        session.expunge_all()
        u = session.query(User).get(u.id)
        eq_(u.name, 'test2')
        eq_(u.counter,  2)

    def test_insert(self):
        User = self.classes.User

        u = User(name='test', counter=sa.select([5]))

        session = create_session()
        session.add(u)
        session.flush()

        assert (u.counter == 5) is True

    def test_update_special_comparator(self):
        HasBoolean = self.classes.HasBoolean

        # make sure the comparison we're shooting
        # for is invalid, otherwise we need to
        # test something else here
        assert_raises_message(
            TypeError,
            "Boolean value of this clause is not defined",
            bool, None == sa.false()
        )
        s = create_session()
        hb = HasBoolean(value=None)
        s.add(hb)
        s.flush()

        hb.value = sa.false()

        s.flush()

        # needs to be refreshed
        assert 'value' not in hb.__dict__
        eq_(hb.value, False)


class PassiveDeletesTest(fixtures.MappedTest):
    __requires__ = ('foreign_keys',)

    @classmethod
    def define_tables(cls, metadata):
        Table('mytable', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('data', String(30)),
              test_needs_fk=True)

        Table('myothertable', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('parent_id', Integer),
              Column('data', String(30)),
              sa.ForeignKeyConstraint(['parent_id'],
                                      ['mytable.id'],
                                      ondelete="CASCADE"),
              test_needs_fk=True)

    @classmethod
    def setup_classes(cls):
        class MyClass(cls.Basic):
            pass
        class MyOtherClass(cls.Basic):
            pass

    def test_basic(self):
        myothertable, MyClass, MyOtherClass, mytable = (self.tables.myothertable,
                                self.classes.MyClass,
                                self.classes.MyOtherClass,
                                self.tables.mytable)

        mapper(MyOtherClass, myothertable)
        mapper(MyClass, mytable, properties={
            'children':relationship(MyOtherClass,
                                passive_deletes=True,
                                cascade="all")})
        session = create_session()
        mc = MyClass()
        mc.children.append(MyOtherClass())
        mc.children.append(MyOtherClass())
        mc.children.append(MyOtherClass())
        mc.children.append(MyOtherClass())

        session.add(mc)
        session.flush()
        session.expunge_all()

        assert myothertable.count().scalar() == 4
        mc = session.query(MyClass).get(mc.id)
        session.delete(mc)
        session.flush()

        assert mytable.count().scalar() == 0
        assert myothertable.count().scalar() == 0

    @testing.emits_warning(r".*'passive_deletes' is normally configured on one-to-many")
    def test_backwards_pd(self):
        """Test that passive_deletes=True disables a delete from an m2o.

        This is not the usual usage and it now raises a warning, but test
        that it works nonetheless.

        """

        myothertable, MyClass, MyOtherClass, mytable = (self.tables.myothertable,
                                self.classes.MyClass,
                                self.classes.MyOtherClass,
                                self.tables.mytable)

        mapper(MyOtherClass, myothertable, properties={
            'myclass':relationship(MyClass, cascade="all, delete", passive_deletes=True)
        })
        mapper(MyClass, mytable)

        session = create_session()
        mc = MyClass()
        mco = MyOtherClass()
        mco.myclass = mc
        session.add(mco)
        session.flush()

        assert mytable.count().scalar() == 1
        assert myothertable.count().scalar() == 1

        session.expire(mco, ['myclass'])
        session.delete(mco)
        session.flush()

        # mytable wasn't deleted, is the point.
        assert mytable.count().scalar() == 1
        assert myothertable.count().scalar() == 0

    def test_aaa_m2o_emits_warning(self):
        myothertable, MyClass, MyOtherClass, mytable = (self.tables.myothertable,
                                self.classes.MyClass,
                                self.classes.MyOtherClass,
                                self.tables.mytable)

        mapper(MyOtherClass, myothertable, properties={
            'myclass':relationship(MyClass, cascade="all, delete", passive_deletes=True)
        })
        mapper(MyClass, mytable)
        assert_raises(sa.exc.SAWarning, sa.orm.configure_mappers)

class BatchDeleteIgnoresRowcountTest(fixtures.DeclarativeMappedTest):
    __requires__ = ('foreign_keys',)
    @classmethod
    def setup_classes(cls):
        class A(cls.DeclarativeBasic):
            __tablename__ = 'A'
            __table_args__ = dict(test_needs_fk=True)
            __mapper_args__ = {"confirm_deleted_rows": False}
            id = Column(Integer, primary_key=True)
            parent_id = Column(Integer, ForeignKey('A.id', ondelete='CASCADE'))

    def test_delete_both(self):
        A = self.classes.A
        session = Session(testing.db)

        a1, a2 = A(id=1), A(id=2, parent_id=1)

        session.add_all([a1, a2])
        session.flush()

        session.delete(a1)
        session.delete(a2)

        # no issue with multi-row count here
        session.flush()

class ExtraPassiveDeletesTest(fixtures.MappedTest):
    __requires__ = ('foreign_keys',)

    @classmethod
    def define_tables(cls, metadata):
        Table('mytable', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('data', String(30)),
              test_needs_fk=True)

        Table('myothertable', metadata,
              Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
              Column('parent_id', Integer),
              Column('data', String(30)),
              # no CASCADE, the same as ON DELETE RESTRICT
              sa.ForeignKeyConstraint(['parent_id'],
                                      ['mytable.id']),
              test_needs_fk=True)

    @classmethod
    def setup_classes(cls):
        class MyClass(cls.Basic):
            pass
        class MyOtherClass(cls.Basic):
            pass

    def test_assertions(self):
        myothertable, MyOtherClass = self.tables.myothertable, self.classes.MyOtherClass
        mytable, MyClass = self.tables.mytable, self.classes.MyClass

        mapper(MyClass, mytable, properties={
            'foo': relationship(MyOtherClass,
                                    passive_deletes='all',
                                    cascade="all")
            })
        mapper(MyOtherClass, myothertable)

        assert_raises_message(
            sa.exc.ArgumentError,
            "On MyClass.foo, can't set passive_deletes='all' in conjunction with 'delete' "
            "or 'delete-orphan' cascade",
            sa.orm.configure_mappers
        )

    def test_extra_passive(self):
        myothertable, MyClass, MyOtherClass, mytable = (
                                self.tables.myothertable,
                                self.classes.MyClass,
                                self.classes.MyOtherClass,
                                self.tables.mytable)

        mapper(MyOtherClass, myothertable)
        mapper(MyClass, mytable, properties={
            'children': relationship(MyOtherClass,
                                 passive_deletes='all',
                                 cascade="save-update")})

        session = create_session()
        mc = MyClass()
        mc.children.append(MyOtherClass())
        mc.children.append(MyOtherClass())
        mc.children.append(MyOtherClass())
        mc.children.append(MyOtherClass())
        session.add(mc)
        session.flush()
        session.expunge_all()

        assert myothertable.count().scalar() == 4
        mc = session.query(MyClass).get(mc.id)
        session.delete(mc)
        assert_raises(sa.exc.DBAPIError, session.flush)

    def test_extra_passive_2(self):
        myothertable, MyClass, MyOtherClass, mytable = (self.tables.myothertable,
                                self.classes.MyClass,
                                self.classes.MyOtherClass,
                                self.tables.mytable)

        mapper(MyOtherClass, myothertable)
        mapper(MyClass, mytable, properties={
            'children': relationship(MyOtherClass,
                                 passive_deletes='all',
                                 cascade="save-update")})

        session = create_session()
        mc = MyClass()
        mc.children.append(MyOtherClass())
        session.add(mc)
        session.flush()
        session.expunge_all()

        assert myothertable.count().scalar() == 1

        mc = session.query(MyClass).get(mc.id)
        session.delete(mc)
        mc.children[0].data = 'some new data'
        assert_raises(sa.exc.DBAPIError, session.flush)

    def test_dont_emit(self):
        myothertable, MyClass, MyOtherClass, mytable = (self.tables.myothertable,
                                self.classes.MyClass,
                                self.classes.MyOtherClass,
                                self.tables.mytable)

        mapper(MyOtherClass, myothertable)
        mapper(MyClass, mytable, properties={
            'children': relationship(MyOtherClass,
                                 passive_deletes='all',
                                 cascade="save-update")})
        session = Session()
        mc = MyClass()
        session.add(mc)
        session.commit()
        mc.id

        session.delete(mc)

        # no load for "children" should occur
        self.assert_sql_count(testing.db, session.flush, 1)

class ColumnCollisionTest(fixtures.MappedTest):
    """Ensure the mapper doesn't break bind param naming rules on flush."""

    @classmethod
    def define_tables(cls, metadata):
        Table('book', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('book_id', String(50)),
            Column('title', String(50))
        )

    def test_naming(self):
        book = self.tables.book

        class Book(fixtures.ComparableEntity):
            pass

        mapper(Book, book)
        sess = create_session()

        b1 = Book(book_id='abc', title='def')
        sess.add(b1)
        sess.flush()

        b1.title = 'ghi'
        sess.flush()
        sess.close()
        eq_(
            sess.query(Book).first(),
            Book(book_id='abc', title='ghi')
        )



class DefaultTest(fixtures.MappedTest):
    """Exercise mappings on columns with DefaultGenerators.

    Tests that when saving objects whose table contains DefaultGenerators,
    either python-side, preexec or database-side, the newly saved instances
    receive all the default values either through a post-fetch or getting the
    pre-exec'ed defaults back from the engine.

    """

    @classmethod
    def define_tables(cls, metadata):
        use_string_defaults = testing.against('postgresql', 'oracle', 'sqlite', 'mssql')

        if use_string_defaults:
            hohotype = String(30)
            hohoval = "im hoho"
            althohoval = "im different hoho"
        else:
            hohotype = Integer
            hohoval = 9
            althohoval = 15

        cls.other['hohoval'] = hohoval
        cls.other['althohoval'] = althohoval

        dt = Table('default_t', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('hoho', hohotype, server_default=str(hohoval)),
            Column('counter', Integer, default=sa.func.char_length("1234567", type_=Integer)),
            Column('foober', String(30), default="im foober", onupdate="im the update"),
            mysql_engine='MyISAM')

        st = Table('secondary_table', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(50)),
            mysql_engine='MyISAM')

        if testing.against('postgresql', 'oracle'):
            dt.append_column(
                Column('secondary_id', Integer, sa.Sequence('sec_id_seq'),
                       unique=True))
            st.append_column(
                Column('fk_val', Integer,
                       ForeignKey('default_t.secondary_id')))
        elif testing.against('mssql'):
            st.append_column(
                Column('fk_val', Integer,
                       ForeignKey('default_t.id')))
        else:
            st.append_column(
                Column('hoho', hohotype, ForeignKey('default_t.hoho')))

    @classmethod
    def setup_classes(cls):
        class Hoho(cls.Comparable):
            pass
        class Secondary(cls.Comparable):
            pass

    @testing.fails_on('firebird', 'Data type unknown on the parameter')
    def test_insert(self):
        althohoval, hohoval, default_t, Hoho = (self.other.althohoval,
                                self.other.hohoval,
                                self.tables.default_t,
                                self.classes.Hoho)

        mapper(Hoho, default_t)

        h1 = Hoho(hoho=althohoval)
        h2 = Hoho(counter=12)
        h3 = Hoho(hoho=althohoval, counter=12)
        h4 = Hoho()
        h5 = Hoho(foober='im the new foober')

        session = create_session(autocommit=False)
        session.add_all((h1, h2, h3, h4, h5))
        session.commit()

        eq_(h1.hoho, althohoval)
        eq_(h3.hoho, althohoval)

        def go():
            # test deferred load of attribues, one select per instance
            self.assert_(h2.hoho == h4.hoho == h5.hoho == hohoval)
        self.sql_count_(3, go)

        def go():
            self.assert_(h1.counter == h4.counter == h5.counter == 7)
        self.sql_count_(1, go)

        def go():
            self.assert_(h3.counter == h2.counter == 12)
            self.assert_(h2.foober == h3.foober == h4.foober == 'im foober')
            self.assert_(h5.foober == 'im the new foober')
        self.sql_count_(0, go)

        session.expunge_all()

        (h1, h2, h3, h4, h5) = session.query(Hoho).order_by(Hoho.id).all()

        eq_(h1.hoho, althohoval)
        eq_(h3.hoho, althohoval)
        self.assert_(h2.hoho == h4.hoho == h5.hoho == hohoval)
        self.assert_(h3.counter == h2.counter == 12)
        self.assert_(h1.counter ==  h4.counter == h5.counter == 7)
        self.assert_(h2.foober == h3.foober == h4.foober == 'im foober')
        eq_(h5.foober, 'im the new foober')

    @testing.fails_on('firebird', 'Data type unknown on the parameter')
    @testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug")
    def test_eager_defaults(self):
        hohoval, default_t, Hoho = (self.other.hohoval,
                                self.tables.default_t,
                                self.classes.Hoho)
        Secondary = self.classes.Secondary

        mapper(Hoho, default_t, eager_defaults=True, properties={
                "sec": relationship(Secondary),
                "syn": sa.orm.synonym(default_t.c.counter)
            })


        mapper(Secondary, self.tables.secondary_table)
        h1 = Hoho()

        session = create_session()
        session.add(h1)

        if testing.db.dialect.implicit_returning:
            self.sql_count_(1, session.flush)
        else:
            self.sql_count_(2, session.flush)

        self.sql_count_(0, lambda: eq_(h1.hoho, hohoval))

        # no actual eager defaults, make sure error isn't raised
        h2 = Hoho(hoho=hohoval, counter=5)
        session.add(h2)
        session.flush()
        eq_(h2.hoho, hohoval)
        eq_(h2.counter, 5)


    def test_insert_nopostfetch(self):
        default_t, Hoho = self.tables.default_t, self.classes.Hoho

        # populates from the FetchValues explicitly so there is no
        # "post-update"
        mapper(Hoho, default_t)

        h1 = Hoho(hoho="15", counter=15)
        session = create_session()
        session.add(h1)
        session.flush()

        def go():
            eq_(h1.hoho, "15")
            eq_(h1.counter, 15)
            eq_(h1.foober, "im foober")
        self.sql_count_(0, go)

    @testing.fails_on('firebird', 'Data type unknown on the parameter')
    def test_update(self):
        default_t, Hoho = self.tables.default_t, self.classes.Hoho

        mapper(Hoho, default_t)

        h1 = Hoho()
        session = create_session()
        session.add(h1)
        session.flush()

        eq_(h1.foober, 'im foober')
        h1.counter = 19
        session.flush()
        eq_(h1.foober, 'im the update')

    @testing.fails_on('firebird', 'Data type unknown on the parameter')
    def test_used_in_relationship(self):
        """A server-side default can be used as the target of a foreign key"""

        Hoho, hohoval, default_t, secondary_table, Secondary = (self.classes.Hoho,
                                self.other.hohoval,
                                self.tables.default_t,
                                self.tables.secondary_table,
                                self.classes.Secondary)


        mapper(Hoho, default_t, properties={
            'secondaries':relationship(Secondary, order_by=secondary_table.c.id)})
        mapper(Secondary, secondary_table)

        h1 = Hoho()
        s1 = Secondary(data='s1')
        h1.secondaries.append(s1)

        session = create_session()
        session.add(h1)
        session.flush()
        session.expunge_all()

        eq_(session.query(Hoho).get(h1.id),
            Hoho(hoho=hohoval,
                 secondaries=[
                   Secondary(data='s1')]))

        h1 = session.query(Hoho).get(h1.id)
        h1.secondaries.append(Secondary(data='s2'))
        session.flush()
        session.expunge_all()

        eq_(session.query(Hoho).get(h1.id),
            Hoho(hoho=hohoval,
                 secondaries=[
                    Secondary(data='s1'),
                    Secondary(data='s2')]))

class ColumnPropertyTest(fixtures.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        Table('data', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('a', String(50)),
            Column('b', String(50))
            )

        Table('subdata', metadata,
            Column('id', Integer, ForeignKey('data.id'), primary_key=True),
            Column('c', String(50)),
            )

    @classmethod
    def setup_mappers(cls):
        class Data(cls.Basic):
            pass

    def test_refreshes(self):
        Data, data = self.classes.Data, self.tables.data

        mapper(Data, data, properties={
            'aplusb':column_property(data.c.a + literal_column("' '") + data.c.b)
        })
        self._test(True)

    def test_no_refresh(self):
        Data, data = self.classes.Data, self.tables.data

        mapper(Data, data, properties={
            'aplusb':column_property(data.c.a + literal_column("' '") + data.c.b,
                        expire_on_flush=False)
        })
        self._test(False)

    def test_refreshes_post_init(self):
        Data, data = self.classes.Data, self.tables.data

        m = mapper(Data, data)
        m.add_property('aplusb', column_property(data.c.a + literal_column("' '") + data.c.b))
        self._test(True)

    def test_with_inheritance(self):
        subdata, data, Data = (self.tables.subdata,
                                self.tables.data,
                                self.classes.Data)

        class SubData(Data):
            pass
        mapper(Data, data, properties={
            'aplusb':column_property(data.c.a + literal_column("' '") + data.c.b)
        })
        mapper(SubData, subdata, inherits=Data)

        sess = create_session()
        sd1 = SubData(a="hello", b="there", c="hi")
        sess.add(sd1)
        sess.flush()
        eq_(sd1.aplusb, "hello there")

    def _test(self, expect_expiry):
        Data = self.classes.Data

        sess = create_session()

        d1 = Data(a="hello", b="there")
        sess.add(d1)
        sess.flush()

        eq_(d1.aplusb, "hello there")

        d1.b = "bye"
        sess.flush()
        if expect_expiry:
            eq_(d1.aplusb, "hello bye")
        else:
            eq_(d1.aplusb, "hello there")


        d1.b = 'foobar'
        d1.aplusb = 'im setting this explicitly'
        sess.flush()
        eq_(d1.aplusb, "im setting this explicitly")

class OneToManyTest(_fixtures.FixtureTest):
    run_inserts = None

    def test_one_to_many_1(self):
        """Basic save of one to many."""

        Address, addresses, users, User = (self.classes.Address,
                                self.tables.addresses,
                                self.tables.users,
                                self.classes.User)


        m = mapper(User, users, properties=dict(
            addresses = relationship(mapper(Address, addresses), lazy='select')
        ))
        u = User(name= 'one2manytester')
        a = Address(email_address='one2many@test.org')
        u.addresses.append(a)

        a2 = Address(email_address='lala@test.org')
        u.addresses.append(a2)

        session = create_session()
        session.add(u)
        session.flush()

        user_rows = users.select(users.c.id.in_([u.id])).execute().fetchall()
        eq_(list(user_rows[0].values()), [u.id, 'one2manytester'])

        address_rows = addresses.select(
            addresses.c.id.in_([a.id, a2.id]),
            order_by=[addresses.c.email_address]).execute().fetchall()
        eq_(list(address_rows[0].values()), [a2.id, u.id, 'lala@test.org'])
        eq_(list(address_rows[1].values()), [a.id, u.id, 'one2many@test.org'])

        userid = u.id
        addressid = a2.id

        a2.email_address = 'somethingnew@foo.com'

        session.flush()

        address_rows = addresses.select(
            addresses.c.id == addressid).execute().fetchall()
        eq_(list(address_rows[0].values()),
            [addressid, userid, 'somethingnew@foo.com'])
        self.assert_(u.id == userid and a2.id == addressid)

    def test_one_to_many_2(self):
        """Modifying the child items of an object."""

        Address, addresses, users, User = (self.classes.Address,
                                self.tables.addresses,
                                self.tables.users,
                                self.classes.User)


        m = mapper(User, users, properties=dict(
            addresses = relationship(mapper(Address, addresses), lazy='select')))

        u1 = User(name='user1')
        u1.addresses = []
        a1 = Address(email_address='emailaddress1')
        u1.addresses.append(a1)

        u2 = User(name='user2')
        u2.addresses = []
        a2 = Address(email_address='emailaddress2')
        u2.addresses.append(a2)

        a3 = Address(email_address='emailaddress3')

        session = create_session()
        session.add_all((u1, u2, a3))
        session.flush()

        # modify user2 directly, append an address to user1.
        # upon commit, user2 should be updated, user1 should not
        # both address1 and address3 should be updated
        u2.name = 'user2modified'
        u1.addresses.append(a3)
        del u1.addresses[0]

        self.assert_sql(testing.db, session.flush, [
            ("UPDATE users SET name=:name "
             "WHERE users.id = :users_id",
             {'users_id': u2.id, 'name': 'user2modified'}),

            ("UPDATE addresses SET user_id=:user_id "
             "WHERE addresses.id = :addresses_id",
             [
                {'user_id': None, 'addresses_id': a1.id},
                {'user_id': u1.id, 'addresses_id': a3.id}
            ]),

            ])

    def test_child_move(self):
        """Moving a child from one parent to another, with a delete.

        Tests that deleting the first parent properly updates the child with
        the new parent.  This tests the 'trackparent' option in the attributes
        module.

        """

        Address, addresses, users, User = (self.classes.Address,
                                self.tables.addresses,
                                self.tables.users,
                                self.classes.User)

        m = mapper(User, users, properties=dict(
            addresses = relationship(mapper(Address, addresses), lazy='select')))

        u1 = User(name='user1')
        u2 = User(name='user2')
        a = Address(email_address='address1')
        u1.addresses.append(a)

        session = create_session()
        session.add_all((u1, u2))
        session.flush()

        del u1.addresses[0]
        u2.addresses.append(a)
        session.delete(u1)

        session.flush()
        session.expunge_all()

        u2 = session.query(User).get(u2.id)
        eq_(len(u2.addresses), 1)

    def test_child_move_2(self):
        Address, addresses, users, User = (self.classes.Address,
                                self.tables.addresses,
                                self.tables.users,
                                self.classes.User)

        m = mapper(User, users, properties=dict(
            addresses = relationship(mapper(Address, addresses), lazy='select')))

        u1 = User(name='user1')
        u2 = User(name='user2')
        a = Address(email_address='address1')
        u1.addresses.append(a)

        session = create_session()
        session.add_all((u1, u2))
        session.flush()

        del u1.addresses[0]
        u2.addresses.append(a)

        session.flush()
        session.expunge_all()

        u2 = session.query(User).get(u2.id)
        eq_(len(u2.addresses), 1)

    def test_o2m_delete_parent(self):
        Address, addresses, users, User = (self.classes.Address,
                                self.tables.addresses,
                                self.tables.users,
                                self.classes.User)

        m = mapper(User, users, properties=dict(
            address = relationship(mapper(Address, addresses),
                               lazy='select',
                               uselist=False)))

        u = User(name='one2onetester')
        a = Address(email_address='myonlyaddress@foo.com')
        u.address = a

        session = create_session()
        session.add(u)
        session.flush()

        session.delete(u)
        session.flush()

        assert a.id is not None
        assert a.user_id is None
        assert sa.orm.attributes.instance_state(a).key in session.identity_map
        assert sa.orm.attributes.instance_state(u).key not in session.identity_map

    def test_one_to_one(self):
        Address, addresses, users, User = (self.classes.Address,
                                self.tables.addresses,
                                self.tables.users,
                                self.classes.User)

        m = mapper(User, users, properties=dict(
            address = relationship(mapper(Address, addresses),
                               lazy='select',
                               uselist=False)))

        u = User(name='one2onetester')
        u.address = Address(email_address='myonlyaddress@foo.com')

        session = create_session()
        session.add(u)
        session.flush()

        u.name = 'imnew'
        session.flush()

        u.address.email_address = 'imnew@foo.com'
        session.flush()

    def test_bidirectional(self):
        users, Address, addresses, User = (self.tables.users,
                                self.classes.Address,
                                self.tables.addresses,
                                self.classes.User)

        m1 = mapper(User, users)
        m2 = mapper(Address, addresses, properties=dict(
            user = relationship(m1, lazy='joined', backref='addresses')))


        u = User(name='test')
        a = Address(email_address='testaddress', user=u)

        session = create_session()
        session.add(u)
        session.flush()
        session.delete(u)
        session.flush()

    def test_double_relationship(self):
        Address, addresses, users, User = (self.classes.Address,
                                self.tables.addresses,
                                self.tables.users,
                                self.classes.User)

        m2 = mapper(Address, addresses)
        m = mapper(User, users, properties={
            'boston_addresses' : relationship(m2, primaryjoin=
                        sa.and_(users.c.id==addresses.c.user_id,
                                addresses.c.email_address.like('%boston%'))),
            'newyork_addresses' : relationship(m2, primaryjoin=
                        sa.and_(users.c.id==addresses.c.user_id,
                                addresses.c.email_address.like('%newyork%')))})

        u = User(name='u1')
        a = Address(email_address='foo@boston.com')
        b = Address(email_address='bar@newyork.com')
        u.boston_addresses.append(a)
        u.newyork_addresses.append(b)

        session = create_session()
        session.add(u)
        session.flush()

class SaveTest(_fixtures.FixtureTest):
    run_inserts = None

    def test_basic(self):
        User, users = self.classes.User, self.tables.users

        m = mapper(User, users)

        # save two users
        u = User(name='savetester')
        u2 = User(name='savetester2')

        session = create_session()
        session.add_all((u, u2))
        session.flush()

        # assert the first one retrieves the same from the identity map
        nu = session.query(m).get(u.id)
        assert u is nu

        # clear out the identity map, so next get forces a SELECT
        session.expunge_all()

        # check it again, identity should be different but ids the same
        nu = session.query(m).get(u.id)
        assert u is not nu and u.id == nu.id and nu.name == 'savetester'

        # change first users name and save
        session = create_session()
        session.add(u)
        u.name = 'modifiedname'
        assert u in session.dirty
        session.flush()

        # select both
        userlist = session.query(User).filter(
            users.c.id.in_([u.id, u2.id])).order_by(users.c.name).all()

        eq_(u.id, userlist[0].id)
        eq_(userlist[0].name, 'modifiedname')
        eq_(u2.id, userlist[1].id)
        eq_(userlist[1].name, 'savetester2')

    def test_synonym(self):
        users = self.tables.users

        class SUser(fixtures.BasicEntity):
            def _get_name(self):
                return "User:" + self.name
            def _set_name(self, name):
                self.name = name + ":User"
            syn_name = property(_get_name, _set_name)

        mapper(SUser, users, properties={
            'syn_name': sa.orm.synonym('name')
        })

        u = SUser(syn_name="some name")
        eq_(u.syn_name, 'User:some name:User')

        session = create_session()
        session.add(u)
        session.flush()
        session.expunge_all()

        u = session.query(SUser).first()
        eq_(u.syn_name, 'User:some name:User')

    def test_lazyattr_commit(self):
        """Lazily loaded relationships.

        When a lazy-loaded list is unloaded, and a commit occurs, that the
        'passive' call on that list does not blow away its value

        """

        users, Address, addresses, User = (self.tables.users,
                                self.classes.Address,
                                self.tables.addresses,
                                self.classes.User)

        mapper(User, users, properties = {
            'addresses': relationship(mapper(Address, addresses))})

        u = User(name='u1')
        u.addresses.append(Address(email_address='u1@e1'))
        u.addresses.append(Address(email_address='u1@e2'))
        u.addresses.append(Address(email_address='u1@e3'))
        u.addresses.append(Address(email_address='u1@e4'))

        session = create_session()
        session.add(u)
        session.flush()
        session.expunge_all()

        u = session.query(User).one()
        u.name = 'newname'
        session.flush()
        eq_(len(u.addresses), 4)

    def test_inherits(self):
        """a user object that also has the users mailing address."""

        users, addresses, User = (self.tables.users,
                                self.tables.addresses,
                                self.classes.User)

        m1 = mapper(User, users)

        class AddressUser(User):
            pass

        # define a mapper for AddressUser that inherits the User.mapper, and
        # joins on the id column
        mapper(AddressUser, addresses, inherits=m1, properties={
                'address_id': addresses.c.id
            })

        au = AddressUser(name='u', email_address='u@e')

        session = create_session()
        session.add(au)
        session.flush()
        session.expunge_all()

        rt = session.query(AddressUser).one()
        eq_(au.user_id, rt.user_id)
        eq_(rt.id, rt.id)

    def test_deferred(self):
        """Deferred column operations"""

        orders, Order = self.tables.orders, self.classes.Order


        mapper(Order, orders, properties={
            'description': sa.orm.deferred(orders.c.description)})

        # don't set deferred attribute, commit session
        o = Order(id=42)
        session = create_session(autocommit=False)
        session.add(o)
        session.commit()

        # assert that changes get picked up
        o.description = 'foo'
        session.commit()

        eq_(list(session.execute(orders.select(), mapper=Order)),
            [(42, None, None, 'foo', None)])
        session.expunge_all()

        # assert that a set operation doesn't trigger a load operation
        o = session.query(Order).filter(Order.description == 'foo').one()
        def go():
            o.description = 'hoho'
        self.sql_count_(0, go)
        session.flush()

        eq_(list(session.execute(orders.select(), mapper=Order)),
            [(42, None, None, 'hoho', None)])

        session.expunge_all()

        # test assigning None to an unloaded deferred also works
        o = session.query(Order).filter(Order.description == 'hoho').one()
        o.description = None
        session.flush()
        eq_(list(session.execute(orders.select(), mapper=Order)),
            [(42, None, None, None, None)])
        session.close()

    # why no support on oracle ?  because oracle doesn't save
    # "blank" strings; it saves a single space character.
    @testing.fails_on('oracle', 'FIXME: unknown')
    def test_dont_update_blanks(self):
        User, users = self.classes.User, self.tables.users

        mapper(User, users)

        u = User(name='')
        session = create_session()
        session.add(u)
        session.flush()
        session.expunge_all()

        u = session.query(User).get(u.id)
        u.name = ''
        self.sql_count_(0, session.flush)

    def test_multi_table_selectable(self):
        """Mapped selectables that span tables.

        Also tests redefinition of the keynames for the column properties.

        """

        addresses, users, User = (self.tables.addresses,
                                self.tables.users,
                                self.classes.User)

        usersaddresses = sa.join(users, addresses,
                                 users.c.id == addresses.c.user_id)

        m = mapper(User, usersaddresses,
            properties=dict(
                email = addresses.c.email_address,
                foo_id = [users.c.id, addresses.c.user_id]))

        u = User(name='multitester', email='multi@test.org')
        session = create_session()
        session.add(u)
        session.flush()
        session.expunge_all()

        id = m.primary_key_from_instance(u)

        u = session.query(User).get(id)
        assert u.name == 'multitester'

        user_rows = users.select(users.c.id.in_([u.foo_id])).execute().fetchall()
        eq_(list(user_rows[0].values()), [u.foo_id, 'multitester'])
        address_rows = addresses.select(addresses.c.id.in_([u.id])).execute().fetchall()
        eq_(list(address_rows[0].values()), [u.id, u.foo_id, 'multi@test.org'])

        u.email = 'lala@hey.com'
        u.name = 'imnew'
        session.flush()

        user_rows = users.select(users.c.id.in_([u.foo_id])).execute().fetchall()
        eq_(list(user_rows[0].values()), [u.foo_id, 'imnew'])
        address_rows = addresses.select(addresses.c.id.in_([u.id])).execute().fetchall()
        eq_(list(address_rows[0].values()), [u.id, u.foo_id, 'lala@hey.com'])

        session.expunge_all()
        u = session.query(User).get(id)
        assert u.name == 'imnew'

    def test_history_get(self):
        """The history lazy-fetches data when it wasn't otherwise loaded."""

        users, Address, addresses, User = (self.tables.users,
                                self.classes.Address,
                                self.tables.addresses,
                                self.classes.User)

        mapper(User, users, properties={
            'addresses':relationship(Address, cascade="all, delete-orphan")})
        mapper(Address, addresses)

        u = User(name='u1')
        u.addresses.append(Address(email_address='u1@e1'))
        u.addresses.append(Address(email_address='u1@e2'))
        session = create_session()
        session.add(u)
        session.flush()
        session.expunge_all()

        u = session.query(User).get(u.id)
        session.delete(u)
        session.flush()
        assert users.count().scalar() == 0
        assert addresses.count().scalar() == 0

    def test_batch_mode(self):
        """The 'batch=False' flag on mapper()"""

        users, User = self.tables.users, self.classes.User


        names = []
        class Events(object):
            def before_insert(self, mapper, connection, instance):
                self.current_instance = instance
                names.append(instance.name)
            def after_insert(self, mapper, connection, instance):
                assert instance is self.current_instance

        mapper(User, users, batch=False)

        evt = Events()
        event.listen(User, "before_insert", evt.before_insert)
        event.listen(User, "after_insert", evt.after_insert)

        u1 = User(name='user1')
        u2 = User(name='user2')

        session = create_session()
        session.add_all((u1, u2))
        session.flush()

        u3 = User(name='user3')
        u4 = User(name='user4')
        u5 = User(name='user5')

        session.add_all([u4, u5, u3])
        session.flush()

        # test insert ordering is maintained
        assert names == ['user1', 'user2', 'user4', 'user5', 'user3']
        session.expunge_all()

        sa.orm.clear_mappers()

        m = mapper(User, users)
        evt = Events()
        event.listen(User, "before_insert", evt.before_insert)
        event.listen(User, "after_insert", evt.after_insert)

        u1 = User(name='user1')
        u2 = User(name='user2')
        session.add_all((u1, u2))
        assert_raises(AssertionError, session.flush)


class ManyToOneTest(_fixtures.FixtureTest):
    run_inserts = None

    def test_m2o_one_to_one(self):
        users, Address, addresses, User = (self.tables.users,
                                self.classes.Address,
                                self.tables.addresses,
                                self.classes.User)

        # TODO: put assertion in here !!!
        m = mapper(Address, addresses, properties=dict(
            user = relationship(mapper(User, users), lazy='select', uselist=False)))

        session = create_session()

        data = [
            {'name': 'thesub' ,  'email_address': 'bar@foo.com'},
            {'name': 'assdkfj' , 'email_address': 'thesdf@asdf.com'},
            {'name': 'n4knd' ,   'email_address': 'asf3@bar.org'},
            {'name': 'v88f4' ,   'email_address': 'adsd5@llala.net'},
            {'name': 'asdf8d' ,  'email_address': 'theater@foo.com'}
        ]
        objects = []
        for elem in data:
            a = Address()
            a.email_address = elem['email_address']
            a.user = User()
            a.user.name = elem['name']
            objects.append(a)
            session.add(a)

        session.flush()
        objects[2].email_address = 'imnew@foo.bar'
        objects[3].user = User()
        objects[3].user.name = 'imnewlyadded'
        self.assert_sql_execution(testing.db,
                        session.flush,
                        CompiledSQL("INSERT INTO users (name) VALUES (:name)",
                         {'name': 'imnewlyadded'} ),

                         AllOf(
                            CompiledSQL("UPDATE addresses SET email_address=:email_address "
                                        "WHERE addresses.id = :addresses_id",
                                        lambda ctx: {'email_address': 'imnew@foo.bar',
                                          'addresses_id': objects[2].id}),
                            CompiledSQL("UPDATE addresses SET user_id=:user_id "
                                          "WHERE addresses.id = :addresses_id",
                                          lambda ctx: {'user_id': objects[3].user.id,
                                                       'addresses_id': objects[3].id})
                        )
                    )

        l = sa.select([users, addresses],
                      sa.and_(users.c.id==addresses.c.user_id,
                              addresses.c.id==a.id)).execute()
        eq_(list(l.first().values()),
            [a.user.id, 'asdf8d', a.id, a.user_id, 'theater@foo.com'])

    def test_many_to_one_1(self):
        users, Address, addresses, User = (self.tables.users,
                                self.classes.Address,
                                self.tables.addresses,
                                self.classes.User)

        m = mapper(Address, addresses, properties=dict(
            user = relationship(mapper(User, users), lazy='select')))

        a1 = Address(email_address='emailaddress1')
        u1 = User(name='user1')
        a1.user = u1

        session = create_session()
        session.add(a1)
        session.flush()
        session.expunge_all()

        a1 = session.query(Address).get(a1.id)
        u1 = session.query(User).get(u1.id)
        assert a1.user is u1

        a1.user = None
        session.flush()
        session.expunge_all()
        a1 = session.query(Address).get(a1.id)
        u1 = session.query(User).get(u1.id)
        assert a1.user is None

    def test_many_to_one_2(self):
        users, Address, addresses, User = (self.tables.users,
                                self.classes.Address,
                                self.tables.addresses,
                                self.classes.User)

        m = mapper(Address, addresses, properties=dict(
            user = relationship(mapper(User, users), lazy='select')))

        a1 = Address(email_address='emailaddress1')
        a2 = Address(email_address='emailaddress2')
        u1 = User(name='user1')
        a1.user = u1

        session = create_session()
        session.add_all((a1, a2))
        session.flush()
        session.expunge_all()

        a1 = session.query(Address).get(a1.id)
        a2 = session.query(Address).get(a2.id)
        u1 = session.query(User).get(u1.id)
        assert a1.user is u1

        a1.user = None
        a2.user = u1
        session.flush()
        session.expunge_all()

        a1 = session.query(Address).get(a1.id)
        a2 = session.query(Address).get(a2.id)
        u1 = session.query(User).get(u1.id)
        assert a1.user is None
        assert a2.user is u1

    def test_many_to_one_3(self):
        users, Address, addresses, User = (self.tables.users,
                                self.classes.Address,
                                self.tables.addresses,
                                self.classes.User)

        m = mapper(Address, addresses, properties=dict(
            user = relationship(mapper(User, users), lazy='select')))

        a1 = Address(email_address='emailaddress1')
        u1 = User(name='user1')
        u2 = User(name='user2')
        a1.user = u1

        session = create_session()
        session.add_all((a1, u1, u2))
        session.flush()
        session.expunge_all()

        a1 = session.query(Address).get(a1.id)
        u1 = session.query(User).get(u1.id)
        u2 = session.query(User).get(u2.id)
        assert a1.user is u1

        a1.user = u2
        session.flush()
        session.expunge_all()
        a1 = session.query(Address).get(a1.id)
        u1 = session.query(User).get(u1.id)
        u2 = session.query(User).get(u2.id)
        assert a1.user is u2

    def test_bidirectional_no_load(self):
        users, Address, addresses, User = (self.tables.users,
                                self.classes.Address,
                                self.tables.addresses,
                                self.classes.User)

        mapper(User, users, properties={
            'addresses':relationship(Address, backref='user', lazy='noload')})
        mapper(Address, addresses)

        # try it on unsaved objects
        u1 = User(name='u1')
        a1 = Address(email_address='e1')
        a1.user = u1

        session = create_session()
        session.add(u1)
        session.flush()
        session.expunge_all()

        a1 = session.query(Address).get(a1.id)

        a1.user = None
        session.flush()
        session.expunge_all()
        assert session.query(Address).get(a1.id).user is None
        assert session.query(User).get(u1.id).addresses == []


class ManyToManyTest(_fixtures.FixtureTest):
    run_inserts = None

    def test_many_to_many(self):
        keywords, items, item_keywords, Keyword, Item = (self.tables.keywords,
                                self.tables.items,
                                self.tables.item_keywords,
                                self.classes.Keyword,
                                self.classes.Item)

        mapper(Keyword, keywords)

        m = mapper(Item, items, properties=dict(
                keywords=relationship(Keyword,
                                  item_keywords,
                                  lazy='joined',
                                  order_by=keywords.c.name)))

        data = [Item,
            {'description': 'mm_item1',
             'keywords' : (Keyword, [{'name': 'big'},
                                     {'name': 'green'},
                                     {'name': 'purple'},
                                     {'name': 'round'}])},
            {'description': 'mm_item2',
             'keywords' : (Keyword, [{'name':'blue'},
                                     {'name':'imnew'},
                                     {'name':'round'},
                                     {'name':'small'}])},
            {'description': 'mm_item3',
             'keywords' : (Keyword, [])},
            {'description': 'mm_item4',
             'keywords' : (Keyword, [{'name':'big'},
                                    {'name':'blue'},])},
            {'description': 'mm_item5',
             'keywords' : (Keyword, [{'name':'big'},
                                     {'name':'exacting'},
                                     {'name':'green'}])},
            {'description': 'mm_item6',
             'keywords' : (Keyword, [{'name':'red'},
                                     {'name':'round'},
                                     {'name':'small'}])}]

        session = create_session()

        objects = []
        _keywords = dict([(k.name, k) for k in session.query(Keyword)])

        for elem in data[1:]:
            item = Item(description=elem['description'])
            objects.append(item)

            for spec in elem['keywords'][1]:
                keyword_name = spec['name']
                try:
                    kw = _keywords[keyword_name]
                except KeyError:
                    _keywords[keyword_name] = kw = Keyword(name=keyword_name)
                item.keywords.append(kw)

        session.add_all(objects)
        session.flush()

        l = (session.query(Item).
             filter(Item.description.in_([e['description']
                                          for e in data[1:]])).
             order_by(Item.description).all())
        self.assert_result(l, *data)

        objects[4].description = 'item4updated'
        k = Keyword()
        k.name = 'yellow'
        objects[5].keywords.append(k)
        self.assert_sql_execution(
            testing.db,
            session.flush,
            AllOf(
                CompiledSQL("UPDATE items SET description=:description "
                 "WHERE items.id = :items_id",
                     {'description': 'item4updated',
                      'items_id': objects[4].id},
                ),
                CompiledSQL("INSERT INTO keywords (name) "
                    "VALUES (:name)",
                    {'name': 'yellow'},
                )
            ),
            CompiledSQL("INSERT INTO item_keywords (item_id, keyword_id) "
                    "VALUES (:item_id, :keyword_id)",
                     lambda ctx: [{'item_id': objects[5].id,
                                   'keyword_id': k.id}])
            )

        objects[2].keywords.append(k)
        dkid = objects[5].keywords[1].id
        del objects[5].keywords[1]
        self.assert_sql_execution(
            testing.db,
            session.flush,
            CompiledSQL("DELETE FROM item_keywords "
                     "WHERE item_keywords.item_id = :item_id AND "
                     "item_keywords.keyword_id = :keyword_id",
                     [{'item_id': objects[5].id, 'keyword_id': dkid}]),
            CompiledSQL("INSERT INTO item_keywords (item_id, keyword_id) "
                    "VALUES (:item_id, :keyword_id)",
                    lambda ctx: [{'item_id': objects[2].id, 'keyword_id': k.id}]
             ))

        session.delete(objects[3])
        session.flush()

    def test_many_to_many_remove(self):
        """Setting a collection to empty deletes many-to-many rows.

        Tests that setting a list-based attribute to '[]' properly affects the
        history and allows the many-to-many rows to be deleted

        """

        keywords, items, item_keywords, Keyword, Item = (self.tables.keywords,
                                self.tables.items,
                                self.tables.item_keywords,
                                self.classes.Keyword,
                                self.classes.Item)

        mapper(Keyword, keywords)
        mapper(Item, items, properties=dict(
            keywords = relationship(Keyword, item_keywords, lazy='joined'),
            ))

        i = Item(description='i1')
        k1 = Keyword(name='k1')
        k2 = Keyword(name='k2')
        i.keywords.append(k1)
        i.keywords.append(k2)

        session = create_session()
        session.add(i)
        session.flush()

        assert item_keywords.count().scalar() == 2
        i.keywords = []
        session.flush()
        assert item_keywords.count().scalar() == 0

    def test_scalar(self):
        """sa.dependency won't delete an m2m relationship referencing None."""

        keywords, items, item_keywords, Keyword, Item = (self.tables.keywords,
                                self.tables.items,
                                self.tables.item_keywords,
                                self.classes.Keyword,
                                self.classes.Item)


        mapper(Keyword, keywords)

        mapper(Item, items, properties=dict(
            keyword=relationship(Keyword, secondary=item_keywords, uselist=False)))

        i = Item(description='x')
        session = create_session()
        session.add(i)
        session.flush()
        session.delete(i)
        session.flush()

    def test_many_to_many_update(self):
        """Assorted history operations on a many to many"""

        keywords, items, item_keywords, Keyword, Item = (self.tables.keywords,
                                self.tables.items,
                                self.tables.item_keywords,
                                self.classes.Keyword,
                                self.classes.Item)

        mapper(Keyword, keywords)
        mapper(Item, items, properties=dict(
            keywords=relationship(Keyword,
                              secondary=item_keywords,
                              lazy='joined',
                              order_by=keywords.c.name)))

        k1 = Keyword(name='keyword 1')
        k2 = Keyword(name='keyword 2')
        k3 = Keyword(name='keyword 3')

        item = Item(description='item 1')
        item.keywords.extend([k1, k2, k3])

        session = create_session()
        session.add(item)
        session.flush()

        item.keywords = []
        item.keywords.append(k1)
        item.keywords.append(k2)
        session.flush()

        session.expunge_all()
        item = session.query(Item).get(item.id)
        assert item.keywords == [k1, k2]

    def test_association(self):
        """Basic test of an association object"""

        keywords, items, item_keywords, Keyword, Item = (self.tables.keywords,
                                self.tables.items,
                                self.tables.item_keywords,
                                self.classes.Keyword,
                                self.classes.Item)


        class IKAssociation(fixtures.ComparableEntity):
            pass

        mapper(Keyword, keywords)

        # note that we are breaking a rule here, making a second
        # mapper(Keyword, keywords) the reorganization of mapper construction
        # affected this, but was fixed again

        mapper(IKAssociation, item_keywords,
               primary_key=[item_keywords.c.item_id, item_keywords.c.keyword_id],
               properties=dict(
                 keyword=relationship(mapper(Keyword, keywords, non_primary=True),
                                  lazy='joined',
                                  uselist=False,
                                  order_by=keywords.c.name      # note here is a valid place where order_by can be used
                                  )))                           # on a scalar relationship(); to determine eager ordering of
                                                                # the parent object within its collection.

        mapper(Item, items, properties=dict(
            keywords=relationship(IKAssociation, lazy='joined')))

        session = create_session()

        def fixture():
            _kw = dict([(k.name, k) for k in session.query(Keyword)])
            for n in ('big', 'green', 'purple', 'round', 'huge',
                      'violet', 'yellow', 'blue'):
                if n not in _kw:
                    _kw[n] = Keyword(name=n)

            def assocs(*names):
                return [IKAssociation(keyword=kw)
                        for kw in [_kw[n] for n in names]]

            return [
                Item(description='a_item1',
                     keywords=assocs('big', 'green', 'purple', 'round')),
                Item(description='a_item2',
                     keywords=assocs('huge', 'violet', 'yellow')),
                Item(description='a_item3',
                     keywords=assocs('big', 'blue'))]

        session.add_all(fixture())
        session.flush()
        eq_(fixture(), session.query(Item).order_by(Item.description).all())


class SaveTest2(_fixtures.FixtureTest):
    run_inserts = None

    def test_m2o_nonmatch(self):
        users, Address, addresses, User = (self.tables.users,
                                self.classes.Address,
                                self.tables.addresses,
                                self.classes.User)

        mapper(User, users)
        mapper(Address, addresses, properties=dict(
            user = relationship(User, lazy='select', uselist=False)))

        session = create_session()

        def fixture():
            return [
                Address(email_address='a1', user=User(name='u1')),
                Address(email_address='a2', user=User(name='u2'))]

        session.add_all(fixture())

        self.assert_sql_execution(
            testing.db,
            session.flush,
            CompiledSQL("INSERT INTO users (name) VALUES (:name)",
             {'name': 'u1'}),
            CompiledSQL("INSERT INTO users (name) VALUES (:name)",
             {'name': 'u2'}),
            CompiledSQL("INSERT INTO addresses (user_id, email_address) "
             "VALUES (:user_id, :email_address)",
             {'user_id': 1, 'email_address': 'a1'}),
            CompiledSQL("INSERT INTO addresses (user_id, email_address) "
             "VALUES (:user_id, :email_address)",
             {'user_id': 2, 'email_address': 'a2'}),
        )

class SaveTest3(fixtures.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        Table('items', metadata,
              Column('item_id', Integer, primary_key=True,
                     test_needs_autoincrement=True),
              Column('item_name', String(50)))

        Table('keywords', metadata,
              Column('keyword_id', Integer, primary_key=True,
                     test_needs_autoincrement=True),
              Column('name', String(50)))

        Table('assoc', metadata,
              Column('item_id', Integer, ForeignKey("items")),
              Column('keyword_id', Integer, ForeignKey("keywords")),
              Column('foo', sa.Boolean, default=True))

    @classmethod
    def setup_classes(cls):
        class Keyword(cls.Basic):
            pass
        class Item(cls.Basic):
            pass

    def test_manytomany_xtracol_delete(self):
        """A many-to-many on a table that has an extra column can properly delete rows from the table without referencing the extra column"""

        keywords, items, assoc, Keyword, Item = (self.tables.keywords,
                                self.tables.items,
                                self.tables.assoc,
                                self.classes.Keyword,
                                self.classes.Item)


        mapper(Keyword, keywords)
        mapper(Item, items, properties=dict(
                keywords = relationship(Keyword, secondary=assoc, lazy='joined'),))

        i = Item()
        k1 = Keyword()
        k2 = Keyword()
        i.keywords.append(k1)
        i.keywords.append(k2)

        session = create_session()
        session.add(i)
        session.flush()

        assert assoc.count().scalar() == 2
        i.keywords = []
        session.flush()
        assert assoc.count().scalar() == 0

class BooleanColTest(fixtures.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        Table('t1_t', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('name', String(30)),
            Column('value', sa.Boolean))

    def test_boolean(self):
        t1_t = self.tables.t1_t

        # use the regular mapper
        class T(fixtures.ComparableEntity):
            pass
        orm_mapper(T, t1_t, order_by=t1_t.c.id)

        sess = create_session()
        t1 = T(value=True, name="t1")
        t2 = T(value=False, name="t2")
        t3 = T(value=True, name="t3")
        sess.add_all((t1, t2, t3))

        sess.flush()

        for clear in (False, True):
            if clear:
                sess.expunge_all()
            eq_(sess.query(T).all(), [T(value=True, name="t1"), T(value=False, name="t2"), T(value=True, name="t3")])
            if clear:
                sess.expunge_all()
            eq_(sess.query(T).filter(T.value==True).all(), [T(value=True, name="t1"),T(value=True, name="t3")])
            if clear:
                sess.expunge_all()
            eq_(sess.query(T).filter(T.value==False).all(), [T(value=False, name="t2")])

        t2 = sess.query(T).get(t2.id)
        t2.value = True
        sess.flush()
        eq_(sess.query(T).filter(T.value==True).all(), [T(value=True, name="t1"), T(value=True, name="t2"), T(value=True, name="t3")])
        t2.value = False
        sess.flush()
        eq_(sess.query(T).filter(T.value==True).all(), [T(value=True, name="t1"),T(value=True, name="t3")])


class RowSwitchTest(fixtures.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        # parent
        Table('t5', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', String(30), nullable=False))

        # onetomany
        Table('t6', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', String(30), nullable=False),
            Column('t5id', Integer, ForeignKey('t5.id'),nullable=False))

        # associated
        Table('t7', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', String(30), nullable=False))

        #manytomany
        Table('t5t7', metadata,
            Column('t5id', Integer, ForeignKey('t5.id'),nullable=False),
            Column('t7id', Integer, ForeignKey('t7.id'),nullable=False))

    @classmethod
    def setup_classes(cls):
        class T5(cls.Comparable):
            pass

        class T6(cls.Comparable):
            pass

        class T7(cls.Comparable):
            pass

    def test_onetomany(self):
        t6, T6, t5, T5 = (self.tables.t6,
                                self.classes.T6,
                                self.tables.t5,
                                self.classes.T5)

        mapper(T5, t5, properties={
            't6s':relationship(T6, cascade="all, delete-orphan")
        })
        mapper(T6, t6)

        sess = create_session()

        o5 = T5(data='some t5', id=1)
        o5.t6s.append(T6(data='some t6', id=1))
        o5.t6s.append(T6(data='some other t6', id=2))

        sess.add(o5)
        sess.flush()

        eq_(
            list(sess.execute(t5.select(), mapper=T5)),
            [(1, 'some t5')]
        )
        eq_(
            list(sess.execute(t6.select().order_by(t6.c.id), mapper=T5)),
            [(1, 'some t6', 1), (2, 'some other t6', 1)]
        )

        o6 = T5(data='some other t5', id=o5.id, t6s=[
            T6(data='third t6', id=3),
            T6(data='fourth t6', id=4),
            ])
        sess.delete(o5)
        sess.add(o6)
        sess.flush()

        eq_(
            list(sess.execute(t5.select(), mapper=T5)),
            [(1, 'some other t5')]
        )
        eq_(
            list(sess.execute(t6.select().order_by(t6.c.id), mapper=T5)),
            [(3, 'third t6', 1), (4, 'fourth t6', 1)]
        )

    def test_manytomany(self):
        t7, t5, t5t7, T5, T7 = (self.tables.t7,
                                self.tables.t5,
                                self.tables.t5t7,
                                self.classes.T5,
                                self.classes.T7)

        mapper(T5, t5, properties={
            't7s':relationship(T7, secondary=t5t7, cascade="all")
        })
        mapper(T7, t7)

        sess = create_session()

        o5 = T5(data='some t5', id=1)
        o5.t7s.append(T7(data='some t7', id=1))
        o5.t7s.append(T7(data='some other t7', id=2))

        sess.add(o5)
        sess.flush()

        assert list(sess.execute(t5.select(), mapper=T5)) == [(1, 'some t5')]
        assert testing.rowset(sess.execute(t5t7.select(), mapper=T5)) == set([(1,1), (1, 2)])
        assert list(sess.execute(t7.select(), mapper=T5)) == [(1, 'some t7'), (2, 'some other t7')]

        o6 = T5(data='some other t5', id=1, t7s=[
            T7(data='third t7', id=3),
            T7(data='fourth t7', id=4),
            ])

        sess.delete(o5)
        assert o5 in sess.deleted
        assert o5.t7s[0] in sess.deleted
        assert o5.t7s[1] in sess.deleted

        sess.add(o6)
        sess.flush()

        assert list(sess.execute(t5.select(), mapper=T5)) == [(1, 'some other t5')]
        assert list(sess.execute(t7.select(), mapper=T5)) == [(3, 'third t7'), (4, 'fourth t7')]

    def test_manytoone(self):
        t6, T6, t5, T5 = (self.tables.t6,
                                self.classes.T6,
                                self.tables.t5,
                                self.classes.T5)


        mapper(T6, t6, properties={
            't5':relationship(T5)
        })
        mapper(T5, t5)

        sess = create_session()

        o5 = T6(data='some t6', id=1)
        o5.t5 = T5(data='some t5', id=1)

        sess.add(o5)
        sess.flush()

        assert list(sess.execute(t5.select(), mapper=T5)) == [(1, 'some t5')]
        assert list(sess.execute(t6.select(), mapper=T5)) == [(1, 'some t6', 1)]

        o6 = T6(data='some other t6', id=1, t5=T5(data='some other t5', id=2))
        sess.delete(o5)
        sess.delete(o5.t5)
        sess.add(o6)
        sess.flush()

        assert list(sess.execute(t5.select(), mapper=T5)) == [(2, 'some other t5')]
        assert list(sess.execute(t6.select(), mapper=T5)) == [(1, 'some other t6', 2)]

class InheritingRowSwitchTest(fixtures.MappedTest):
    @classmethod
    def define_tables(cls, metadata):
        Table('parent', metadata,
            Column('pid', Integer, primary_key=True),
            Column('pdata', String(30))
        )
        Table('child', metadata,
            Column('cid', Integer, primary_key=True),
            Column('pid', Integer, ForeignKey('parent.pid')),
            Column('cdata', String(30))
        )

    @classmethod
    def setup_classes(cls):
        class P(cls.Comparable):
            pass

        class C(P):
            pass

    def test_row_switch_no_child_table(self):
        P, C, parent, child = (self.classes.P,
                                self.classes.C,
                                self.tables.parent,
                                self.tables.child)

        mapper(P, parent)
        mapper(C, child, inherits=P)

        sess = create_session()
        c1 = C(pid=1, cid=1, pdata='c1', cdata='c1')
        sess.add(c1)
        sess.flush()

        # establish a row switch between c1 and c2.
        # c2 has no value for the "child" table
        c2 = C(pid=1, cid=1, pdata='c2')
        sess.add(c2)
        sess.delete(c1)

        self.assert_sql_execution(testing.db, sess.flush,
            CompiledSQL("UPDATE parent SET pdata=:pdata WHERE parent.pid = :parent_pid",
                {'pdata':'c2', 'parent_pid':1}
            ),

            # this fires as of [ticket:1362], since we synchronzize
            # PK/FKs on UPDATES.  c2 is new so the history shows up as
            # pure added, update occurs.  If a future change limits the
            # sync operation during _save_obj().update, this is safe to remove again.
            CompiledSQL("UPDATE child SET pid=:pid WHERE child.cid = :child_cid",
                {'pid':1, 'child_cid':1}
            )
        )

class TransactionTest(fixtures.MappedTest):
    __requires__ = ('deferrable_or_no_constraints',)

    @classmethod
    def define_tables(cls, metadata):
        t1 = Table('t1', metadata,
            Column('id', Integer, primary_key=True))

        t2 = Table('t2', metadata,
            Column('id', Integer, primary_key=True),
            Column('t1_id', Integer,
                   ForeignKey('t1.id', deferrable=True, initially='deferred')
                   ))
    @classmethod
    def setup_classes(cls):
        class T1(cls.Comparable):
            pass

        class T2(cls.Comparable):
            pass

    @classmethod
    def setup_mappers(cls):
        T2, T1, t2, t1 = (cls.classes.T2,
                                cls.classes.T1,
                                cls.tables.t2,
                                cls.tables.t1)

        orm_mapper(T1, t1)
        orm_mapper(T2, t2)

    def test_close_transaction_on_commit_fail(self):
        T2, t1 = self.classes.T2, self.tables.t1

        session = create_session(autocommit=True)

        # with a deferred constraint, this fails at COMMIT time instead
        # of at INSERT time.
        session.add(T2(t1_id=123))

        try:
            session.flush()
            assert False
        except:
            # Flush needs to rollback also when commit fails
            assert session.transaction is None

        # todo: on 8.3 at least, the failed commit seems to close the cursor?
        # needs investigation.  leaving in the DDL above now to help verify
        # that the new deferrable support on FK isn't involved in this issue.
        if testing.against('postgresql'):
            t1.bind.engine.dispose()

class PartialNullPKTest(fixtures.MappedTest):
    # sqlite totally fine with NULLs in pk columns.
    # no other DB is like this.
    __only_on__ = ('sqlite',)

    @classmethod
    def define_tables(cls, metadata):
        Table('t1', metadata,
            Column('col1', String(10), primary_key=True, nullable=True),
            Column('col2', String(10), primary_key=True, nullable=True),
            Column('col3', String(50))
            )

    @classmethod
    def setup_classes(cls):
        class T1(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        orm_mapper(cls.classes.T1, cls.tables.t1)

    def test_key_switch(self):
        T1 = self.classes.T1
        s = Session()
        s.add(T1(col1="1", col2=None))

        t1 = s.query(T1).first()
        t1.col2 = 5
        assert_raises_message(
            orm_exc.FlushError,
            "Can't update table t1 using NULL for primary "
            "key value on column t1.col2",
            s.commit
        )

    def test_plain_update(self):
        T1 = self.classes.T1
        s = Session()
        s.add(T1(col1="1", col2=None))

        t1 = s.query(T1).first()
        t1.col3 = 'hi'
        assert_raises_message(
            orm_exc.FlushError,
            "Can't update table t1 using NULL for primary "
            "key value on column t1.col2",
            s.commit
        )

    def test_delete(self):
        T1 = self.classes.T1
        s = Session()
        s.add(T1(col1="1", col2=None))

        t1 = s.query(T1).first()
        s.delete(t1)
        assert_raises_message(
            orm_exc.FlushError,
            "Can't delete from table t1 using NULL "
            "for primary key value on column t1.col2",
            s.commit
        )

    def test_total_null(self):
        T1 = self.classes.T1
        s = Session()
        s.add(T1(col1=None, col2=None))
        assert_raises_message(
            orm_exc.FlushError,
            r"Instance \<T1 at .+?\> has a NULL "
            "identity key.  If this is an auto-generated value, "
            "check that the database table allows generation ",
            s.commit
        )

    def test_dont_complain_if_no_update(self):
        T1 = self.classes.T1
        s = Session()
        t = T1(col1="1", col2=None)
        s.add(t)
        s.commit()

        t.col1 = "1"
        s.commit()