File: test_schema_editor_view.py

package info (click to toggle)
python-django-postgres-extra 2.0.9-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,096 kB
  • sloc: python: 9,057; makefile: 17; sh: 7; sql: 1
file content (145 lines) | stat: -rw-r--r-- 4,710 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from django.db import connection, models

from psqlextra.backend.schema import PostgresSchemaEditor

from . import db_introspection
from .fake_model import (
    define_fake_materialized_view_model,
    define_fake_view_model,
    get_fake_model,
)


def test_schema_editor_create_delete_view():
    """Tests whether creating and then deleting a view using the schema editor
    works as expected."""

    underlying_model = get_fake_model({"name": models.TextField()})

    model = define_fake_view_model(
        {"name": models.TextField()},
        {"query": underlying_model.objects.filter(name="test1")},
    )

    underlying_model.objects.create(name="test1")
    underlying_model.objects.create(name="test2")

    schema_editor = PostgresSchemaEditor(connection)
    schema_editor.create_view_model(model)

    # view should only show records name="test"1
    objs = list(model.objects.all())
    assert len(objs) == 1
    assert objs[0].name == "test1"

    # create another record, view should have it right away
    underlying_model.objects.create(name="test1")
    assert model.objects.count() == 2

    # delete the view
    schema_editor.delete_view_model(model)

    # make sure it was actually deleted
    assert model._meta.db_table not in db_introspection.table_names(True)


def test_schema_editor_replace_view():
    """Tests whether creating a view and then replacing it with another one
    (thus changing the backing query) works as expected."""

    underlying_model = get_fake_model({"name": models.TextField()})

    model = define_fake_view_model(
        {"name": models.TextField()},
        {"query": underlying_model.objects.filter(name="test1")},
    )

    underlying_model.objects.create(name="test1")
    underlying_model.objects.create(name="test2")

    schema_editor = PostgresSchemaEditor(connection)
    schema_editor.create_view_model(model)

    objs = list(model.objects.all())
    assert len(objs) == 1
    assert objs[0].name == "test1"

    model._view_meta.query = underlying_model.objects.filter(
        name="test2"
    ).query.sql_with_params()
    schema_editor.replace_view_model(model)

    objs = list(model.objects.all())
    assert len(objs) == 1
    assert objs[0].name == "test2"


def test_schema_editor_create_delete_materialized_view():
    """Tests whether creating and then deleting a materialized view using the
    schema editor works as expected."""

    underlying_model = get_fake_model({"name": models.TextField()})

    model = define_fake_materialized_view_model(
        {"name": models.TextField()},
        {"query": underlying_model.objects.filter(name="test1")},
    )

    underlying_model.objects.create(name="test1")
    underlying_model.objects.create(name="test2")

    schema_editor = PostgresSchemaEditor(connection)
    schema_editor.create_materialized_view_model(model)

    # materialized view should only show records name="test"1
    objs = list(model.objects.all())
    assert len(objs) == 1
    assert objs[0].name == "test1"

    # delete the materialized view
    schema_editor.delete_materialized_view_model(model)

    # make sure it was actually deleted
    assert model._meta.db_table not in db_introspection.table_names(True)


def test_schema_editor_replace_materialized_view():
    """Tests whether creating a materialized view and then replacing it with
    another one (thus changing the backing query) works as expected."""

    underlying_model = get_fake_model({"name": models.TextField()})

    model = define_fake_materialized_view_model(
        {"name": models.TextField()},
        {"query": underlying_model.objects.filter(name="test1")},
        {"indexes": [models.Index(fields=["name"])]},
    )

    underlying_model.objects.create(name="test1")
    underlying_model.objects.create(name="test2")

    schema_editor = PostgresSchemaEditor(connection)
    schema_editor.create_materialized_view_model(model)

    for index in model._meta.indexes:
        schema_editor.add_index(model, index)

    constraints_before = db_introspection.get_constraints(model._meta.db_table)

    objs = list(model.objects.all())
    assert len(objs) == 1
    assert objs[0].name == "test1"

    model._view_meta.query = underlying_model.objects.filter(
        name="test2"
    ).query.sql_with_params()
    schema_editor.replace_materialized_view_model(model)

    objs = list(model.objects.all())
    assert len(objs) == 1
    assert objs[0].name == "test2"

    # make sure all indexes/constraints still exists because
    # replacing a materialized view involves re-creating it
    constraints_after = db_introspection.get_constraints(model._meta.db_table)
    assert constraints_after == constraints_before