1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
from django.db import connection, models
from psqlextra.backend.schema import PostgresSchemaEditor
from . import db_introspection
from .fake_model import (
define_fake_materialized_view_model,
define_fake_view_model,
get_fake_model,
)
def test_schema_editor_create_delete_view():
    """Create a view with the schema editor, query it, then drop it again.

    The view is backed by a query that filters a fake model on
    ``name="test1"``, so only matching rows may be visible through it.
    """

    source_model = get_fake_model({"name": models.TextField()})
    view_model = define_fake_view_model(
        {"name": models.TextField()},
        {"query": source_model.objects.filter(name="test1")},
    )

    # one row that matches the view's filter and one that does not
    for row_name in ("test1", "test2"):
        source_model.objects.create(name=row_name)

    editor = PostgresSchemaEditor(connection)
    editor.create_view_model(view_model)

    # the view should only expose the record with name="test1"
    visible_names = [row.name for row in view_model.objects.all()]
    assert visible_names == ["test1"]

    # a newly inserted matching record must show up in the view right away
    source_model.objects.create(name="test1")
    assert view_model.objects.count() == 2

    # drop the view ...
    editor.delete_view_model(view_model)

    # ... and verify it no longer shows up during introspection
    remaining_tables = db_introspection.table_names(True)
    assert view_model._meta.db_table not in remaining_tables
def test_schema_editor_replace_view():
    """Create a view, then replace it with one backed by a different query.

    After the replacement, the view must return the rows selected by the new
    backing query instead of the original one.
    """

    source_model = get_fake_model({"name": models.TextField()})
    view_model = define_fake_view_model(
        {"name": models.TextField()},
        {"query": source_model.objects.filter(name="test1")},
    )

    for row_name in ("test1", "test2"):
        source_model.objects.create(name=row_name)

    editor = PostgresSchemaEditor(connection)
    editor.create_view_model(view_model)

    # initially the view is filtered on name="test1"
    names_before = [row.name for row in view_model.objects.all()]
    assert names_before == ["test1"]

    # point the view at a query filtering on name="test2" and replace it
    replacement_query = source_model.objects.filter(name="test2").query
    view_model._view_meta.query = replacement_query.sql_with_params()
    editor.replace_view_model(view_model)

    # the view now reflects the new backing query
    names_after = [row.name for row in view_model.objects.all()]
    assert names_after == ["test2"]
def test_schema_editor_create_delete_materialized_view():
    """Create a materialized view with the schema editor, query it, then
    drop it again.

    The materialized view is backed by a query that filters a fake model on
    ``name="test1"``.
    """

    source_model = get_fake_model({"name": models.TextField()})
    view_model = define_fake_materialized_view_model(
        {"name": models.TextField()},
        {"query": source_model.objects.filter(name="test1")},
    )

    # rows are inserted before the materialized view is created
    for row_name in ("test1", "test2"):
        source_model.objects.create(name=row_name)

    editor = PostgresSchemaEditor(connection)
    editor.create_materialized_view_model(view_model)

    # the materialized view should only expose the record with name="test1"
    visible_names = [row.name for row in view_model.objects.all()]
    assert visible_names == ["test1"]

    # drop the materialized view ...
    editor.delete_materialized_view_model(view_model)

    # ... and verify it no longer shows up during introspection
    remaining_tables = db_introspection.table_names(True)
    assert view_model._meta.db_table not in remaining_tables
def test_schema_editor_replace_materialized_view():
    """Create a materialized view, then replace it with one backed by a
    different query.

    Besides checking that the new query takes effect, this verifies that the
    constraints reported for the view's table are the same before and after
    the replacement.
    """

    source_model = get_fake_model({"name": models.TextField()})
    view_model = define_fake_materialized_view_model(
        {"name": models.TextField()},
        {"query": source_model.objects.filter(name="test1")},
        {"indexes": [models.Index(fields=["name"])]},
    )

    for row_name in ("test1", "test2"):
        source_model.objects.create(name=row_name)

    editor = PostgresSchemaEditor(connection)
    editor.create_materialized_view_model(view_model)

    # explicitly add each index declared on the model to the view
    for declared_index in view_model._meta.indexes:
        editor.add_index(view_model, declared_index)

    table_name = view_model._meta.db_table
    constraints_before = db_introspection.get_constraints(table_name)

    # initially the materialized view is filtered on name="test1"
    names_before = [row.name for row in view_model.objects.all()]
    assert names_before == ["test1"]

    # point the view at a query filtering on name="test2" and replace it
    replacement_query = source_model.objects.filter(name="test2").query
    view_model._view_meta.query = replacement_query.sql_with_params()
    editor.replace_materialized_view_model(view_model)

    # the materialized view now reflects the new backing query
    names_after = [row.name for row in view_model.objects.all()]
    assert names_after == ["test2"]

    # replacing a materialized view involves re-creating it, so make sure
    # all indexes/constraints still exist afterwards
    constraints_after = db_introspection.get_constraints(
        view_model._meta.db_table
    )
    assert constraints_after == constraints_before
|