1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
|
import sqlalchemy as sa
from sqlalchemy_utils import merge_references
from tests import TestCase
class TestMergeReferences(TestCase):
def create_models(self):
class User(self.Base):
__tablename__ = 'user'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
def __repr__(self):
return 'User(%r)' % self.name
class BlogPost(self.Base):
__tablename__ = 'blog_post'
id = sa.Column(sa.Integer, primary_key=True)
title = sa.Column(sa.Unicode(255))
content = sa.Column(sa.UnicodeText)
author_id = sa.Column(sa.Integer, sa.ForeignKey('user.id'))
author = sa.orm.relationship(User)
self.User = User
self.BlogPost = BlogPost
def test_updates_foreign_keys(self):
john = self.User(name=u'John')
jack = self.User(name=u'Jack')
post = self.BlogPost(title=u'Some title', author=john)
post2 = self.BlogPost(title=u'Other title', author=jack)
self.session.add(john)
self.session.add(jack)
self.session.add(post)
self.session.add(post2)
self.session.commit()
merge_references(john, jack)
self.session.commit()
assert post.author == jack
assert post2.author == jack
def test_object_merging_whenever_possible(self):
john = self.User(name=u'John')
jack = self.User(name=u'Jack')
post = self.BlogPost(title=u'Some title', author=john)
post2 = self.BlogPost(title=u'Other title', author=jack)
self.session.add(john)
self.session.add(jack)
self.session.add(post)
self.session.add(post2)
self.session.commit()
# Load the author for post
assert post.author_id == john.id
merge_references(john, jack)
assert post.author_id == jack.id
assert post2.author_id == jack.id
class TestMergeReferencesWithManyToManyAssociations(TestCase):
def create_models(self):
class User(self.Base):
__tablename__ = 'user'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
def __repr__(self):
return 'User(%r)' % self.name
team_member = sa.Table(
'team_member', self.Base.metadata,
sa.Column(
'user_id', sa.Integer,
sa.ForeignKey('user.id', ondelete='CASCADE'),
primary_key=True
),
sa.Column(
'team_id', sa.Integer,
sa.ForeignKey('team.id', ondelete='CASCADE'),
primary_key=True
)
)
class Team(self.Base):
__tablename__ = 'team'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Unicode(255))
members = sa.orm.relationship(
User,
secondary=team_member,
backref='teams'
)
self.User = User
self.Team = Team
def test_supports_associations(self):
john = self.User(name=u'John')
jack = self.User(name=u'Jack')
team = self.Team(name=u'Team')
team.members.append(john)
self.session.add(john)
self.session.add(jack)
self.session.commit()
merge_references(john, jack)
assert john not in team.members
assert jack in team.members
class TestMergeReferencesWithManyToManyAssociationObjects(TestCase):
def create_models(self):
class Team(self.Base):
__tablename__ = 'team'
id = sa.Column(sa.Integer, autoincrement=True, primary_key=True)
name = sa.Column(sa.Unicode(255))
class User(self.Base):
__tablename__ = 'user'
id = sa.Column(sa.Integer, autoincrement=True, primary_key=True)
name = sa.Column(sa.Unicode(255))
class TeamMember(self.Base):
__tablename__ = 'team_member'
user_id = sa.Column(
sa.Integer,
sa.ForeignKey(User.id, ondelete='CASCADE'),
primary_key=True
)
team_id = sa.Column(
sa.Integer,
sa.ForeignKey(Team.id, ondelete='CASCADE'),
primary_key=True
)
role = sa.Column(sa.Unicode(255))
team = sa.orm.relationship(
Team,
backref=sa.orm.backref(
'members',
cascade='all, delete-orphan'
),
primaryjoin=team_id == Team.id,
)
user = sa.orm.relationship(
User,
backref=sa.orm.backref(
'memberships',
cascade='all, delete-orphan'
),
primaryjoin=user_id == User.id,
)
self.User = User
self.TeamMember = TeamMember
self.Team = Team
def test_supports_associations(self):
john = self.User(name=u'John')
jack = self.User(name=u'Jack')
team = self.Team(name=u'Team')
team.members.append(self.TeamMember(user=john))
self.session.add(john)
self.session.add(jack)
self.session.add(team)
self.session.commit()
merge_references(john, jack)
self.session.commit()
users = [member.user for member in team.members]
assert john not in users
assert jack in users
|