from test.lib.testing import assert_raises, assert_raises_message, eq_
from test.lib.engines import testing_engine
from test.lib import fixtures, AssertsCompiledSQL, testing
from sqlalchemy import *
from sqlalchemy import exc as exceptions
from sqlalchemy.engine import default
from sqlalchemy.sql import table, column
from test.lib.schema import Table, Column

IDENT_LENGTH = 29

class LabelTypeTest(fixtures.TestBase):
    def test_type(self):
        m = MetaData()
        t = Table('sometable', m,
            Column('col1', Integer),
            Column('col2', Float))
        assert isinstance(t.c.col1.label('hi').type, Integer)
        assert isinstance(select([t.c.col2]).as_scalar().label('lala').type,
                    Float)

class LongLabelsTest(fixtures.TablesTest, AssertsCompiledSQL):
    run_inserts = 'once'
    run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        table1 = Table("some_large_named_table", metadata,
            Column("this_is_the_primarykey_column", Integer,
                            primary_key=True, 
                            test_needs_autoincrement=True),
            Column("this_is_the_data_column", String(30))
            )

        table2 = Table("table_with_exactly_29_characs", metadata,
            Column("this_is_the_primarykey_column", Integer,
                            primary_key=True, 
                            test_needs_autoincrement=True),
            Column("this_is_the_data_column", String(30))
            )
        cls.tables.table1 = table1
        cls.tables.table2 = table2

    @classmethod
    def insert_data(cls):
        table1 = cls.tables.table1
        table2 = cls.tables.table2
        for data in [
            {"this_is_the_primarykey_column":1, 
                        "this_is_the_data_column":"data1"},
            {"this_is_the_primarykey_column":2, 
                        "this_is_the_data_column":"data2"},
            {"this_is_the_primarykey_column":3, 
                        "this_is_the_data_column":"data3"},
            {"this_is_the_primarykey_column":4, 
                        "this_is_the_data_column":"data4"}
        ]:
            testing.db.execute(
                table1.insert(),
                **data
            )
        testing.db.execute(
            table2.insert(),
            {"this_is_the_primary_key_column":1, 
            "this_is_the_data_column":"data"}
        )

    @classmethod
    def setup_class(cls):
        super(LongLabelsTest, cls).setup_class()
        cls.maxlen = testing.db.dialect.max_identifier_length
        testing.db.dialect.max_identifier_length = IDENT_LENGTH

    @classmethod
    def teardown_class(cls):
        testing.db.dialect.max_identifier_length = cls.maxlen
        super(LongLabelsTest, cls).teardown_class()

    def test_too_long_name_disallowed(self):
        m = MetaData(testing.db)
        t1 = Table("this_name_is_too_long_for_what_were_doing_in_this_test", 
                        m, Column('foo', Integer))
        assert_raises(exceptions.IdentifierError, m.create_all)
        assert_raises(exceptions.IdentifierError, m.drop_all)
        assert_raises(exceptions.IdentifierError, t1.create)
        assert_raises(exceptions.IdentifierError, t1.drop)

    def test_basic_result(self):
        table1 = self.tables.table1
        s = table1.select(use_labels=True, 
                        order_by=[table1.c.this_is_the_primarykey_column])

        result = [
            (row[table1.c.this_is_the_primarykey_column], 
            row[table1.c.this_is_the_data_column])
            for row in testing.db.execute(s)
        ]
        eq_(result, [
            (1, "data1"),
            (2, "data2"),
            (3, "data3"),
            (4, "data4"),
        ])

    def test_result_limit(self):
        table1 = self.tables.table1
        # some dialects such as oracle (and possibly ms-sql 
        # in a future version)
        # generate a subquery for limits/offsets.
        # ensure that the generated result map corresponds 
        # to the selected table, not
        # the select query
        s = table1.select(use_labels=True, 
                        order_by=[table1.c.this_is_the_primarykey_column]).\
                        limit(2)

        result = [
            (row[table1.c.this_is_the_primarykey_column], 
            row[table1.c.this_is_the_data_column])
            for row in testing.db.execute(s)
        ]
        eq_(result, [
            (1, "data1"),
            (2, "data2"),
        ])

    @testing.requires.offset
    def test_result_limit_offset(self):
        table1 = self.tables.table1
        s = table1.select(use_labels=True, 
                        order_by=[table1.c.this_is_the_primarykey_column]).\
                        limit(2).offset(1)

        result = [
            (row[table1.c.this_is_the_primarykey_column], 
            row[table1.c.this_is_the_data_column])
            for row in testing.db.execute(s)
        ]
        eq_(result, [
            (2, "data2"),
            (3, "data3"),
        ])

    def test_table_alias_1(self):
        table2 = self.tables.table2
        if testing.against('oracle'):
            self.assert_compile(
                table2.alias().select(),
                "SELECT table_with_exactly_29_c_1."
                "this_is_the_primarykey_column, "
                "table_with_exactly_29_c_1.this_is_the_data_column "
                "FROM table_with_exactly_29_characs "
                "table_with_exactly_29_c_1"
            )
        else:
            self.assert_compile(
                table2.alias().select(),
                "SELECT table_with_exactly_29_c_1."
                "this_is_the_primarykey_column, "
                "table_with_exactly_29_c_1.this_is_the_data_column "
                "FROM table_with_exactly_29_characs AS "
                "table_with_exactly_29_c_1"
            )

    def test_table_alias_2(self):
        table1 = self.tables.table1
        table2 = self.tables.table2
        ta = table2.alias()
        dialect = default.DefaultDialect()
        dialect.max_identifier_length = IDENT_LENGTH
        self.assert_compile(
            select([table1, ta]).select_from(
                        table1.join(ta, 
                            table1.c.this_is_the_data_column==
                            ta.c.this_is_the_data_column)).\
                        where(ta.c.this_is_the_data_column=='data3'),

            "SELECT some_large_named_table.this_is_the_primarykey_column, "
            "some_large_named_table.this_is_the_data_column, "
            "table_with_exactly_29_c_1.this_is_the_primarykey_column, "
            "table_with_exactly_29_c_1.this_is_the_data_column FROM "
            "some_large_named_table JOIN table_with_exactly_29_characs "
            "AS table_with_exactly_29_c_1 ON "
            "some_large_named_table.this_is_the_data_column = "
            "table_with_exactly_29_c_1.this_is_the_data_column "
            "WHERE table_with_exactly_29_c_1.this_is_the_data_column = "
            ":this_is_the_data_column_1",
            dialect=dialect
        )

    def test_table_alias_3(self):
        table2 = self.tables.table2
        eq_(
            testing.db.execute(table2.alias().select()).first(),
            (1, "data")
        )

    def test_colbinds(self):
        table1 = self.tables.table1
        r = table1.select(table1.c.this_is_the_primarykey_column == 4).\
                    execute()
        assert r.fetchall() == [(4, "data4")]

        r = table1.select(or_(
            table1.c.this_is_the_primarykey_column == 4,
            table1.c.this_is_the_primarykey_column == 2
        )).execute()
        assert r.fetchall() == [(2, "data2"), (4, "data4")]

    @testing.provide_metadata
    def test_insert_no_pk(self):
        t = Table("some_other_large_named_table", self.metadata,
            Column("this_is_the_primarykey_column", Integer, 
                            Sequence("this_is_some_large_seq"), 
                            primary_key=True),
            Column("this_is_the_data_column", String(30))
            )
        t.create(testing.db, checkfirst=True)
        testing.db.execute(t.insert(), 
                **{"this_is_the_data_column":"data1"})

    @testing.requires.subqueries
    def test_subquery(self):
        table1 = self.tables.table1
        q = table1.select(table1.c.this_is_the_primarykey_column == 4).\
                        alias('foo')
        eq_(
            list(testing.db.execute(select([q]))),
            [(4, u'data4')]
        )

    @testing.requires.subqueries
    def test_anon_alias(self):
        table1 = self.tables.table1
        compile_dialect = default.DefaultDialect()
        compile_dialect.max_identifier_length = IDENT_LENGTH

        q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
        x = select([q], use_labels=True)

        self.assert_compile(x, 
            "SELECT anon_1.this_is_the_primarykey_column AS "
            "anon_1_this_is_the_prim_1, anon_1.this_is_the_data_column "
            "AS anon_1_this_is_the_data_2 "
            "FROM (SELECT some_large_named_table."
            "this_is_the_primarykey_column AS "
            "this_is_the_primarykey_column, "
            "some_large_named_table.this_is_the_data_column "
            "AS this_is_the_data_column "
            "FROM some_large_named_table "
            "WHERE some_large_named_table.this_is_the_primarykey_column "
            "= :this_is_the_primarykey__1) AS anon_1", 
            dialect=compile_dialect)

        eq_(
            list(testing.db.execute(x)),
            [(4, u'data4')]
        )

    def test_adjustable(self):
        table1 = self.tables.table1

        q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
        x = select([q])

        compile_dialect = default.DefaultDialect(label_length=10)
        self.assert_compile(x, 
            "SELECT foo.this_1, foo.this_2 FROM "
            "(SELECT some_large_named_table."
            "this_is_the_primarykey_column AS this_1, "
            "some_large_named_table.this_is_the_data_column AS this_2 "
            "FROM some_large_named_table WHERE "
            "some_large_named_table.this_is_the_primarykey_column = :this_1) AS foo", 
            dialect=compile_dialect)

        compile_dialect = default.DefaultDialect(label_length=4)
        self.assert_compile(x, "SELECT foo._1, foo._2 FROM "
            "(SELECT some_large_named_table.this_is_the_primarykey_column "
            "AS _1, some_large_named_table.this_is_the_data_column AS _2 "
            "FROM some_large_named_table WHERE "
            "some_large_named_table.this_is_the_primarykey_column = :_1) AS foo", 
        dialect=compile_dialect)

        q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
        x = select([q], use_labels=True)

        compile_dialect = default.DefaultDialect(label_length=10)
        self.assert_compile(x, 
            "SELECT anon_1.this_2 AS anon_1, anon_1.this_4 AS anon_3 FROM "
            "(SELECT some_large_named_table.this_is_the_primarykey_column "
            "AS this_2, some_large_named_table.this_is_the_data_column AS this_4 "
            "FROM some_large_named_table WHERE "
            "some_large_named_table.this_is_the_primarykey_column = :this_1) AS anon_1", 
            dialect=compile_dialect)

        compile_dialect = default.DefaultDialect(label_length=4)
        self.assert_compile(x, "SELECT _1._2 AS _1, _1._4 AS _3 FROM "
            "(SELECT some_large_named_table.this_is_the_primarykey_column "
            "AS _2, some_large_named_table.this_is_the_data_column AS _4 "
            "FROM some_large_named_table WHERE "
            "some_large_named_table.this_is_the_primarykey_column = :_1) AS _1", 
            dialect=compile_dialect)

    def test_adjustable_result_schema_column(self):
        table1 = self.tables.table1

        q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
        x = select([q])

        e = testing_engine(options={"label_length":10})
        e.pool = testing.db.pool
        row = e.execute(x).first()
        eq_(row.this_is_the_primarykey_column, 4)
        eq_(row.this_1, 4)
        eq_(row['this_1'], 4)

        q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
        row = e.execute(x).first()
        eq_(row.this_is_the_primarykey_column, 4)
        eq_(row.this_1, 4)

    def test_adjustable_result_lightweight_column(self):

        table1 = table("some_large_named_table", 
            column("this_is_the_primarykey_column"),
            column("this_is_the_data_column")
        )

        q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
        x = select([q])

        e = testing_engine(options={"label_length":10})
        e.pool = testing.db.pool
        row = e.execute(x).first()
        eq_(row.this_is_the_primarykey_column, 4)
        eq_(row.this_1, 4)

    def test_table_plus_column_exceeds_length(self):
        """test that the truncation occurs if tablename / colname are only
        greater than the max when concatenated."""

        compile_dialect = default.DefaultDialect(label_length=30)
        m = MetaData()
        a_table = Table(
            'thirty_characters_table_xxxxxx',
            m,
            Column('id', Integer, primary_key=True)
        )

        other_table = Table(
            'other_thirty_characters_table_',
            m,
            Column('id', Integer, primary_key=True),
            Column('thirty_characters_table_id',
                Integer,
                ForeignKey('thirty_characters_table_xxxxxx.id'),
                primary_key=True
            )
        )

        anon = a_table.alias()
        self.assert_compile(
            select([other_table,anon]).
                            select_from(
                                other_table.outerjoin(anon)
                        ).apply_labels(),
            "SELECT other_thirty_characters_table_.id AS "
            "other_thirty_characters__1, "
            "other_thirty_characters_table_.thirty_characters_table_id "
            "AS other_thirty_characters__2, thirty_characters_table__1.id "
            "AS thirty_characters_table__3 "
            "FROM other_thirty_characters_table_ "
            "LEFT OUTER JOIN thirty_characters_table_xxxxxx "
            "AS thirty_characters_table__1 ON "
            "thirty_characters_table__1.id = "
            "other_thirty_characters_table_.thirty_characters_table_id",
            dialect=compile_dialect)

        self.assert_compile(
                select([other_table, anon]).
                    select_from(
                                other_table.outerjoin(anon)
                    ).apply_labels(),
            "SELECT other_thirty_characters_table_.id AS "
            "other_thirty_characters__1, "
            "other_thirty_characters_table_.thirty_characters_table_id "
            "AS other_thirty_characters__2, "
            "thirty_characters_table__1.id AS thirty_characters_table__3 "
            "FROM other_thirty_characters_table_ "
            "LEFT OUTER JOIN thirty_characters_table_xxxxxx "
            "AS thirty_characters_table__1 ON "
            "thirty_characters_table__1.id = "
            "other_thirty_characters_table_.thirty_characters_table_id",
            dialect=compile_dialect
        )
