File: schema.py

package info (click to toggle)
python-django 3%3A5.2.5-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 61,236 kB
  • sloc: python: 361,585; javascript: 19,250; xml: 211; makefile: 182; sh: 28
file content (124 lines) | stat: -rw-r--r-- 4,468 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from django.contrib.gis.db.models import GeometryField
from django.db.backends.postgresql.schema import DatabaseSchemaEditor
from django.db.models.expressions import Col, Func


class PostGISSchemaEditor(DatabaseSchemaEditor):
    """
    Schema editor for PostGIS.

    Builds on the PostgreSQL schema editor to create and drop spatial indexes
    and to convert geometry columns between two and three dimensions when a
    field's ``dim`` changes.
    """

    # Index method placed after " USING " when building spatial indexes.
    geom_index_type = "GIST"
    # Operator class used for non-geography fields with more than 2 dimensions.
    geom_index_ops_nd = "GIST_GEOMETRY_OPS_ND"
    # Expression template applied to raster columns in spatial indexes.
    rast_index_template = "ST_ConvexHull(%(expressions)s)"

    # ALTER COLUMN fragments that force existing values to the new dimension.
    sql_alter_column_to_3d = (
        "ALTER COLUMN %(column)s TYPE %(type)s USING ST_Force3D(%(column)s)::%(type)s"
    )
    sql_alter_column_to_2d = (
        "ALTER COLUMN %(column)s TYPE %(type)s USING ST_Force2D(%(column)s)::%(type)s"
    )

    def geo_quote_name(self, name):
        """
        Quote `name` by delegating to the connection operations'
        geo_quote_name().
        """
        ops = self.connection.ops
        return ops.geo_quote_name(name)

    def _field_should_be_indexed(self, model, field):
        """
        Return True for fields with a truthy `spatial_index` attribute;
        otherwise defer to the parent implementation.
        """
        wants_spatial_index = getattr(field, "spatial_index", False)
        return (
            True
            if wants_spatial_index
            else super()._field_should_be_indexed(model, field)
        )

    def _create_index_sql(self, model, *, fields=None, **kwargs):
        """
        Build a spatial index when exactly one field is given and it exposes
        a `geodetic` attribute; defer to the parent implementation otherwise.
        """
        is_single_spatial_field = (
            fields is not None and len(fields) == 1 and hasattr(fields[0], "geodetic")
        )
        if is_single_spatial_field:
            return self._create_spatial_index_sql(model, fields[0], **kwargs)

        return super()._create_index_sql(model, fields=fields, **kwargs)

    def _alter_column_type_sql(
        self, table, old_field, new_field, new_type, old_collation, new_collation
    ):
        """
        Special case when dimension changed.

        Fields lacking a `dim` attribute on either side are handled by the
        parent implementation. Otherwise a 2D -> 3D or 3D -> 2D change uses
        the matching ST_Force* statement and any other case uses the regular
        type alteration statement, always with an empty collation.
        """
        if not (hasattr(old_field, "dim") and hasattr(new_field, "dim")):
            return super()._alter_column_type_sql(
                table, old_field, new_field, new_type, old_collation, new_collation
            )

        dim_change = (old_field.dim, new_field.dim)
        if dim_change == (2, 3):
            template = self.sql_alter_column_to_3d
        elif dim_change == (3, 2):
            template = self.sql_alter_column_to_2d
        else:
            template = self.sql_alter_column_type

        params = {
            "column": self.quote_name(new_field.column),
            "type": new_type,
            "collation": "",
        }
        # Same nested shape as before: a (sql, []) pair followed by an empty
        # list.
        return (template % params, []), []

    def _alter_field(
        self,
        model,
        old_field,
        new_field,
        old_type,
        new_type,
        old_db_params,
        new_db_params,
        strict=False,
    ):
        """
        Alter the field through the parent implementation, then create or
        drop the spatial index if the `spatial_index` setting changed between
        `old_field` and `new_field`.
        """
        super()._alter_field(
            model,
            old_field,
            new_field,
            old_type,
            new_type,
            old_db_params,
            new_db_params,
            strict=strict,
        )

        # Only GeometryField instances are considered to carry a spatial index.
        had_spatial_index = (
            isinstance(old_field, GeometryField) and old_field.spatial_index
        )
        wants_spatial_index = (
            isinstance(new_field, GeometryField) and new_field.spatial_index
        )
        if bool(had_spatial_index) == bool(wants_spatial_index):
            # Index state is unchanged; nothing to create or drop.
            return
        if wants_spatial_index:
            self.execute(self._create_spatial_index_sql(model, new_field))
        else:
            self.execute(self._delete_spatial_index_sql(model, old_field))

    def _create_spatial_index_name(self, model, field):
        """
        Return the spatial index name for `field`, built from the model's
        table and the field's column with the "_id" suffix.
        """
        table_name = model._meta.db_table
        return self._create_index_name(table_name, [field.column], "_id")

    def _create_spatial_index_sql(self, model, field, **kwargs):
        """
        Return the statement creating the spatial index for `field`.

        Only the `name` entry of `kwargs` is consulted; when it is missing or
        empty, the name comes from _create_spatial_index_name().
        """
        index_options = {"fields": [field], "opclasses": None, "expressions": None}
        if field.geom_type == "RASTER":
            # Indexes on raster columns are based on the convex hull of the
            # raster, so index an ST_ConvexHull expression instead of the
            # plain column.
            index_options["fields"] = None
            index_options["expressions"] = Func(
                Col(None, field), template=self.rast_index_template
            )
        elif field.dim > 2 and not field.geography:
            # Use "nd" ops which are fast on multidimensional cases
            index_options["opclasses"] = [self.geom_index_ops_nd]

        index_name = kwargs.get("name") or self._create_spatial_index_name(
            model, field
        )
        return super()._create_index_sql(
            model,
            name=index_name,
            using=" USING %s" % self.geom_index_type,
            **index_options,
        )

    def _delete_spatial_index_sql(self, model, field):
        """
        Return the statement dropping the spatial index of `field`.
        """
        return self._delete_index_sql(
            model, self._create_spatial_index_name(model, field)
        )