# Tests for INSERT ... ON DUPLICATE KEY UPDATE on the MySQL / MariaDB dialects.
from sqlalchemy import Boolean
from sqlalchemy import Column
from sqlalchemy import exc
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertions import assert_raises
from sqlalchemy.testing.assertions import eq_
class OnDuplicateTest(fixtures.TablesTest):
    """Round-trip tests for the MySQL ``on_duplicate_key_update()`` construct.

    Every test works against a single ``foos`` table (``id``, ``bar``,
    ``baz``, ``updated_once``) and checks both the reported
    ``inserted_primary_key`` and the rows left in the table.
    """

    # Restrict the suite to the two dialects named here.
    __only_on__ = ("mysql", "mariadb")
    __backend__ = True
    # NOTE(review): presumably makes the fixture rebuild the tables for
    # every test so rows do not leak between tests -- confirm against
    # ``fixtures.TablesTest``.
    run_define_tables = "each"

    @classmethod
    def define_tables(cls, metadata):
        """Attach the ``foos`` table definition to *metadata*."""
        foos_columns = (
            Column("id", Integer, primary_key=True, autoincrement=True),
            Column("bar", String(10)),
            Column("baz", String(10)),
            Column("updated_once", Boolean, default=False),
        )
        Table("foos", metadata, *foos_columns)

    def test_bad_args(self):
        """Invalid argument combinations must be rejected up front."""
        foos = self.tables.foos

        # Called with no arguments at all.
        assert_raises(
            ValueError,
            insert(foos).values({}).on_duplicate_key_update,
        )

        # A positional dictionary combined with keyword arguments.
        assert_raises(
            exc.ArgumentError,
            insert(foos).values({}).on_duplicate_key_update,
            {"id": 1, "bar": "b"},
            id=1,
            bar="b",
        )

        # Two positional dictionaries.
        assert_raises(
            exc.ArgumentError,
            insert(foos).values({}).on_duplicate_key_update,
            {"id": 1, "bar": "b"},
            {"id": 2, "bar": "baz"},
        )

    def test_on_duplicate_key_update_multirow(self, connection):
        """A multi-row upsert overwrites ``bar`` on the conflicting row."""
        foos = self.tables.foos
        connection.execute(
            insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
        )

        upsert = insert(foos).values(
            [{"id": 1, "bar": "ab"}, {"id": 2, "bar": "b"}]
        )
        upsert = upsert.on_duplicate_key_update(bar=upsert.inserted.bar)
        outcome = connection.execute(upsert)

        # With several rows in one INSERT the primary key is ambiguous;
        # reporting None here is a behavioral change in 1.4.
        eq_(outcome.inserted_primary_key, (None,))

        first_row = connection.execute(
            foos.select().where(foos.c.id == 1)
        ).fetchall()
        eq_(first_row, [(1, "ab", "bz", False)])

    def test_on_duplicate_key_update_singlerow(self, connection):
        """A single-row upsert without a conflict reports its own key."""
        foos = self.tables.foos
        connection.execute(
            insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
        )

        upsert = insert(foos).values({"id": 2, "bar": "b"})
        upsert = upsert.on_duplicate_key_update(bar=upsert.inserted.bar)
        outcome = connection.execute(upsert)

        # Exactly one row in the INSERT, so inserted_primary_key is usable.
        eq_(outcome.inserted_primary_key, (2,))

        # The pre-existing row is expected to be unchanged.
        first_row = connection.execute(
            foos.select().where(foos.c.id == 1)
        ).fetchall()
        eq_(first_row, [(1, "b", "bz", False)])

    def test_on_duplicate_key_update_null_multirow(self, connection):
        """``None`` is accepted as the replacement value on a conflict."""
        foos = self.tables.foos
        connection.execute(
            insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
        )

        upsert = insert(foos).values(
            [{"id": 1, "bar": "ab"}, {"id": 2, "bar": "b"}]
        )
        upsert = upsert.on_duplicate_key_update(updated_once=None)
        outcome = connection.execute(upsert)

        # Multi-row INSERT: the primary key is ambiguous.
        eq_(outcome.inserted_primary_key, (None,))

        first_row = connection.execute(
            foos.select().where(foos.c.id == 1)
        ).fetchall()
        eq_(first_row, [(1, "b", "bz", None)])

    def test_on_duplicate_key_update_expression_multirow(self, connection):
        """SQL function expressions may be used as replacement values."""
        foos = self.tables.foos
        connection.execute(
            insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
        )

        upsert = insert(foos).values(
            [{"id": 1, "bar": "ab"}, {"id": 2, "bar": "b"}]
        )
        incoming_bar = upsert.inserted.bar
        upsert = upsert.on_duplicate_key_update(
            bar=func.concat(incoming_bar, "_foo"),
            baz=func.concat(incoming_bar, "_", foos.c.baz),
        )
        outcome = connection.execute(upsert)
        eq_(outcome.inserted_primary_key, (None,))

        expected_rows = [
            # id=1 already existed, so the ON DUPLICATE branch ran.
            (1, "ab_foo", "ab_bz", False),
            # id=2 did not exist, so it has to be a plain insert.
            (2, "b", None, False),
        ]
        eq_(connection.execute(foos.select()).fetchall(), expected_rows)

    def test_on_duplicate_key_update_preserve_order(self, connection):
        """The order of the assignments in the UPDATE clause is kept."""
        foos = self.tables.foos
        connection.execute(
            insert(foos).values(
                [
                    {"id": 1, "bar": "b", "baz": "bz"},
                    {"id": 2, "bar": "b", "baz": "bz2"},
                ],
            )
        )

        base_insert = insert(foos)
        # Deliberately ``== False``: this builds a SQL comparison, it is
        # not a Python truth test.
        update_condition = foos.c.updated_once == False  # noqa: E712

        def bar_assignment():
            # Take the incoming ``bar`` only while the row is still flagged
            # as not yet updated; otherwise keep the stored value.
            return (
                "bar",
                func.if_(
                    update_condition,
                    func.values(foos.c.bar),
                    foos.c.bar,
                ),
            )

        def flag_assignment():
            # Switch ``updated_once`` on the first time round.
            return (
                "updated_once",
                func.if_(update_condition, True, foos.c.updated_once),
            )

        # Assignment order matters: columns referenced in the UPDATE clause
        # are replaced one by one, left to right, so a later assignment
        # already sees the new value written by an earlier one.
        bar_then_flag = base_insert.on_duplicate_key_update(
            [bar_assignment(), flag_assignment()]
        )
        flag_then_bar = base_insert.on_duplicate_key_update(
            [flag_assignment(), bar_assignment()]
        )

        # ``bar`` is assigned before the flag flips, so it gets updated.
        connection.execute(bar_then_flag, {"id": 1, "bar": "ab"})
        eq_(
            connection.execute(
                foos.select().where(foos.c.id == 1)
            ).fetchall(),
            [(1, "ab", "bz", True)],
        )

        # The flag flips first, so the ``bar`` assignment is a no-op.
        connection.execute(flag_then_bar, {"id": 2, "bar": "ab"})
        eq_(
            connection.execute(
                foos.select().where(foos.c.id == 2)
            ).fetchall(),
            [(2, "b", "bz2", True)],
        )

    def test_last_inserted_id(self, connection):
        """``inserted_primary_key`` is (1,) for both the insert and upsert."""
        foos = self.tables.foos

        # No explicit id: the row is expected to be inserted as id 1.
        without_id = insert(foos).values({"bar": "b", "baz": "bz"})
        outcome = connection.execute(
            without_id.on_duplicate_key_update(
                bar=without_id.inserted.bar, baz="newbz"
            )
        )
        eq_(outcome.inserted_primary_key, (1,))

        # Explicit id=1, the key reported by the statement above.
        with_id = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
        outcome = connection.execute(
            with_id.on_duplicate_key_update(
                bar=with_id.inserted.bar, baz="newbz"
            )
        )
        eq_(outcome.inserted_primary_key, (1,))
|