1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
|
from django.contrib.contenttypes.models import ContentType
from django.db.models import Q
from django.test import TestCase
from polymorphic.tests.models import (
Base,
BlogA,
BlogEntry,
Model2A,
Model2B,
Model2C,
Model2D,
ModelX,
ModelY,
One2OneRelatingModel,
RelatingModel,
)
class MultipleDatabasesTests(TestCase):
    """Tests for polymorphic models stored in the ``secondary`` database.

    Every test creates its rows through ``db_manager("secondary")`` (or
    ``save(using="secondary")``) and then checks either the classes of the
    objects read back or that relation descriptors issue no queries against
    the ``default`` database.
    """

    # Database aliases this test case reads from and writes to.
    databases = ["default", "secondary"]

    def _assert_classes(self, queryset, expected_classes, *, ordered=True):
        """Assert that *queryset* yields instances of *expected_classes*.

        Args:
            queryset: Queryset whose results are compared.
            expected_classes: Classes the returned objects must have.
            ordered: Forwarded to ``assertQuerySetEqual``; pass ``False`` to
                ignore result order.
        """
        self.assertQuerySetEqual(
            queryset,
            expected_classes,
            transform=lambda obj: obj.__class__,
            ordered=ordered,
        )

    def test_save_to_non_default_database(self):
        """Objects are only visible in the database they were saved to."""
        Model2A.objects.db_manager("secondary").create(field1="A1")
        Model2C(field1="C1", field2="C2", field3="C3").save(using="secondary")
        Model2B.objects.create(field1="B1", field2="B2")
        Model2D(field1="D1", field2="D2", field3="D3", field4="D4").save()

        # The default database holds only the objects saved without an alias.
        self._assert_classes(Model2A.objects.order_by("id"), [Model2B, Model2D])
        self._assert_classes(
            Model2A.objects.db_manager("secondary").order_by("id"),
            [Model2A, Model2C],
        )

    def test_instance_of_filter_on_non_default_database(self):
        """``instance_of`` filtering works on querysets bound to ``secondary``."""
        Base.objects.db_manager("secondary").create(field_b="B1")
        ModelX.objects.db_manager("secondary").create(field_b="B", field_x="X")
        ModelY.objects.db_manager("secondary").create(field_b="Y", field_y="Y")

        secondary = Base.objects.db_manager("secondary")

        self._assert_classes(
            secondary.filter(instance_of=Base),
            [Base, ModelX, ModelY],
            ordered=False,
        )
        self._assert_classes(secondary.filter(instance_of=ModelX), [ModelX])
        self._assert_classes(secondary.filter(instance_of=ModelY), [ModelY])
        self._assert_classes(
            secondary.filter(Q(instance_of=ModelX) | Q(instance_of=ModelY)),
            [ModelX, ModelY],
            ordered=False,
        )

    def test_forward_many_to_one_descriptor_on_non_default_database(self):
        """Following ``entry.blog`` does not query the default database."""
        # Ensure no queries are made using the default database.
        with self.assertNumQueries(0, using="default"):
            blog = BlogA.objects.db_manager("secondary").create(name="Blog", info="Info")
            entry = BlogEntry.objects.db_manager("secondary").create(blog=blog, text="Text")
            # NOTE(review): presumably clears cached content types so that the
            # lookups below cannot be answered from the cache -- confirm.
            ContentType.objects.clear_cache()
            entry = BlogEntry.objects.db_manager("secondary").get(pk=entry.id)
            # assertEqual instead of a bare ``assert`` so the check survives -O.
            self.assertEqual(blog, entry.blog)

    def test_reverse_many_to_one_descriptor_on_non_default_database(self):
        """Following ``blog.blogentry_set`` does not query the default database."""
        # Ensure no queries are made using the default database.
        with self.assertNumQueries(0, using="default"):
            blog = BlogA.objects.db_manager("secondary").create(name="Blog", info="Info")
            entry = BlogEntry.objects.db_manager("secondary").create(blog=blog, text="Text")
            # NOTE(review): presumably clears cached content types so that the
            # lookups below cannot be answered from the cache -- confirm.
            ContentType.objects.clear_cache()
            blog = BlogA.objects.db_manager("secondary").get(pk=blog.id)
            # assertEqual instead of a bare ``assert`` so the check survives -O.
            self.assertEqual(entry, blog.blogentry_set.using("secondary").get())

    def test_reverse_one_to_one_descriptor_on_non_default_database(self):
        """Following ``m2a.one2onerelatingmodel`` does not query the default database."""
        # Ensure no queries are made using the default database.
        with self.assertNumQueries(0, using="default"):
            m2a = Model2A.objects.db_manager("secondary").create(field1="A1")
            one2one = One2OneRelatingModel.objects.db_manager("secondary").create(
                one2one=m2a, field1="121"
            )
            # NOTE(review): presumably clears cached content types so that the
            # lookups below cannot be answered from the cache -- confirm.
            ContentType.objects.clear_cache()
            m2a = Model2A.objects.db_manager("secondary").get(pk=m2a.id)
            # assertEqual instead of a bare ``assert`` so the check survives -O.
            self.assertEqual(one2one, m2a.one2onerelatingmodel)

    def test_many_to_many_descriptor_on_non_default_database(self):
        """Following ``m2a.relatingmodel_set`` does not query the default database."""
        # Ensure no queries are made using the default database.
        with self.assertNumQueries(0, using="default"):
            m2a = Model2A.objects.db_manager("secondary").create(field1="A1")
            rm = RelatingModel.objects.db_manager("secondary").create()
            rm.many2many.add(m2a)
            # NOTE(review): presumably clears cached content types so that the
            # lookups below cannot be answered from the cache -- confirm.
            ContentType.objects.clear_cache()
            m2a = Model2A.objects.db_manager("secondary").get(pk=m2a.id)
            # assertEqual instead of a bare ``assert`` so the check survives -O.
            self.assertEqual(rm, m2a.relatingmodel_set.using("secondary").get())
|