# coding: utf-8

from sqlalchemy.engine import reflection
from sqlalchemy.testing.assertions import eq_, assert_raises, \
    AssertsExecutionResults
from sqlalchemy.testing import fixtures
from sqlalchemy import testing
from sqlalchemy import inspect
from sqlalchemy import Table, Column, MetaData, Integer, String, \
    PrimaryKeyConstraint, ForeignKey, join, Sequence, UniqueConstraint, \
    Index
from sqlalchemy import exc
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import base as postgresql
from sqlalchemy.dialects.postgresql import ARRAY


class ForeignTableReflectionTest(fixtures.TablesTest, AssertsExecutionResults):
    """Test reflection on foreign tables"""

    __requires__ = 'postgresql_test_dblink',
    __only_on__ = 'postgresql >= 9.3'
    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        from sqlalchemy.testing import config
        dblink = config.file_config.get(
            'sqla_testing', 'postgres_test_db_link')

        testtable = Table(
            'testtable', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', String(30)))

        for ddl in [
            "CREATE SERVER test_server FOREIGN DATA WRAPPER postgres_fdw "
            "OPTIONS (dbname 'test', host '%s')" % dblink,
            "CREATE USER MAPPING FOR public \
            SERVER test_server options (user 'scott', password 'tiger')",
            "CREATE FOREIGN TABLE test_foreigntable ( "
            "   id          INT, "
            "   data        VARCHAR(30) "
            ") SERVER test_server OPTIONS (table_name 'testtable')",
        ]:
            sa.event.listen(metadata, "after_create", sa.DDL(ddl))

        for ddl in [
            'DROP FOREIGN TABLE test_foreigntable',
            'DROP USER MAPPING FOR public SERVER test_server',
            "DROP SERVER test_server"
        ]:
            sa.event.listen(metadata, "before_drop", sa.DDL(ddl))

    def test_foreign_table_is_reflected(self):
        metadata = MetaData(testing.db)
        table = Table('test_foreigntable', metadata, autoload=True)
        eq_(set(table.columns.keys()), set(['id', 'data']),
            "Columns of reflected foreign table didn't equal expected columns")

    def test_get_foreign_table_names(self):
        inspector = inspect(testing.db)
        with testing.db.connect() as conn:
            ft_names = inspector.get_foreign_table_names()
            eq_(ft_names, ['test_foreigntable'])

    def test_get_table_names_no_foreign(self):
        inspector = inspect(testing.db)
        with testing.db.connect() as conn:
            names = inspector.get_table_names()
            eq_(names, ['testtable'])


class MaterializedViewReflectionTest(
        fixtures.TablesTest, AssertsExecutionResults):
    """Test reflection on materialized views"""

    __only_on__ = 'postgresql >= 9.3'
    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        testtable = Table(
            'testtable', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', String(30)))

        # insert data before we create the view
        @sa.event.listens_for(testtable, "after_create")
        def insert_data(target, connection, **kw):
            connection.execute(
                target.insert(),
                {"id": 89, "data": 'd1'}
            )

        materialized_view = sa.DDL(
            "CREATE MATERIALIZED VIEW test_mview AS "
            "SELECT * FROM testtable")

        plain_view = sa.DDL(
            "CREATE VIEW test_regview AS "
            "SELECT * FROM testtable")

        sa.event.listen(testtable, 'after_create', plain_view)
        sa.event.listen(testtable, 'after_create', materialized_view)
        sa.event.listen(
            testtable, 'before_drop',
            sa.DDL("DROP MATERIALIZED VIEW test_mview")
        )
        sa.event.listen(
            testtable, 'before_drop',
            sa.DDL("DROP VIEW test_regview")
        )

    def test_mview_is_reflected(self):
        metadata = MetaData(testing.db)
        table = Table('test_mview', metadata, autoload=True)
        eq_(set(table.columns.keys()), set(['id', 'data']),
            "Columns of reflected mview didn't equal expected columns")

    def test_mview_select(self):
        metadata = MetaData(testing.db)
        table = Table('test_mview', metadata, autoload=True)
        eq_(
            table.select().execute().fetchall(),
            [(89, 'd1',)]
        )

    def test_get_view_names(self):
        insp = inspect(testing.db)
        eq_(set(insp.get_view_names()), set(['test_mview', 'test_regview']))


class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
    """Test PostgreSQL domains"""

    __only_on__ = 'postgresql > 8.3'
    __backend__ = True

    @classmethod
    def setup_class(cls):
        con = testing.db.connect()
        for ddl in \
                'CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42', \
                'CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0', \
                "CREATE TYPE testtype AS ENUM ('test')", \
                'CREATE DOMAIN enumdomain AS testtype':
            try:
                con.execute(ddl)
            except exc.DBAPIError as e:
                if 'already exists' not in str(e):
                    raise e
        con.execute('CREATE TABLE testtable (question integer, answer '
                    'testdomain)')
        con.execute('CREATE TABLE test_schema.testtable(question '
                    'integer, answer test_schema.testdomain, anything '
                    'integer)')
        con.execute('CREATE TABLE crosschema (question integer, answer '
                    'test_schema.testdomain)')

        con.execute('CREATE TABLE enum_test (id integer, data enumdomain)')

    @classmethod
    def teardown_class(cls):
        con = testing.db.connect()
        con.execute('DROP TABLE testtable')
        con.execute('DROP TABLE test_schema.testtable')
        con.execute('DROP TABLE crosschema')
        con.execute('DROP DOMAIN testdomain')
        con.execute('DROP DOMAIN test_schema.testdomain')
        con.execute("DROP TABLE enum_test")
        con.execute("DROP DOMAIN enumdomain")
        con.execute("DROP TYPE testtype")

    def test_table_is_reflected(self):
        metadata = MetaData(testing.db)
        table = Table('testtable', metadata, autoload=True)
        eq_(set(table.columns.keys()), set(['question', 'answer']),
            "Columns of reflected table didn't equal expected columns")
        assert isinstance(table.c.answer.type, Integer)

    def test_domain_is_reflected(self):
        metadata = MetaData(testing.db)
        table = Table('testtable', metadata, autoload=True)
        eq_(str(table.columns.answer.server_default.arg), '42',
            "Reflected default value didn't equal expected value")
        assert not table.columns.answer.nullable, \
            'Expected reflected column to not be nullable.'

    def test_enum_domain_is_reflected(self):
        metadata = MetaData(testing.db)
        table = Table('enum_test', metadata, autoload=True)
        eq_(
            table.c.data.type.enums,
            ('test', )
        )

    def test_table_is_reflected_test_schema(self):
        metadata = MetaData(testing.db)
        table = Table('testtable', metadata, autoload=True,
                      schema='test_schema')
        eq_(set(table.columns.keys()), set(['question', 'answer',
                                            'anything']),
            "Columns of reflected table didn't equal expected columns")
        assert isinstance(table.c.anything.type, Integer)

    def test_schema_domain_is_reflected(self):
        metadata = MetaData(testing.db)
        table = Table('testtable', metadata, autoload=True,
                      schema='test_schema')
        eq_(str(table.columns.answer.server_default.arg), '0',
            "Reflected default value didn't equal expected value")
        assert table.columns.answer.nullable, \
            'Expected reflected column to be nullable.'

    def test_crosschema_domain_is_reflected(self):
        metadata = MetaData(testing.db)
        table = Table('crosschema', metadata, autoload=True)
        eq_(str(table.columns.answer.server_default.arg), '0',
            "Reflected default value didn't equal expected value")
        assert table.columns.answer.nullable, \
            'Expected reflected column to be nullable.'

    def test_unknown_types(self):
        from sqlalchemy.databases import postgresql
        ischema_names = postgresql.PGDialect.ischema_names
        postgresql.PGDialect.ischema_names = {}
        try:
            m2 = MetaData(testing.db)
            assert_raises(exc.SAWarning, Table, 'testtable', m2,
                          autoload=True)

            @testing.emits_warning('Did not recognize type')
            def warns():
                m3 = MetaData(testing.db)
                t3 = Table('testtable', m3, autoload=True)
                assert t3.c.answer.type.__class__ == sa.types.NullType
        finally:
            postgresql.PGDialect.ischema_names = ischema_names


class ReflectionTest(fixtures.TestBase):
    __only_on__ = 'postgresql'
    __backend__ = True

    @testing.fails_if("postgresql < 8.4",
                      "Better int2vector functions not available")
    @testing.provide_metadata
    def test_reflected_primary_key_order(self):
        meta1 = self.metadata
        subject = Table('subject', meta1,
                        Column('p1', Integer, primary_key=True),
                        Column('p2', Integer, primary_key=True),
                        PrimaryKeyConstraint('p2', 'p1')
                        )
        meta1.create_all()
        meta2 = MetaData(testing.db)
        subject = Table('subject', meta2, autoload=True)
        eq_(subject.primary_key.columns.keys(), ['p2', 'p1'])

    @testing.provide_metadata
    def test_pg_weirdchar_reflection(self):
        meta1 = self.metadata
        subject = Table('subject', meta1, Column('id$', Integer,
                                                 primary_key=True))
        referer = Table(
            'referer', meta1,
            Column(
                'id', Integer, primary_key=True),
            Column(
                'ref', Integer, ForeignKey('subject.id$')))
        meta1.create_all()
        meta2 = MetaData(testing.db)
        subject = Table('subject', meta2, autoload=True)
        referer = Table('referer', meta2, autoload=True)
        self.assert_((subject.c['id$']
                      == referer.c.ref).compare(
            subject.join(referer).onclause))

    @testing.provide_metadata
    def test_reflect_default_over_128_chars(self):
        Table('t', self.metadata,
              Column('x', String(200), server_default="abcd" * 40)
              ).create(testing.db)

        m = MetaData()
        t = Table('t', m, autoload=True, autoload_with=testing.db)
        eq_(
            t.c.x.server_default.arg.text, "'%s'::character varying" % (
                "abcd" * 40)
        )

    @testing.fails_if("postgresql < 8.1", "schema name leaks in, not sure")
    @testing.provide_metadata
    def test_renamed_sequence_reflection(self):
        metadata = self.metadata
        t = Table('t', metadata, Column('id', Integer, primary_key=True))
        metadata.create_all()
        m2 = MetaData(testing.db)
        t2 = Table('t', m2, autoload=True, implicit_returning=False)
        eq_(t2.c.id.server_default.arg.text,
            "nextval('t_id_seq'::regclass)")
        r = t2.insert().execute()
        eq_(r.inserted_primary_key, [1])
        testing.db.connect().execution_options(autocommit=True).\
            execute('alter table t_id_seq rename to foobar_id_seq'
                    )
        m3 = MetaData(testing.db)
        t3 = Table('t', m3, autoload=True, implicit_returning=False)
        eq_(t3.c.id.server_default.arg.text,
            "nextval('foobar_id_seq'::regclass)")
        r = t3.insert().execute()
        eq_(r.inserted_primary_key, [2])

    @testing.provide_metadata
    def test_renamed_pk_reflection(self):
        metadata = self.metadata
        t = Table('t', metadata, Column('id', Integer, primary_key=True))
        metadata.create_all()
        testing.db.connect().execution_options(autocommit=True).\
            execute('alter table t rename id to t_id')
        m2 = MetaData(testing.db)
        t2 = Table('t', m2, autoload=True)
        eq_([c.name for c in t2.primary_key], ['t_id'])

    @testing.provide_metadata
    def test_has_temporary_table(self):
        assert not testing.db.has_table("some_temp_table")
        user_tmp = Table(
            "some_temp_table", self.metadata,
            Column("id", Integer, primary_key=True),
            Column('name', String(50)),
            prefixes=['TEMPORARY']
        )
        user_tmp.create(testing.db)
        assert testing.db.has_table("some_temp_table")

    @testing.provide_metadata
    def test_cross_schema_reflection_one(self):

        meta1 = self.metadata

        users = Table('users', meta1,
                      Column('user_id', Integer, primary_key=True),
                      Column('user_name', String(30), nullable=False),
                      schema='test_schema')
        addresses = Table(
            'email_addresses', meta1,
            Column(
                'address_id', Integer, primary_key=True),
            Column(
                'remote_user_id', Integer, ForeignKey(
                    users.c.user_id)),
                Column(
                    'email_address', String(20)), schema='test_schema')
        meta1.create_all()
        meta2 = MetaData(testing.db)
        addresses = Table('email_addresses', meta2, autoload=True,
                          schema='test_schema')
        users = Table('users', meta2, mustexist=True,
                      schema='test_schema')
        j = join(users, addresses)
        self.assert_((users.c.user_id
                      == addresses.c.remote_user_id).compare(j.onclause))

    @testing.provide_metadata
    def test_cross_schema_reflection_two(self):
        meta1 = self.metadata
        subject = Table('subject', meta1,
                        Column('id', Integer, primary_key=True))
        referer = Table('referer', meta1,
                        Column('id', Integer, primary_key=True),
                        Column('ref', Integer, ForeignKey('subject.id')),
                        schema='test_schema')
        meta1.create_all()
        meta2 = MetaData(testing.db)
        subject = Table('subject', meta2, autoload=True)
        referer = Table('referer', meta2, schema='test_schema',
                        autoload=True)
        self.assert_((subject.c.id
                      == referer.c.ref).compare(
            subject.join(referer).onclause))

    @testing.provide_metadata
    def test_cross_schema_reflection_three(self):
        meta1 = self.metadata
        subject = Table('subject', meta1,
                        Column('id', Integer, primary_key=True),
                        schema='test_schema_2')
        referer = Table(
            'referer',
            meta1,
            Column(
                'id',
                Integer,
                primary_key=True),
            Column(
                'ref',
                Integer,
                ForeignKey('test_schema_2.subject.id')),
            schema='test_schema')
        meta1.create_all()
        meta2 = MetaData(testing.db)
        subject = Table('subject', meta2, autoload=True,
                        schema='test_schema_2')
        referer = Table('referer', meta2, autoload=True,
                        schema='test_schema')
        self.assert_((subject.c.id
                      == referer.c.ref).compare(
            subject.join(referer).onclause))

    @testing.provide_metadata
    def test_cross_schema_reflection_four(self):
        meta1 = self.metadata
        subject = Table('subject', meta1,
                        Column('id', Integer, primary_key=True),
                        schema='test_schema_2')
        referer = Table(
            'referer',
            meta1,
            Column(
                'id',
                Integer,
                primary_key=True),
            Column(
                'ref',
                Integer,
                ForeignKey('test_schema_2.subject.id')),
            schema='test_schema')
        meta1.create_all()

        conn = testing.db.connect()
        conn.detach()
        conn.execute("SET search_path TO test_schema, test_schema_2")
        meta2 = MetaData(bind=conn)
        subject = Table('subject', meta2, autoload=True,
                        schema='test_schema_2',
                        postgresql_ignore_search_path=True)
        referer = Table('referer', meta2, autoload=True,
                        schema='test_schema',
                        postgresql_ignore_search_path=True)
        self.assert_((subject.c.id
                      == referer.c.ref).compare(
            subject.join(referer).onclause))
        conn.close()

    @testing.provide_metadata
    def test_cross_schema_reflection_five(self):
        meta1 = self.metadata

        # we assume 'public'
        default_schema = testing.db.dialect.default_schema_name
        subject = Table('subject', meta1,
                        Column('id', Integer, primary_key=True))
        referer = Table('referer', meta1,
                        Column('id', Integer, primary_key=True),
                        Column('ref', Integer, ForeignKey('subject.id')))
        meta1.create_all()

        meta2 = MetaData(testing.db)
        subject = Table('subject', meta2, autoload=True,
                        schema=default_schema,
                        postgresql_ignore_search_path=True
                        )
        referer = Table('referer', meta2, autoload=True,
                        schema=default_schema,
                        postgresql_ignore_search_path=True
                        )
        assert subject.schema == default_schema
        self.assert_((subject.c.id
                      == referer.c.ref).compare(
            subject.join(referer).onclause))

    @testing.provide_metadata
    def test_cross_schema_reflection_six(self):
        # test that the search path *is* taken into account
        # by default
        meta1 = self.metadata

        Table('some_table', meta1,
              Column('id', Integer, primary_key=True),
              schema='test_schema'
              )
        Table('some_other_table', meta1,
              Column('id', Integer, primary_key=True),
              Column('sid', Integer, ForeignKey('test_schema.some_table.id')),
              schema='test_schema_2'
              )
        meta1.create_all()
        with testing.db.connect() as conn:
            conn.detach()

            conn.execute(
                "set search_path to test_schema_2, test_schema, public")

            m1 = MetaData(conn)

            t1_schema = Table('some_table',
                              m1,
                              schema="test_schema",
                              autoload=True)
            t2_schema = Table('some_other_table',
                              m1,
                              schema="test_schema_2",
                              autoload=True)

            t2_no_schema = Table('some_other_table',
                                 m1,
                                 autoload=True)

            t1_no_schema = Table('some_table',
                                 m1,
                                 autoload=True)

            m2 = MetaData(conn)
            t1_schema_isp = Table('some_table',
                                  m2,
                                  schema="test_schema",
                                  autoload=True,
                                  postgresql_ignore_search_path=True)
            t2_schema_isp = Table('some_other_table',
                                  m2,
                                  schema="test_schema_2",
                                  autoload=True,
                                  postgresql_ignore_search_path=True)

            # t2_schema refers to t1_schema, but since "test_schema"
            # is in the search path, we instead link to t2_no_schema
            assert t2_schema.c.sid.references(
                t1_no_schema.c.id)

            # the two no_schema tables refer to each other also.
            assert t2_no_schema.c.sid.references(
                t1_no_schema.c.id)

            # but if we're ignoring search path, then we maintain
            # those explicit schemas vs. what the "default" schema is
            assert t2_schema_isp.c.sid.references(t1_schema_isp.c.id)

    @testing.provide_metadata
    def test_cross_schema_reflection_seven(self):
        # test that the search path *is* taken into account
        # by default
        meta1 = self.metadata

        Table('some_table', meta1,
              Column('id', Integer, primary_key=True),
              schema='test_schema'
              )
        Table('some_other_table', meta1,
              Column('id', Integer, primary_key=True),
              Column('sid', Integer, ForeignKey('test_schema.some_table.id')),
              schema='test_schema_2'
              )
        meta1.create_all()
        with testing.db.connect() as conn:
            conn.detach()

            conn.execute(
                "set search_path to test_schema_2, test_schema, public")
            meta2 = MetaData(conn)
            meta2.reflect(schema="test_schema_2")

            eq_(set(meta2.tables), set(
                ['test_schema_2.some_other_table', 'some_table']))

            meta3 = MetaData(conn)
            meta3.reflect(
                schema="test_schema_2", postgresql_ignore_search_path=True)

            eq_(set(meta3.tables), set(
                ['test_schema_2.some_other_table', 'test_schema.some_table']))

    @testing.provide_metadata
    def test_cross_schema_reflection_metadata_uses_schema(self):
        # test [ticket:3716]

        metadata = self.metadata

        Table('some_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('sid', Integer, ForeignKey('some_other_table.id')),
              schema='test_schema'
              )
        Table('some_other_table', metadata,
              Column('id', Integer, primary_key=True),
              schema=None
              )
        metadata.create_all()
        with testing.db.connect() as conn:
            meta2 = MetaData(conn, schema="test_schema")
            meta2.reflect()

            eq_(set(meta2.tables), set(
                ['some_other_table', 'test_schema.some_table']))

    @testing.provide_metadata
    def test_uppercase_lowercase_table(self):
        metadata = self.metadata

        a_table = Table('a', metadata, Column('x', Integer))
        A_table = Table('A', metadata, Column('x', Integer))

        a_table.create()
        assert testing.db.has_table("a")
        assert not testing.db.has_table("A")
        A_table.create(checkfirst=True)
        assert testing.db.has_table("A")

    def test_uppercase_lowercase_sequence(self):

        a_seq = Sequence('a')
        A_seq = Sequence('A')

        a_seq.create(testing.db)
        assert testing.db.dialect.has_sequence(testing.db, "a")
        assert not testing.db.dialect.has_sequence(testing.db, "A")
        A_seq.create(testing.db, checkfirst=True)
        assert testing.db.dialect.has_sequence(testing.db, "A")

        a_seq.drop(testing.db)
        A_seq.drop(testing.db)

    @testing.provide_metadata
    def test_index_reflection(self):
        """ Reflecting partial & expression-based indexes should warn
        """

        metadata = self.metadata

        t1 = Table(
            'party', metadata,
            Column(
                'id', String(10), nullable=False),
            Column(
                'name', String(20), index=True),
            Column(
                'aname', String(20)))
        metadata.create_all()
        testing.db.execute("""
          create index idx1 on party ((id || name))
        """)
        testing.db.execute("""
          create unique index idx2 on party (id) where name = 'test'
        """)
        testing.db.execute("""
            create index idx3 on party using btree
                (lower(name::text), lower(aname::text))
        """)

        def go():
            m2 = MetaData(testing.db)
            t2 = Table('party', m2, autoload=True)
            assert len(t2.indexes) == 2

            # Make sure indexes are in the order we expect them in

            tmp = [(idx.name, idx) for idx in t2.indexes]
            tmp.sort()
            r1, r2 = [idx[1] for idx in tmp]
            assert r1.name == 'idx2'
            assert r1.unique == True
            assert r2.unique == False
            assert [t2.c.id] == r1.columns
            assert [t2.c.name] == r2.columns

        testing.assert_warnings(
            go,
            ['Skipped unsupported reflection of '
             'expression-based index idx1',
             'Predicate of partial index idx2 ignored during '
             'reflection',
             'Skipped unsupported reflection of '
             'expression-based index idx3'])

    @testing.provide_metadata
    def test_index_reflection_modified(self):
        """reflect indexes when a column name has changed - PG 9
        does not update the name of the column in the index def.
        [ticket:2141]

        """

        metadata = self.metadata

        t1 = Table('t', metadata,
                   Column('id', Integer, primary_key=True),
                   Column('x', Integer)
                   )
        metadata.create_all()
        conn = testing.db.connect().execution_options(autocommit=True)
        conn.execute("CREATE INDEX idx1 ON t (x)")
        conn.execute("ALTER TABLE t RENAME COLUMN x to y")

        ind = testing.db.dialect.get_indexes(conn, "t", None)
        eq_(ind, [{'unique': False, 'column_names': ['y'], 'name': 'idx1'}])
        conn.close()

    @testing.fails_if("postgresql < 8.2", "reloptions not supported")
    @testing.provide_metadata
    def test_index_reflection_with_storage_options(self):
        """reflect indexes with storage options set"""

        metadata = self.metadata

        Table(
            't', metadata,
            Column('id', Integer, primary_key=True),
            Column('x', Integer)
        )
        metadata.create_all()

        with testing.db.connect().execution_options(autocommit=True) as conn:
            conn.execute("CREATE INDEX idx1 ON t (x) WITH (fillfactor = 50)")

            ind = testing.db.dialect.get_indexes(conn, "t", None)
            eq_(ind, [{'unique': False, 'column_names': ['x'], 'name': 'idx1',
                       'dialect_options':
                       {"postgresql_with": {"fillfactor": "50"}}}])

            m = MetaData()
            t1 = Table('t', m, autoload_with=conn)
            eq_(
                list(t1.indexes)[0].dialect_options['postgresql']['with'],
                {"fillfactor": "50"}
            )

    @testing.provide_metadata
    def test_index_reflection_with_access_method(self):
        """reflect indexes with storage options set"""

        metadata = self.metadata

        Table(
            't', metadata,
            Column('id', Integer, primary_key=True),
            Column('x', ARRAY(Integer))
        )
        metadata.create_all()
        with testing.db.connect().execution_options(autocommit=True) as conn:
            conn.execute("CREATE INDEX idx1 ON t USING gin (x)")

            ind = testing.db.dialect.get_indexes(conn, "t", None)
            eq_(ind, [{'unique': False, 'column_names': ['x'], 'name': 'idx1',
                       'dialect_options': {'postgresql_using': 'gin'}}])
            m = MetaData()
            t1 = Table('t', m, autoload_with=conn)
            eq_(
                list(t1.indexes)[0].dialect_options['postgresql']['using'],
                'gin'
            )

    @testing.provide_metadata
    def test_foreign_key_option_inspection(self):
        metadata = self.metadata
        Table(
            'person',
            metadata,
            Column(
                'id',
                String(
                    length=32),
                nullable=False,
                primary_key=True),
            Column(
                'company_id',
                ForeignKey(
                    'company.id',
                    name='person_company_id_fkey',
                    match='FULL',
                    onupdate='RESTRICT',
                    ondelete='RESTRICT',
                    deferrable=True,
                    initially='DEFERRED')))
        Table(
            'company', metadata,
            Column('id', String(length=32), nullable=False, primary_key=True),
            Column('name', String(length=255)),
            Column(
                'industry_id',
                ForeignKey(
                    'industry.id',
                    name='company_industry_id_fkey',
                    onupdate='CASCADE', ondelete='CASCADE',
                    deferrable=False,  # PG default
                    # PG default
                    initially='IMMEDIATE'
                )
            )
        )
        Table('industry', metadata,
              Column('id', Integer(), nullable=False, primary_key=True),
              Column('name', String(length=255))
              )
        fk_ref = {
            'person_company_id_fkey': {
                'name': 'person_company_id_fkey',
                'constrained_columns': ['company_id'],
                'referred_columns': ['id'],
                'referred_table': 'company',
                'referred_schema': None,
                'options': {
                    'onupdate': 'RESTRICT',
                    'deferrable': True,
                    'ondelete': 'RESTRICT',
                    'initially': 'DEFERRED',
                    'match': 'FULL'
                }
            },
            'company_industry_id_fkey': {
                'name': 'company_industry_id_fkey',
                'constrained_columns': ['industry_id'],
                'referred_columns': ['id'],
                'referred_table': 'industry',
                'referred_schema': None,
                'options': {
                    'onupdate': 'CASCADE',
                    'deferrable': None,
                    'ondelete': 'CASCADE',
                    'initially': None,
                    'match': None
                }
            }
        }
        metadata.create_all()
        inspector = inspect(testing.db)
        fks = inspector.get_foreign_keys('person') + \
            inspector.get_foreign_keys('company')
        for fk in fks:
            eq_(fk, fk_ref[fk['name']])

    @testing.provide_metadata
    def test_inspect_enums_schema(self):
        conn = testing.db.connect()
        enum_type = postgresql.ENUM(
            'sad', 'ok', 'happy', name='mood',
            schema='test_schema',
            metadata=self.metadata)
        enum_type.create(conn)
        inspector = reflection.Inspector.from_engine(conn.engine)
        eq_(
            inspector.get_enums('test_schema'), [{
                'visible': False,
                'name': 'mood',
                'schema': 'test_schema',
                'labels': ['sad', 'ok', 'happy']
            }])

    @testing.provide_metadata
    def test_inspect_enums(self):
        enum_type = postgresql.ENUM(
            'cat', 'dog', 'rat', name='pet', metadata=self.metadata)
        enum_type.create(testing.db)
        inspector = reflection.Inspector.from_engine(testing.db)
        eq_(inspector.get_enums(), [
            {
                'visible': True,
                'labels': ['cat', 'dog', 'rat'],
                'name': 'pet',
                'schema': 'public'
            }])

    @testing.provide_metadata
    def test_inspect_enums_star(self):
        enum_type = postgresql.ENUM(
            'cat', 'dog', 'rat', name='pet', metadata=self.metadata)
        schema_enum_type = postgresql.ENUM(
            'sad', 'ok', 'happy', name='mood',
            schema='test_schema',
            metadata=self.metadata)
        enum_type.create(testing.db)
        schema_enum_type.create(testing.db)
        inspector = reflection.Inspector.from_engine(testing.db)

        eq_(inspector.get_enums(), [
            {
                'visible': True,
                'labels': ['cat', 'dog', 'rat'],
                'name': 'pet',
                'schema': 'public'
            }])

        eq_(inspector.get_enums('*'), [
            {
                'visible': True,
                'labels': ['cat', 'dog', 'rat'],
                'name': 'pet',
                'schema': 'public'
            },
            {
                'visible': False,
                'name': 'mood',
                'schema': 'test_schema',
                'labels': ['sad', 'ok', 'happy']
            }])

    @testing.provide_metadata
    @testing.only_on("postgresql >= 8.5")
    def test_reflection_with_unique_constraint(self):
        insp = inspect(testing.db)

        meta = self.metadata
        uc_table = Table('pgsql_uc', meta,
                         Column('a', String(10)),
                         UniqueConstraint('a', name='uc_a'))

        uc_table.create()

        # PostgreSQL will create an implicit index for a unique
        # constraint.   Separately we get both
        indexes = set(i['name'] for i in insp.get_indexes('pgsql_uc'))
        constraints = set(i['name']
                          for i in insp.get_unique_constraints('pgsql_uc'))

        self.assert_('uc_a' in indexes)
        self.assert_('uc_a' in constraints)

        # reflection corrects for the dupe
        reflected = Table('pgsql_uc', MetaData(testing.db), autoload=True)

        indexes = set(i.name for i in reflected.indexes)
        constraints = set(uc.name for uc in reflected.constraints)

        self.assert_('uc_a' not in indexes)
        self.assert_('uc_a' in constraints)

    @testing.provide_metadata
    def test_reflect_unique_index(self):
        insp = inspect(testing.db)

        meta = self.metadata

        # a unique index OTOH we are able to detect is an index
        # and not a unique constraint
        uc_table = Table('pgsql_uc', meta,
                         Column('a', String(10)),
                         Index('ix_a', 'a', unique=True))

        uc_table.create()

        indexes = dict((i['name'], i) for i in insp.get_indexes('pgsql_uc'))
        constraints = set(i['name']
                          for i in insp.get_unique_constraints('pgsql_uc'))

        self.assert_('ix_a' in indexes)
        assert indexes['ix_a']['unique']
        self.assert_('ix_a' not in constraints)

        reflected = Table('pgsql_uc', MetaData(testing.db), autoload=True)

        indexes = dict((i.name, i) for i in reflected.indexes)
        constraints = set(uc.name for uc in reflected.constraints)

        self.assert_('ix_a' in indexes)
        assert indexes['ix_a'].unique
        self.assert_('ix_a' not in constraints)


class CustomTypeReflectionTest(fixtures.TestBase):

    class CustomType(object):

        def __init__(self, arg1=None, arg2=None):
            self.arg1 = arg1
            self.arg2 = arg2

    ischema_names = None

    def setup(self):
        ischema_names = postgresql.PGDialect.ischema_names
        postgresql.PGDialect.ischema_names = ischema_names.copy()
        self.ischema_names = ischema_names

    def teardown(self):
        postgresql.PGDialect.ischema_names = self.ischema_names
        self.ischema_names = None

    def _assert_reflected(self, dialect):
        for sch, args in [
            ('my_custom_type', (None, None)),
            ('my_custom_type()', (None, None)),
            ('my_custom_type(ARG1)', ('ARG1', None)),
            ('my_custom_type(ARG1, ARG2)', ('ARG1', 'ARG2')),
        ]:
            column_info = dialect._get_column_info(
                'colname', sch, None, False,
                {}, {}, 'public')
            assert isinstance(column_info['type'], self.CustomType)
            eq_(column_info['type'].arg1, args[0])
            eq_(column_info['type'].arg2, args[1])

    def test_clslevel(self):
        postgresql.PGDialect.ischema_names['my_custom_type'] = self.CustomType
        dialect = postgresql.PGDialect()
        self._assert_reflected(dialect)

    def test_instancelevel(self):
        dialect = postgresql.PGDialect()
        dialect.ischema_names = dialect.ischema_names.copy()
        dialect.ischema_names['my_custom_type'] = self.CustomType
        self._assert_reflected(dialect)
