File: test_expressions.py

package info (click to toggle)
mssql-django 1.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 644 kB
  • sloc: python: 5,289; sh: 105; makefile: 7
file content (134 lines) | stat: -rw-r--r-- 5,820 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# Copyright (c) Microsoft Corporation.
# Licensed under the BSD license.

import datetime
from unittest import skipUnless

from django import VERSION
from django.db.models import CharField, IntegerField, F
from django.db.models.expressions import Case, Exists, OuterRef, Subquery, Value, When, ExpressionWrapper
from django.test import TestCase, skipUnlessDBFeature

from django.db.models.aggregates import Count, Sum

from ..models import Author, Book, Comment, Post, Editor, ModelWithNullableFieldsOfDifferentTypes


# True when the installed Django major version is 3 or newer; gates the
# tests below that are marked as Django 3 specific.
DJANGO3 = VERSION >= (3,)


class TestSubquery(TestCase):
    """Smoke test for counting a queryset filtered on a ``Subquery`` annotation."""

    def setUp(self):
        # One author owning one post; no comments are created here.
        self.author = Author.objects.create(name="author")
        self.post = Post.objects.create(title="foo", author=self.author)

    def test_with_count(self):
        """Run ``count()`` on posts filtered by a subquery annotation.

        Nothing is asserted about the result: the test passes as long as
        the query executes without raising.
        """
        comments_newest_first = Comment.objects.filter(
            post=OuterRef('pk'),
        ).order_by('-created_at')
        latest_text = Subquery(comments_newest_first.values('text')[:1])
        annotated_posts = Post.objects.annotate(post_exists=latest_text)
        annotated_posts.filter(post_exists=True).count()


class TestExists(TestCase):
    """Tests for ``Exists`` in annotations, ``Case``/``When`` and ``order_by``."""

    def setUp(self):
        # A single author owning a single post.
        self.author = Author.objects.create(name="author")
        self.post = Post.objects.create(title="foo", author=self.author)

    def test_with_count(self):
        """``count()`` works on a queryset filtered by an ``Exists`` annotation."""
        post_count = Post.objects.annotate(
            post_exists=Exists(Post.objects.all())
        ).filter(post_exists=True).count()
        # setUp created exactly one post, and the uncorrelated subquery is
        # satisfied by it, so that post must be counted.
        self.assertEqual(post_count, 1)

    @skipUnless(DJANGO3, "Django 3 specific tests")
    def test_with_case_when(self):
        """``Exists`` can be used directly as the condition of a ``When``."""
        author = Author.objects.annotate(
            has_post=Case(
                When(Exists(Post.objects.filter(author=OuterRef('pk')).values('pk')), then=Value(1)),
                default=Value(0),
                output_field=IntegerField(),
            )
        ).get()
        self.assertEqual(author.has_post, 1)

    def test_unnecessary_exists_group_by(self):
        """An ``Exists``-based annotation can be combined with an aggregate."""
        author = Author.objects.annotate(
            has_post=Case(
                When(Exists(Post.objects.filter(author=OuterRef('pk')).values('pk')), then=Value(1)),
                default=Value(0),
                output_field=IntegerField(),
            )).annotate(
            amount=Count("post")
        ).get()
        self.assertEqual(author.amount, 1)
        self.assertEqual(author.has_post, 1)

    def test_combined_expression_annotation_with_aggregation(self):
        """Constant and NULL expression annotations coexist with an aggregate."""
        author = Author.objects.annotate(
            combined=ExpressionWrapper(
                Value(2) * Value(5), output_field=IntegerField()
            ),
            null_value=ExpressionWrapper(
                Value(None), output_field=IntegerField()
            ),
            rating_count=Count("post"),
        ).first()
        self.assertEqual(author.combined, 10)
        self.assertIsNone(author.null_value)

    @skipUnless(DJANGO3, "Django 3 specific tests")
    def test_order_by_exists(self):
        """Ordering by an ``Exists`` expression works in both directions."""
        author_without_posts = Author.objects.create(name="other author")
        # Descending: the author that has a post sorts first.
        authors_by_posts = Author.objects.order_by(Exists(Post.objects.filter(author=OuterRef('pk'))).desc())
        self.assertSequenceEqual(authors_by_posts, [self.author, author_without_posts])

        # Ascending: the author without posts sorts first.
        authors_by_posts = Author.objects.order_by(Exists(Post.objects.filter(author=OuterRef('pk'))).asc())
        self.assertSequenceEqual(authors_by_posts, [author_without_posts, self.author])


class TestGroupBy(TestCase):
    """Grouping by a ``Case`` annotation."""

    def test_group_by_case(self):
        """Aggregate over a ``Case`` annotation and expect an empty result.

        This test creates no ``Book`` rows itself.
        """
        age_bucket = Case(
            When(id__gt=1000, then=Value("new")),
            default=Value("old"),
            output_field=CharField(),
        )
        grouped = Book.objects.annotate(age=age_bucket).values('age').annotate(sum=Sum('id'))
        self.assertEqual(list(grouped.all()), [])

@skipUnless(DJANGO3, "Django 3 specific tests")
@skipUnlessDBFeature("order_by_nulls_first")
class TestOrderBy(TestCase):
    """Ordering on a foreign key with explicit placement of NULL values."""

    def setUp(self):
        self.author = Author.objects.create(name="author")
        # Created without ``alt_editor``; the tests expect it to read back as None.
        self.post = Post.objects.create(title="foo", author=self.author)
        self.editor = Editor.objects.create(name="editor")
        # Second post, this one with ``alt_editor`` set.
        self.post_alt = Post.objects.create(title="Post with editor", author=self.author, alt_editor=self.editor)

    def test_order_by_nulls_last(self):
        """Ascending order with ``nulls_last=True`` puts the NULL row last."""
        ordering = F("alt_editor").asc(nulls_last=True)
        results = Post.objects.order_by(ordering)
        self.assertEqual(len(results), 2)
        self.assertIsNotNone(results[0].alt_editor)
        self.assertIsNone(results[1].alt_editor)

    def test_order_by_nulls_first(self):
        """Descending order with ``nulls_first=True`` puts the NULL row first."""
        ordering = F("alt_editor").desc(nulls_first=True)
        results = Post.objects.order_by(ordering)
        self.assertEqual(len(results), 2)
        self.assertIsNone(results[0].alt_editor)
        self.assertIsNotNone(results[1].alt_editor)

class TestBulkUpdate(TestCase):
    """``bulk_update`` across several columns in one call."""

    def test_bulk_update_different_column_types(self):
        """Set ``int_value``, ``name`` and ``date`` to None in a single ``bulk_update``."""
        model = ModelWithNullableFieldsOfDifferentTypes
        rows = (
            (1, 'a', datetime.datetime(year=2024, month=1, day=1)),
            (2, 'b', datetime.datetime(year=2023, month=12, day=31)),
        )
        objs = model.objects.bulk_create(
            model(int_value=int_value, name=name, date=date)
            for int_value, name, date in rows
        )
        for obj in objs:
            obj.int_value = None
            obj.name = None
            obj.date = None
        model.objects.bulk_update(objs, ["int_value", "name", "date"])
        # Every created row must now match the isnull filter for each column.
        self.assertCountEqual(model.objects.filter(int_value__isnull=True), objs)
        self.assertCountEqual(model.objects.filter(name__isnull=True), objs)
        self.assertCountEqual(model.objects.filter(date__isnull=True), objs)