File: test_returning.py

package info (click to toggle)
sqlalchemy 0.6.3-3%2Bsqueeze1
  • links: PTS, VCS
  • area: main
  • in suites: squeeze
  • size: 10,744 kB
  • ctags: 15,132
  • sloc: python: 93,431; ansic: 787; makefile: 137; xml: 17
file content (193 lines) | stat: -rw-r--r-- 7,739 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
from sqlalchemy.test.testing import eq_
from sqlalchemy import *
from sqlalchemy.test import *
from sqlalchemy.test.schema import Table, Column
from sqlalchemy.types import TypeDecorator

class ReturningTest(TestBase, AssertsExecutionResults):
    """Exercise ``returning()`` on insert, update and delete statements.

    Each test runs against a ``tables`` table built in ``setup`` with an
    integer primary key, an integer column, a boolean column and a string
    column wrapped in ``GoofyType``.
    """

    # NOTE(review): presumably read by the test harness to skip these
    # dialects; the harness itself is not visible in this file.
    __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access')

    def setup(self):
        """Publish ``GoofyType`` and ``table`` as module globals and create the table."""
        global table, GoofyType

        class GoofyType(TypeDecorator):
            """String type that prepends "FOO" to bound values and appends
            "BAR" to result values; ``None`` passes through untouched."""

            impl = String

            def process_bind_param(self, value, dialect):
                return None if value is None else "FOO" + value

            def process_result_value(self, value, dialect):
                return None if value is None else value + "BAR"

        metadata = MetaData(testing.db)
        table = Table(
            'tables', metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('persons', Integer),
            Column('full', Boolean),
            Column('goofy', GoofyType(50)),
        )
        table.create(checkfirst=True)

    def teardown(self):
        """Drop the table created by ``setup``."""
        table.drop()

    @testing.exclude('firebird', '<', (2, 0), '2.0+ feature')
    @testing.exclude('postgresql', '<', (8, 2), '8.2+ feature')
    def test_column_targeting(self):
        # Returned rows must be addressable by Column object and by name.
        stmt = table.insert().returning(table.c.id, table.c.full)
        row = stmt.execute({'persons': 1, 'full': False}).first()
        assert row[table.c.id] == row['id'] == 1
        assert row[table.c.full] == row['full'] == False

        stmt = table.insert().values(persons=5, full=True, goofy="somegoofy")
        stmt = stmt.returning(table.c.persons, table.c.full, table.c.goofy)
        row = stmt.execute().first()
        assert row[table.c.persons] == row['persons'] == 5
        assert row[table.c.full] == row['full'] == True

        # The goofy column goes through both bind and result processing.
        eq_(row[table.c.goofy], row['goofy'])
        eq_(row['goofy'], "FOOsomegoofyBAR")

    @testing.fails_on('firebird', "fb can't handle returning x AS y")
    @testing.exclude('firebird', '<', (2, 0), '2.0+ feature')
    @testing.exclude('postgresql', '<', (8, 2), '8.2+ feature')
    def test_labeling(self):
        # A labeled column is retrievable from the row under its label.
        labeled = table.c.persons.label('lala')
        stmt = table.insert().values(persons=6).returning(labeled)
        row = stmt.execute().first()
        assert row['lala'] == 6

    @testing.fails_on('firebird', "fb/kintersbasdb can't handle the bind params")
    @testing.fails_on('oracle+zxjdbc', "JDBC driver bug")
    @testing.exclude('firebird', '<', (2, 0), '2.0+ feature')
    @testing.exclude('postgresql', '<', (8, 2), '8.2+ feature')
    def test_anon_expressions(self):
        # Function expression typed as GoofyType: the stored "FOO..." value
        # is lowercased, then "BAR" is appended on the way out.
        lowered = func.lower(table.c.goofy, type_=GoofyType)
        stmt = table.insert().values(goofy="someOTHERgoofy").returning(lowered)
        row = stmt.execute().first()
        assert row[0] == "foosomeothergoofyBAR"

        # Plain arithmetic expression in the RETURNING list.
        stmt = table.insert().values(persons=12).returning(table.c.persons + 18)
        row = stmt.execute().first()
        assert row[0] == 30

    @testing.exclude('firebird', '<', (2, 1), '2.1+ feature')
    @testing.exclude('postgresql', '<', (8, 2), '8.2+ feature')
    def test_update_returning(self):
        seed_rows = [{'persons': 5, 'full': False}, {'persons': 3, 'full': False}]
        table.insert().execute(seed_rows)

        # Only the row with persons > 4 is updated and returned.
        update_stmt = table.update(table.c.persons > 4, dict(full=True))
        updated = update_stmt.returning(table.c.id).execute()
        eq_(updated.fetchall(), [(1,)])

        lookup = select([table.c.id, table.c.full]).order_by(table.c.id)
        eq_(lookup.execute().fetchall(), [(1, True), (2, False)])

    @testing.exclude('firebird', '<', (2, 0), '2.0+ feature')
    @testing.exclude('postgresql', '<', (8, 2), '8.2+ feature')
    def test_insert_returning(self):
        single = table.insert().returning(table.c.id)
        inserted = single.execute({'persons': 1, 'full': False})

        eq_(inserted.fetchall(), [(1,)])

        @testing.fails_on('postgresql', '')
        @testing.fails_on('oracle+cx_oracle', '')
        @testing.crashes('mssql+mxodbc', 'Raises an error')
        def test_executemany():
            # return value is documented as failing with psycopg2/executemany
            many = table.insert().returning(table).execute(
                [{'persons': 2, 'full': False}, {'persons': 3, 'full': True}])

            if testing.against('mssql+zxjdbc'):
                # jtds apparently returns only the first row
                expected = [(2, 2, False, None)]
            elif testing.against('firebird', 'mssql', 'oracle'):
                # Multiple inserts only return the last row
                expected = [(3, 3, True, None)]
            else:
                # nobody does this as far as we know (pg8000?)
                expected = [(2, 2, False, None), (3, 3, True, None)]
            eq_(many.fetchall(), expected)

        test_executemany()

    @testing.exclude('firebird', '<', (2, 1), '2.1+ feature')
    @testing.exclude('postgresql', '<', (8, 2), '8.2+ feature')
    @testing.fails_on_everything_except('postgresql', 'firebird')
    def test_literal_returning(self):
        # Raw SQL string with RETURNING; the boolean literal differs per backend.
        literal_true = "true" if testing.against("postgresql") else "1"

        sql = ('insert into tables (id, persons, "full") '
               'values (5, 10, %s) returning persons' % literal_true)
        result4 = testing.db.execute(sql)
        eq_([dict(row) for row in result4], [{'persons': 10}])

    @testing.exclude('firebird', '<', (2, 1), '2.1+ feature')
    @testing.exclude('postgresql', '<', (8, 2), '8.2+ feature')
    def test_delete_returning(self):
        seed_rows = [{'persons': 5, 'full': False}, {'persons': 3, 'full': False}]
        table.insert().execute(seed_rows)

        # Only the row with persons > 4 is deleted and returned.
        delete_stmt = table.delete(table.c.persons > 4).returning(table.c.id)
        eq_(delete_stmt.execute().fetchall(), [(1,)])

        remaining = select([table.c.id, table.c.full]).order_by(table.c.id)
        eq_(remaining.execute().fetchall(), [(2, False)])

class SequenceReturningTest(TestBase):
    """Check ``returning()`` of a primary key column that carries a ``Sequence``."""

    # NOTE(review): presumably read by the test harness to skip these
    # dialects; the harness itself is not visible in this file.
    __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access', 'mssql')

    def setup(self):
        """Publish ``table`` and ``seq`` as module globals and create the table."""
        global table, seq

        metadata = MetaData(testing.db)
        seq = Sequence('tid_seq')
        id_column = Column('id', Integer, seq, primary_key=True)
        data_column = Column('data', String(50))
        table = Table('tables', metadata, id_column, data_column)
        table.create(checkfirst=True)

    def teardown(self):
        """Drop the table created by ``setup``."""
        table.drop()

    def test_insert(self):
        # The insert returns id 1; executing the sequence afterwards yields 2.
        stmt = table.insert().values(data='hi').returning(table.c.id)
        returned = stmt.execute()
        assert returned.first() == (1, )
        assert seq.execute() == 2

class KeyReturningTest(TestBase, AssertsExecutionResults):
    """test returning() works with columns that define 'key'.

    The primary key column is named ``id`` in the database but is exposed
    on ``table.c`` under the key ``foo_id``.
    """

    # NOTE(review): presumably read by the test harness to skip these
    # dialects; the harness itself is not visible in this file.
    __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access')

    def setup(self):
        """Publish ``table`` as a module global and create it."""
        meta = MetaData(testing.db)
        global table

        table = Table('tables', meta,
            Column('id', Integer, primary_key=True, key='foo_id', test_needs_autoincrement=True),
            Column('data', String(20)),
        )
        table.create(checkfirst=True)

    def teardown(self):
        """Drop the table created by ``setup``."""
        table.drop()

    @testing.exclude('firebird', '<', (2, 0), '2.0+ feature')
    @testing.exclude('postgresql', '<', (8, 2), '8.2+ feature')
    def test_insert(self):
        # The returned row is addressable by the keyed Column object and by
        # the database column name.
        result = table.insert().returning(table.c.foo_id).execute(data='somedata')
        row = result.first()
        assert row[table.c.foo_id] == row['id'] == 1

        # Read the row back with a SELECT and bind it to ``row`` so the
        # assertion checks the fetched row, not the one from the insert.
        row = table.select().execute().first()
        assert row[table.c.foo_id] == row['id'] == 1