1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
|
from sqlalchemy import exc
from sqlalchemy import select
from sqlalchemy import testing
from sqlalchemy.orm import aliased
from sqlalchemy.orm import loading
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertions import assert_raises
from sqlalchemy.testing.assertions import assert_raises_message
from sqlalchemy.testing.assertions import eq_
from sqlalchemy.util import KeyedTuple
from . import _fixtures
# class GetFromIdentityTest(_fixtures.FixtureTest):
# class LoadOnIdentTest(_fixtures.FixtureTest):
class InstanceProcessorTest(_fixtures.FixtureTest):
    """Loading test using joined eager loading and ``Query.get()``."""

    def test_state_no_load_path_comparison(self):
        """Regression test for issue #5110.

        ``User``, ``Order`` and ``Address`` are mapped with ``lazy="joined"``
        on every relationship; ``populate_existing().get(7)`` is then
        compared against the expected object graph, and the whole comparison
        is run under ``assert_sql_count`` with an expected count of 1.
        """
        User, Order, Address = self.classes("User", "Order", "Address")
        users, orders, addresses = self.tables("users", "orders", "addresses")

        # Relationship configuration, spelled out ahead of the mapper calls.
        user_properties = {
            "addresses": relationship(Address, lazy="joined"),
            "orders": relationship(
                Order, lazy="joined", order_by=orders.c.id
            ),
        }
        order_properties = {"address": relationship(Address, lazy="joined")}

        mapper(User, users, properties=user_properties)
        mapper(Order, orders, properties=order_properties)
        mapper(Address, addresses)

        session = Session()

        def load_and_compare():
            # Build the expected graph first, then load; both steps happen
            # inside the SQL-counted callable.
            expected_user = User(
                id=7,
                orders=[
                    Order(id=1, address=Address(id=1)),
                    Order(id=3, address=Address(id=1)),
                    Order(id=5, address=None),
                ],
            )
            loaded_user = session.query(User).populate_existing().get(7)
            eq_(expected_user, loaded_user)

        self.assert_sql_count(testing.db, load_and_compare, 1)
class InstancesTest(_fixtures.FixtureTest):
    """Tests around ``loading.instances()`` and row-processor creation."""

    # Fixture lifecycle settings (presumably consumed by the fixture base
    # class -- confirm against _fixtures.FixtureTest).
    run_setup_mappers = "once"
    run_inserts = "once"
    run_deletes = None

    @classmethod
    def setup_mappers(cls):
        """Set up the stock fixture mapping for this test class."""
        cls._setup_stock_mapping()

    def test_cursor_close_w_failed_rowproc(self):
        """The cursor's ``close`` is called when a row processor raises."""
        User = self.classes.User
        session = Session()
        query = session.query(User)
        context = query._compile_context()
        fake_cursor = mock.Mock()

        # Replace the query's entities with one whose row_processor raises.
        exploding_row_processor = mock.Mock(side_effect=Exception("boom"))
        query._entities = [mock.Mock(row_processor=exploding_row_processor)]

        # instances() is invoked here; list() is what assert_raises calls.
        result = loading.instances(query, fake_cursor, context)
        assert_raises(Exception, list, result)
        assert fake_cursor.close.called, "Cursor wasn't closed"

    def test_row_proc_not_created(self):
        """A statement lacking a queried column raises NoSuchColumnError."""
        User = self.classes.User
        session = Session()
        two_column_query = session.query(User.id, User.name)

        # The statement selects only the id, so 'users.name' is not in it.
        id_only_stmt = select([User.id])
        fetch_all = two_column_query.from_statement(id_only_stmt).all

        assert_raises_message(
            exc.NoSuchColumnError,
            "Could not locate column in row for column 'users.name'",
            fetch_all,
        )
class MergeResultTest(_fixtures.FixtureTest):
    """Tests for ``loading.merge_result()`` over several result shapes."""

    # Fixture lifecycle settings (presumably consumed by the fixture base
    # class -- confirm against _fixtures.FixtureTest).
    run_setup_mappers = "once"
    run_inserts = "once"
    run_deletes = None

    @classmethod
    def setup_mappers(cls):
        """Set up the stock fixture mapping for this test class."""
        cls._setup_stock_mapping()

    def _fixture(self):
        """Return ``(session, [u1, u2, u3, u4])`` for the merge tests.

        The four ``User`` objects are constructed directly with ids
        1, 2, 7 and 8.  The session runs a query for ids 7 and 8 (result
        discarded) and is then closed before being returned.
        """
        User = self.classes.User

        s = Session()
        u1, u2, u3, u4 = (
            User(id=1, name="u1"),
            User(id=2, name="u2"),
            User(id=7, name="u3"),
            User(id=8, name="u4"),
        )
        # NOTE(review): the query result is intentionally unused; presumably
        # this exercises the session before the merge -- confirm intent.
        s.query(User).filter(User.id.in_([7, 8])).all()
        s.close()
        return s, [u1, u2, u3, u4]

    def test_single_entity(self):
        """Merging a list of plain entities yields them in the same order."""
        s, collection = self._fixture()
        User = self.classes.User

        q = s.query(User)
        it = loading.merge_result(q, collection)
        eq_([x.id for x in it], [1, 2, 7, 8])

    def test_single_column(self):
        """Merging single-column rows returns the rows unchanged."""
        User = self.classes.User

        s = Session()

        q = s.query(User.id)
        collection = [(1,), (2,), (7,), (8,)]
        it = loading.merge_result(q, collection)
        eq_(list(it), [(1,), (2,), (7,), (8,)])

    def test_entity_col_mix_plain_tuple(self):
        """Plain (entity, column) tuples come back as keyed rows."""
        s, (u1, u2, u3, u4) = self._fixture()
        User = self.classes.User

        q = s.query(User, User.id)
        collection = [(u1, 1), (u2, 2), (u3, 7), (u4, 8)]
        it = loading.merge_result(q, collection)
        it = list(it)
        eq_([(x.id, y) for x, y in it], [(1, 1), (2, 2), (7, 7), (8, 8)])
        eq_(list(it[0].keys()), ["User", "id"])

    def test_entity_col_mix_keyed_tuple(self):
        """KeyedTuple (entity, column) rows keep their keys after merging."""
        s, (u1, u2, u3, u4) = self._fixture()
        User = self.classes.User

        q = s.query(User, User.id)

        def kt(*x):
            return KeyedTuple(x, ["User", "id"])

        collection = [kt(u1, 1), kt(u2, 2), kt(u3, 7), kt(u4, 8)]
        it = loading.merge_result(q, collection)
        it = list(it)
        eq_([(x.id, y) for x, y in it], [(1, 1), (2, 2), (7, 7), (8, 8)])
        eq_(list(it[0].keys()), ["User", "id"])

    def test_none_entity(self):
        """Rows containing ``None`` in an entity position merge correctly."""
        s, (u1, u2, u3, u4) = self._fixture()
        User = self.classes.User

        ua = aliased(User)
        q = s.query(User, ua)

        def kt(*x):
            return KeyedTuple(x, ["User", "useralias"])

        collection = [kt(u1, u2), kt(u1, None), kt(u2, u3)]
        it = loading.merge_result(q, collection)
        # Explicit None checks rather than "x and x.id or None", which would
        # also turn a falsy id (such as 0) into None.
        eq_(
            [
                (
                    x.id if x is not None else None,
                    y.id if y is not None else None,
                )
                for x, y in it
            ],
            [(u1.id, u2.id), (u1.id, None), (u2.id, u3.id)],
        )
|