File: test_conditional_unique_index.py

package info (click to toggle)
python-django-postgres-extra 2.0.9-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,096 kB
  • sloc: python: 9,057; makefile: 17; sh: 7; sql: 1
file content (151 lines) | stat: -rw-r--r-- 4,739 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import pytest

from django.db import IntegrityError, models, transaction
from django.db.migrations import AddIndex, CreateModel

from psqlextra.indexes import ConditionalUniqueIndex

from .fake_model import get_fake_model
from .migrations import apply_migration, filtered_schema_editor


def test_cui_deconstruct():
    """Tests that :see:ConditionalUniqueIndex.deconstruct() hands back every
    keyword argument the index was constructed with."""

    expected_kwargs = {
        "condition": "field IS NULL",
        "name": "great_index",
        "fields": ["field", "build"],
    }

    index = ConditionalUniqueIndex(**expected_kwargs)
    _path, _args, deconstructed_kwargs = index.deconstruct()

    # Only the keys passed in are checked; deconstruct() may return more.
    for key in expected_kwargs:
        assert deconstructed_kwargs[key] == expected_kwargs[key]


def test_cui_migrations():
    """Tests that adding :see:ConditionalUniqueIndex through migration
    operations executes the expected CREATE UNIQUE INDEX statements."""

    not_null_index = ConditionalUniqueIndex(
        fields=["name", "other_name"],
        condition='"name" IS NOT NULL',
        name="index1",
    )
    null_index = ConditionalUniqueIndex(
        fields=["other_name"],
        condition='"name" IS NULL',
        name="index2",
    )

    create_model = CreateModel(
        name="mymodel",
        fields=[
            ("id", models.IntegerField(primary_key=True)),
            ("name", models.CharField(max_length=255, null=True)),
            ("other_name", models.CharField(max_length=255)),
        ],
        # The indexes are not declared in the model options; they are added
        # by the separate AddIndex operations below.
        options={},
    )
    operations = [
        create_model,
        AddIndex(model_name="mymodel", index=not_null_index),
        AddIndex(model_name="mymodel", index=null_index),
    ]

    with filtered_schema_editor("CREATE UNIQUE INDEX") as recorded:
        apply_migration(operations)

    # Each recorded entry is a 3-tuple; the first positional argument of the
    # middle element is the statement that was executed.
    statements = [
        positional[0] for _, positional, _ in recorded["CREATE UNIQUE INDEX"]
    ]

    table_name = "tests_mymodel"
    first_template = 'CREATE UNIQUE INDEX "index1" ON "{0}" ("name", "other_name") WHERE "name" IS NOT NULL'
    second_template = 'CREATE UNIQUE INDEX "index2" ON "{0}" ("other_name") WHERE "name" IS NULL'

    assert str(statements[0]) == first_template.format(table_name)
    assert str(statements[1]) == second_template.format(table_name)


def test_cui_upserting():
    """Tests that upserts honour the :see:ConditionalUniqueIndex rules.

    Two upserts targeting the same partial index with the same key must
    update the existing row instead of inserting a second one.
    """

    b_is_null = '"b" IS NULL'
    b_is_not_null = '"b" IS NOT NULL'

    partial_indexes = [
        ConditionalUniqueIndex(fields=["a", "b"], condition=b_is_not_null),
        ConditionalUniqueIndex(fields=["a"], condition=b_is_null),
    ]
    model = get_fake_model(
        fields={
            "a": models.IntegerField(),
            "b": models.IntegerField(null=True),
            "c": models.IntegerField(),
        },
        meta_options={"indexes": partial_indexes},
    )

    def matching(**lookup):
        """Number of rows matching the given field lookups."""
        return model.objects.filter(**lookup).count()

    # First upsert against the "b IS NULL" index inserts a row.
    model.objects.upsert(
        conflict_target=["a"],
        index_predicate=b_is_null,
        fields={"a": 1, "c": 1},
    )
    assert model.objects.all().count() == 1
    assert matching(a=1, c=1) == 1

    # Same key again: the existing row is updated, no new row appears.
    model.objects.upsert(
        conflict_target=["a"],
        index_predicate=b_is_null,
        fields={"a": 1, "c": 2},
    )
    assert model.objects.all().count() == 1
    assert matching(a=1, c=1) == 0
    assert matching(a=1, c=2) == 1

    # First upsert against the "b IS NOT NULL" index inserts a second row.
    model.objects.upsert(
        conflict_target=["a", "b"],
        index_predicate=b_is_not_null,
        fields={"a": 1, "b": 1, "c": 1},
    )
    assert model.objects.all().count() == 2
    assert matching(a=1, c=2) == 1
    assert matching(a=1, b=1, c=1) == 1

    # Same (a, b) key again: that second row is updated in place.
    model.objects.upsert(
        conflict_target=["a", "b"],
        index_predicate=b_is_not_null,
        fields={"a": 1, "b": 1, "c": 2},
    )
    assert model.objects.all().count() == 2
    assert matching(a=1, c=1) == 0
    assert matching(a=1, b=1, c=2) == 1


def test_cui_inserting():
    """Tests inserting respects the :see:ConditionalUniqueIndex rules.

    The model carries two partial unique indexes: one on ("a", "b") for rows
    where "b" IS NOT NULL and one on ("a") for rows where "b" IS NULL.
    Inserts that collide within one of those indexes must raise
    :see:IntegrityError; every other insert must succeed.
    """

    model = get_fake_model(
        fields={
            "a": models.IntegerField(),
            "b": models.IntegerField(null=True),
            "c": models.IntegerField(),
        },
        meta_options={
            "indexes": [
                ConditionalUniqueIndex(
                    fields=["a", "b"], condition='"b" IS NOT NULL'
                ),
                ConditionalUniqueIndex(fields=["a"], condition='"b" IS NULL'),
            ]
        },
    )

    # Rows with "b" NULL: "a" alone has to be unique.
    model.objects.create(a=1, c=1)
    # pytest.raises wraps atomic() (not the other way round) so that the
    # IntegrityError propagates through the atomic block, which rolls the
    # failed insert back before the exception is swallowed. Catching the
    # error inside atomic() is discouraged by the Django documentation.
    with pytest.raises(IntegrityError):
        with transaction.atomic():
            model.objects.create(a=1, c=2)
    model.objects.create(a=2, c=1)

    # Rows with "b" set: the ("a", "b") pair has to be unique.
    model.objects.create(a=1, b=1, c=1)
    with pytest.raises(IntegrityError):
        with transaction.atomic():
            model.objects.create(a=1, b=1, c=2)

    model.objects.create(a=1, b=2, c=1)

    # Only the four successful inserts may be present; the two rejected
    # ones must not have left any row behind.
    assert model.objects.all().count() == 4