File: schema.py

package info (click to toggle)
python-django 1.8.18-1~bpo8%2B1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 41,628 kB
  • sloc: python: 189,488; xml: 695; makefile: 194; sh: 169; sql: 11
file content (257 lines) | stat: -rw-r--r-- 11,377 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import codecs
import copy
from decimal import Decimal

from django.apps.registry import Apps
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
from django.utils import six


class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
    """
    Schema editor for the SQLite backend.

    Column-level changes are carried out by `_remake_table`, which creates a
    new table with the desired layout, copies the rows across, drops the old
    table and renames the new one into its place.
    """

    # SQL templates using %(name)s placeholders.
    sql_delete_table = "DROP TABLE %(table)s"
    sql_create_inline_fk = "REFERENCES %(to_table)s (%(to_column)s)"

    def quote_value(self, value):
        """
        Return `value` rendered as a literal suitable for inlining in SQL.

        Booleans become "0"/"1", numbers their `str()` form, strings are
        single-quoted with embedded quotes doubled, `None` becomes NULL and
        binary values become X'..' hexadecimal literals.

        Raises ValueError if the value's type cannot be quoted.
        """
        # The backend "mostly works" without this function and there are use
        # cases for compiling Python without the sqlite3 libraries (e.g.
        # security hardening), so a missing extension module must not be fatal:
        # in that case the value is quoted without being adapted first.
        try:
            import _sqlite3
        except ImportError:
            pass
        else:
            try:
                value = _sqlite3.adapt(value)
            except _sqlite3.ProgrammingError:
                # The value could not be adapted; quote it as it is.
                pass
        # Manual emulation of SQLite parameter quoting
        # bool must be tested before the integer types: it is a subclass of int.
        if isinstance(value, bool):
            return str(int(value))
        elif isinstance(value, (Decimal, float)):
            return str(value)
        elif isinstance(value, six.integer_types):
            return str(value)
        elif isinstance(value, six.string_types):
            # Escape embedded single quotes by doubling them.
            return "'%s'" % six.text_type(value).replace("'", "''")
        elif value is None:
            return "NULL"
        elif isinstance(value, (bytes, bytearray, six.memoryview)):
            # Bytes are only allowed for BLOB fields, encoded as string
            # literals containing hexadecimal data and preceded by a single "X"
            # character:
            # value = b'\x01\x02' => value_hex = b'0102' => return X'0102'
            value = bytes(value)
            hex_encoder = codecs.getencoder('hex_codec')
            value_hex, _length = hex_encoder(value)
            # Use 'ascii' encoding for b'01' => '01', no need to use force_text here.
            return "X'%s'" % value_hex.decode('ascii')
        else:
            raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))

    def _remake_table(self, model, create_fields=(), delete_fields=(), alter_fields=(), override_uniques=None,
                      override_indexes=None):
        """
        Rebuild the table of `model` with a modified set of columns.

        A temporary model is created on a "<db_table>__new" table, the rows of
        the current table are copied into it, the current table is dropped and
        the new table is renamed to the original name.

        `create_fields` is an iterable of fields to add, `delete_fields` an
        iterable of fields to drop and `alter_fields` an iterable of
        (old_field, new_field) pairs. `override_uniques` and
        `override_indexes` replace the model's unique_together and
        index_together; when None, the model's own values are used with field
        renames applied.
        """
        # Work out the new fields dict / mapping
        body = {f.name: f for f in model._meta.local_fields}
        # Since mapping might mix column names and default values,
        # its values must be already quoted.
        mapping = {f.column: self.quote_name(f.column) for f in model._meta.local_fields}
        # This maps field names (not columns) for things like unique_together
        rename_mapping = {}
        # If any of the new or altered fields is introducing a new PK,
        # remove the old one
        restore_pk_field = None
        if any(f.primary_key for f in create_fields) or any(n.primary_key for o, n in alter_fields):
            for name, field in list(body.items()):
                if field.primary_key:
                    # This mutates the field of the live model; it is undone
                    # in the "finally" clause below.
                    field.primary_key = False
                    restore_pk_field = field
                    if field.auto_created:
                        del body[name]
                        del mapping[field.column]
        try:
            # Add in any created fields
            for field in create_fields:
                body[field.name] = field
                # Choose a default and insert it into the copy map
                if not field.many_to_many:
                    mapping[field.column] = self.quote_value(
                        self.effective_default(field)
                    )
            # Add in any altered fields
            for (old_field, new_field) in alter_fields:
                body.pop(old_field.name, None)
                mapping.pop(old_field.column, None)
                body[new_field.name] = new_field
                if old_field.null and not new_field.null:
                    # The column becomes NOT NULL: substitute the new default
                    # for any existing NULLs while copying.
                    case_sql = "coalesce(%(col)s, %(default)s)" % {
                        'col': self.quote_name(old_field.column),
                        'default': self.quote_value(self.effective_default(new_field))
                    }
                    mapping[new_field.column] = case_sql
                else:
                    mapping[new_field.column] = self.quote_name(old_field.column)
                rename_mapping[old_field.name] = new_field.name
            # Remove any deleted fields
            for field in delete_fields:
                del body[field.name]
                del mapping[field.column]
                # Remove any implicit M2M tables
                if field.many_to_many and field.rel.through._meta.auto_created:
                    return self.delete_model(field.rel.through)
            # Work inside a new app registry
            apps = Apps()

            # Provide isolated instances of the fields to the new model body
            # Instantiating the new model with an alternate db_table will alter
            # the internal references of some of the provided fields.
            body = copy.deepcopy(body)

            # Work out the new value of unique_together, taking renames into
            # account
            if override_uniques is None:
                override_uniques = [
                    [rename_mapping.get(n, n) for n in unique]
                    for unique in model._meta.unique_together
                ]

            # Work out the new value for index_together, taking renames into
            # account
            if override_indexes is None:
                override_indexes = [
                    [rename_mapping.get(n, n) for n in index]
                    for index in model._meta.index_together
                ]

            # Construct a new model for the new state
            meta_contents = {
                'app_label': model._meta.app_label,
                'db_table': model._meta.db_table + "__new",
                'unique_together': override_uniques,
                'index_together': override_indexes,
                'apps': apps,
            }
            meta = type("Meta", tuple(), meta_contents)
            body['Meta'] = meta
            body['__module__'] = model.__module__

            temp_model = type(model._meta.object_name, model.__bases__, body)
            # Create a new table with that format. We remove things from the
            # deferred SQL that match our table name, too
            self.deferred_sql = [x for x in self.deferred_sql if model._meta.db_table not in x]
            self.create_model(temp_model)
            # Copy data from the old table
            field_maps = list(mapping.items())
            self.execute("INSERT INTO %s (%s) SELECT %s FROM %s" % (
                self.quote_name(temp_model._meta.db_table),
                ', '.join(self.quote_name(x) for x, y in field_maps),
                ', '.join(y for x, y in field_maps),
                self.quote_name(model._meta.db_table),
            ))
            # Delete the old table
            self.delete_model(model, handle_autom2m=False)
            # Rename the new to the old
            self.alter_db_table(temp_model, temp_model._meta.db_table, model._meta.db_table)
            # Run deferred SQL on correct table
            for sql in self.deferred_sql:
                self.execute(sql.replace(temp_model._meta.db_table, model._meta.db_table))
            self.deferred_sql = []
        finally:
            # Fix any PK-removed field, even if the rebuild failed or returned
            # early, so the live model is never left without its primary key.
            if restore_pk_field:
                restore_pk_field.primary_key = True

    def delete_model(self, model, handle_autom2m=True):
        """
        Delete the table of `model`.

        With `handle_autom2m` true the base implementation is used; otherwise
        only a single DROP TABLE for the model's own table is executed.
        """
        if handle_autom2m:
            super(DatabaseSchemaEditor, self).delete_model(model)
        else:
            # Delete the table (and only that)
            self.execute(self.sql_delete_table % {
                "table": self.quote_name(model._meta.db_table),
            })

    def add_field(self, model, field):
        """
        Creates a field on a model.
        Usually involves rebuilding the table with the extra column, but may
        involve adding a table instead (for M2M fields)
        """
        # Special-case implicit M2M tables
        if field.many_to_many and field.rel.through._meta.auto_created:
            return self.create_model(field.rel.through)
        self._remake_table(model, create_fields=[field])

    def remove_field(self, model, field):
        """
        Removes a field from a model. Usually involves rebuilding the table
        without the column, but for M2Ms may involve deleting a table.
        """
        # M2M fields are a special case
        if field.many_to_many:
            # For implicit M2M tables, delete the auto-created table
            if field.rel.through._meta.auto_created:
                self.delete_model(field.rel.through)
            # For explicit "through" M2M fields, do nothing
        # For everything else, remake.
        else:
            # It might not actually have a column behind it
            if field.db_parameters(connection=self.connection)['type'] is None:
                return
            self._remake_table(model, delete_fields=[field])

    def _alter_field(self, model, old_field, new_field, old_type, new_type,
                     old_db_params, new_db_params, strict=False):
        """
        Actually perform a "physical" (non-ManyToMany) field update.

        The type, db_params and strict arguments are accepted but unused here:
        every alteration is done by rebuilding the whole table.
        """
        # Alter by remaking table
        self._remake_table(model, alter_fields=[(old_field, new_field)])

    def alter_index_together(self, model, old_index_together, new_index_together):
        """
        Deals with a model changing its index_together by rebuilding the table
        with `new_index_together`; `old_index_together` is not used.
        Note: The input index_togethers must be doubly-nested, not the single-
        nested ["foo", "bar"] format.
        """
        self._remake_table(model, override_indexes=new_index_together)

    def alter_unique_together(self, model, old_unique_together, new_unique_together):
        """
        Deals with a model changing its unique_together by rebuilding the table
        with `new_unique_together`; `old_unique_together` is not used.
        Note: The input unique_togethers must be doubly-nested, not the single-
        nested ["foo", "bar"] format.
        """
        self._remake_table(model, override_uniques=new_unique_together)

    def _alter_many_to_many(self, model, old_field, new_field, strict):
        """
        Alters M2Ms to repoint their to= endpoints.

        If both fields use the same through table, that table is rebuilt in
        place. Otherwise the new through table is created, the rows are copied
        over and the old through table is deleted.
        """
        if old_field.rel.through._meta.db_table == new_field.rel.through._meta.db_table:
            # The field name didn't change, but some options did; we have to propagate this altering.
            # NOTE(review): override_uniques is a single flat tuple here, while
            # alter_unique_together asks for doubly-nested input - presumably
            # it is normalized when the temporary model's Meta is processed;
            # confirm before changing.
            self._remake_table(
                old_field.rel.through,
                alter_fields=[(
                    # We need the field that points to the target model, so we can tell alter_field to change it -
                    # this is m2m_reverse_field_name() (as opposed to m2m_field_name, which points to our model)
                    old_field.rel.through._meta.get_field(old_field.m2m_reverse_field_name()),
                    new_field.rel.through._meta.get_field(new_field.m2m_reverse_field_name()),
                )],
                override_uniques=(new_field.m2m_field_name(), new_field.m2m_reverse_field_name()),
            )
            return

        # Make a new through table
        self.create_model(new_field.rel.through)
        # Copy the data across
        # Only the table names are quoted; the column names are interpolated
        # as returned by the fields.
        self.execute("INSERT INTO %s (%s) SELECT %s FROM %s" % (
            self.quote_name(new_field.rel.through._meta.db_table),
            ', '.join([
                "id",
                new_field.m2m_column_name(),
                new_field.m2m_reverse_name(),
            ]),
            ', '.join([
                "id",
                old_field.m2m_column_name(),
                old_field.m2m_reverse_name(),
            ]),
            self.quote_name(old_field.rel.through._meta.db_table),
        ))
        # Delete the old through table
        self.delete_model(old_field.rel.through)