1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
from unittest import SkipTest
import django
from django.db import OperationalError, ProgrammingError
from django.db.models import Window
from django.db.models.functions import Rank
from django.test import TestCase, skipUnlessDBFeature
from .models import Order, Region, User
@skipUnlessDBFeature("supports_select_union")
class NonCteQueries(TestCase):
"""Test non-CTE queries
These tests were adapted from the Django test suite. The models used
here use CTEManager and CTEQuerySet to verify feature parity with
their base classes Manager and QuerySet.
"""
@classmethod
def setUpTestData(cls):
Order.objects.all().delete()
def test_union_with_select_related_and_order(self):
e1 = User.objects.create(name="e1")
a1 = Order.objects.create(region_id="earth", user=e1)
a2 = Order.objects.create(region_id="moon", user=e1)
Order.objects.create(region_id="sun", user=e1)
base_qs = Order.objects.select_related("user").order_by()
qs1 = base_qs.filter(region_id="earth")
qs2 = base_qs.filter(region_id="moon")
print(qs1.union(qs2).order_by("pk").query)
self.assertSequenceEqual(qs1.union(qs2).order_by("pk"), [a1, a2])
@skipUnlessDBFeature("supports_slicing_ordering_in_compound")
def test_union_with_select_related_and_first(self):
e1 = User.objects.create(name="e1")
a1 = Order.objects.create(region_id="earth", user=e1)
Order.objects.create(region_id="moon", user=e1)
base_qs = Order.objects.select_related("user")
qs1 = base_qs.filter(region_id="earth")
qs2 = base_qs.filter(region_id="moon")
self.assertEqual(qs1.union(qs2).first(), a1)
def test_union_with_first(self):
e1 = User.objects.create(name="e1")
a1 = Order.objects.create(region_id="earth", user=e1)
base_qs = Order.objects.order_by()
qs1 = base_qs.filter(region_id="earth")
qs2 = base_qs.filter(region_id="moon")
self.assertEqual(qs1.union(qs2).first(), a1)
class WindowFunctions(TestCase):
def test_heterogeneous_filter_in_cte(self):
if django.VERSION < (4, 2):
raise SkipTest("feature added in Django 4.2")
from django_cte import With
cte = With(
Order.objects.annotate(
region_amount_rank=Window(
Rank(), partition_by="region_id", order_by="-amount"
),
)
.order_by("region_id")
.values("region_id", "region_amount_rank")
.filter(region_amount_rank=1, region_id__in=["sun", "moon"])
)
qs = cte.join(Region, name=cte.col.region_id).with_cte(cte)
print(qs.query)
# ProgrammingError: column cte.region_id does not exist
# WITH RECURSIVE "cte" AS (SELECT * FROM (
# SELECT "orders"."region_id" AS "col1", ...
# "region" INNER JOIN "cte" ON "region"."name" = ("cte"."region_id")
try:
self.assertEqual({r.name for r in qs}, {"moon", "sun"})
except (OperationalError, ProgrammingError) as err:
if "cte.region_id" in str(err):
raise SkipTest(
"window function auto-aliasing breaks CTE "
"column references"
)
raise
if django.VERSION < (5, 2):
assert 0, "unexpected pass"
|