1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
|
import codecs
import copy
from decimal import Decimal
from django.apps.registry import Apps
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
from django.utils import six
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
sql_delete_table = "DROP TABLE %(table)s"
sql_create_inline_fk = "REFERENCES %(to_table)s (%(to_column)s)"
def quote_value(self, value):
# The backend "mostly works" without this function and there are use
# cases for compiling Python without the sqlite3 libraries (e.g.
# security hardening).
import _sqlite3
try:
value = _sqlite3.adapt(value)
except _sqlite3.ProgrammingError:
pass
# Manual emulation of SQLite parameter quoting
if isinstance(value, type(True)):
return str(int(value))
elif isinstance(value, (Decimal, float)):
return str(value)
elif isinstance(value, six.integer_types):
return str(value)
elif isinstance(value, six.string_types):
return "'%s'" % six.text_type(value).replace("\'", "\'\'")
elif value is None:
return "NULL"
elif isinstance(value, (bytes, bytearray, six.memoryview)):
# Bytes are only allowed for BLOB fields, encoded as string
# literals containing hexadecimal data and preceded by a single "X"
# character:
# value = b'\x01\x02' => value_hex = b'0102' => return X'0102'
value = bytes(value)
hex_encoder = codecs.getencoder('hex_codec')
value_hex, _length = hex_encoder(value)
# Use 'ascii' encoding for b'01' => '01', no need to use force_text here.
return "X'%s'" % value_hex.decode('ascii')
else:
raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))
def _remake_table(self, model, create_fields=[], delete_fields=[], alter_fields=[], override_uniques=None,
override_indexes=None):
"""
Shortcut to transform a model from old_model into new_model
"""
# Work out the new fields dict / mapping
body = {f.name: f for f in model._meta.local_fields}
# Since mapping might mix column names and default values,
# its values must be already quoted.
mapping = {f.column: self.quote_name(f.column) for f in model._meta.local_fields}
# This maps field names (not columns) for things like unique_together
rename_mapping = {}
# If any of the new or altered fields is introducing a new PK,
# remove the old one
restore_pk_field = None
if any(f.primary_key for f in create_fields) or any(n.primary_key for o, n in alter_fields):
for name, field in list(body.items()):
if field.primary_key:
field.primary_key = False
restore_pk_field = field
if field.auto_created:
del body[name]
del mapping[field.column]
# Add in any created fields
for field in create_fields:
body[field.name] = field
# Choose a default and insert it into the copy map
if not field.many_to_many:
mapping[field.column] = self.quote_value(
self.effective_default(field)
)
# Add in any altered fields
for (old_field, new_field) in alter_fields:
body.pop(old_field.name, None)
mapping.pop(old_field.column, None)
body[new_field.name] = new_field
if old_field.null and not new_field.null:
case_sql = "coalesce(%(col)s, %(default)s)" % {
'col': self.quote_name(old_field.column),
'default': self.quote_value(self.effective_default(new_field))
}
mapping[new_field.column] = case_sql
else:
mapping[new_field.column] = self.quote_name(old_field.column)
rename_mapping[old_field.name] = new_field.name
# Remove any deleted fields
for field in delete_fields:
del body[field.name]
del mapping[field.column]
# Remove any implicit M2M tables
if field.many_to_many and field.rel.through._meta.auto_created:
return self.delete_model(field.rel.through)
# Work inside a new app registry
apps = Apps()
# Provide isolated instances of the fields to the new model body
# Instantiating the new model with an alternate db_table will alter
# the internal references of some of the provided fields.
body = copy.deepcopy(body)
# Work out the new value of unique_together, taking renames into
# account
if override_uniques is None:
override_uniques = [
[rename_mapping.get(n, n) for n in unique]
for unique in model._meta.unique_together
]
# Work out the new value for index_together, taking renames into
# account
if override_indexes is None:
override_indexes = [
[rename_mapping.get(n, n) for n in index]
for index in model._meta.index_together
]
# Construct a new model for the new state
meta_contents = {
'app_label': model._meta.app_label,
'db_table': model._meta.db_table + "__new",
'unique_together': override_uniques,
'index_together': override_indexes,
'apps': apps,
}
meta = type("Meta", tuple(), meta_contents)
body['Meta'] = meta
body['__module__'] = model.__module__
temp_model = type(model._meta.object_name, model.__bases__, body)
# Create a new table with that format. We remove things from the
# deferred SQL that match our table name, too
self.deferred_sql = [x for x in self.deferred_sql if model._meta.db_table not in x]
self.create_model(temp_model)
# Copy data from the old table
field_maps = list(mapping.items())
self.execute("INSERT INTO %s (%s) SELECT %s FROM %s" % (
self.quote_name(temp_model._meta.db_table),
', '.join(self.quote_name(x) for x, y in field_maps),
', '.join(y for x, y in field_maps),
self.quote_name(model._meta.db_table),
))
# Delete the old table
self.delete_model(model, handle_autom2m=False)
# Rename the new to the old
self.alter_db_table(temp_model, temp_model._meta.db_table, model._meta.db_table)
# Run deferred SQL on correct table
for sql in self.deferred_sql:
self.execute(sql.replace(temp_model._meta.db_table, model._meta.db_table))
self.deferred_sql = []
# Fix any PK-removed field
if restore_pk_field:
restore_pk_field.primary_key = True
def delete_model(self, model, handle_autom2m=True):
if handle_autom2m:
super(DatabaseSchemaEditor, self).delete_model(model)
else:
# Delete the table (and only that)
self.execute(self.sql_delete_table % {
"table": self.quote_name(model._meta.db_table),
})
def add_field(self, model, field):
"""
Creates a field on a model.
Usually involves adding a column, but may involve adding a
table instead (for M2M fields)
"""
# Special-case implicit M2M tables
if field.many_to_many and field.rel.through._meta.auto_created:
return self.create_model(field.rel.through)
self._remake_table(model, create_fields=[field])
def remove_field(self, model, field):
"""
Removes a field from a model. Usually involves deleting a column,
but for M2Ms may involve deleting a table.
"""
# M2M fields are a special case
if field.many_to_many:
# For implicit M2M tables, delete the auto-created table
if field.rel.through._meta.auto_created:
self.delete_model(field.rel.through)
# For explicit "through" M2M fields, do nothing
# For everything else, remake.
else:
# It might not actually have a column behind it
if field.db_parameters(connection=self.connection)['type'] is None:
return
self._remake_table(model, delete_fields=[field])
def _alter_field(self, model, old_field, new_field, old_type, new_type,
old_db_params, new_db_params, strict=False):
"""Actually perform a "physical" (non-ManyToMany) field update."""
# Alter by remaking table
self._remake_table(model, alter_fields=[(old_field, new_field)])
def alter_index_together(self, model, old_index_together, new_index_together):
"""
Deals with a model changing its index_together.
Note: The input index_togethers must be doubly-nested, not the single-
nested ["foo", "bar"] format.
"""
self._remake_table(model, override_indexes=new_index_together)
def alter_unique_together(self, model, old_unique_together, new_unique_together):
"""
Deals with a model changing its unique_together.
Note: The input unique_togethers must be doubly-nested, not the single-
nested ["foo", "bar"] format.
"""
self._remake_table(model, override_uniques=new_unique_together)
def _alter_many_to_many(self, model, old_field, new_field, strict):
"""
Alters M2Ms to repoint their to= endpoints.
"""
if old_field.rel.through._meta.db_table == new_field.rel.through._meta.db_table:
# The field name didn't change, but some options did; we have to propagate this altering.
self._remake_table(
old_field.rel.through,
alter_fields=[(
# We need the field that points to the target model, so we can tell alter_field to change it -
# this is m2m_reverse_field_name() (as opposed to m2m_field_name, which points to our model)
old_field.rel.through._meta.get_field(old_field.m2m_reverse_field_name()),
new_field.rel.through._meta.get_field(new_field.m2m_reverse_field_name()),
)],
override_uniques=(new_field.m2m_field_name(), new_field.m2m_reverse_field_name()),
)
return
# Make a new through table
self.create_model(new_field.rel.through)
# Copy the data across
self.execute("INSERT INTO %s (%s) SELECT %s FROM %s" % (
self.quote_name(new_field.rel.through._meta.db_table),
', '.join([
"id",
new_field.m2m_column_name(),
new_field.m2m_reverse_name(),
]),
', '.join([
"id",
old_field.m2m_column_name(),
old_field.m2m_reverse_name(),
]),
self.quote_name(old_field.rel.through._meta.db_table),
))
# Delete the old through table
self.delete_model(old_field.rel.through)
|