1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
|
from django.db import transaction
from django.db.models import Case, ForeignKey, ManyToManyField, Q, When
from django.forms.models import model_to_dict
from simple_history.exceptions import AlternativeManagerError, NotHistoricalModelError
def update_change_reason(instance, reason):
    """
    Set the change reason on the latest matching historical record of ``instance``.

    The record is found by filtering the history on the value of every field of
    ``instance`` that also exists on the historical model, and taking the match
    with the most recent ``history_date``.

    :param instance: Model instance whose historical record should be updated
    :param reason: Value to store as the record's ``history_change_reason``
    """
    # When the instance has no primary key yet, the history manager is looked up
    # on the model class rather than on the instance itself.
    source = type(instance) if instance.pk is None else instance
    history = get_history_manager_for_model(source)
    tracked_attnames = [f.attname for f in history.model._meta.fields]

    lookup = {}
    for field in instance._meta.fields:
        attname = field.attname
        if attname not in tracked_attnames:
            continue
        value = getattr(instance, attname)
        # A primary key that is still None is left out of the lookup;
        # every other tracked field is included, even when its value is None.
        if field.primary_key is True and value is None:
            continue
        lookup[attname] = value

    # NOTE(review): the result of ``first()`` is not checked; presumably it is
    # None when nothing matches, which would surface as an AttributeError on the
    # next line - confirm that callers expect this.
    latest = history.filter(**lookup).order_by("-history_date").first()
    latest.history_change_reason = reason
    latest.save()
def get_history_manager_for_model(model):
    """
    Return the history manager for a given app model.

    The name of the manager attribute is read from
    ``model._meta.simple_history_manager_attribute``.

    :param model: Object exposing ``_meta`` and the history manager attribute
    :return: The attribute of ``model`` named by that setting
    :raises NotHistoricalModelError: If the attribute name cannot be read
    """
    try:
        attribute_name = model._meta.simple_history_manager_attribute
    except AttributeError:
        raise NotHistoricalModelError(f"Cannot find a historical model for {model}.")
    else:
        return getattr(model, attribute_name)
def get_history_manager_from_history(history_instance):
    """
    Return the history manager, based on an existing history instance.

    The history manager of ``history_instance.instance_type`` is filtered on the
    app model's primary key, using the value stored under that same name on
    ``history_instance``.

    :param history_instance: Historical record exposing ``instance_type``
    :return: The result of calling ``filter()`` on the history manager
    """
    app_model = history_instance.instance_type
    pk_name = get_app_model_primary_key_name(app_model)
    manager = get_history_manager_for_model(app_model)
    pk_lookup = {pk_name: getattr(history_instance, pk_name)}
    return manager.filter(**pk_lookup)
def get_history_model_for_model(model):
    """
    Return the history model for a given app model.

    This is the ``model`` attribute of the app model's history manager.
    """
    history_manager = get_history_manager_for_model(model)
    return history_manager.model
def get_app_model_primary_key_name(model):
    """
    Return the primary key name for a given app model.

    When the primary key field is a ``ForeignKey``, ``"_id"`` is appended to the
    field's name; otherwise the field's name is returned unchanged.
    """
    pk_field = model._meta.pk
    if not isinstance(pk_field, ForeignKey):
        return pk_field.name
    return pk_field.name + "_id"
def get_m2m_field_name(m2m_field: ManyToManyField) -> str:
    """
    Return the name of the field on an M2M field's through model that points back
    to the model the M2M field is defined on.

    E.g. for a ``votes`` M2M field on a ``Poll`` model that references a ``Vote``
    model (and with a default-generated through model), the result is ``"poll"``.
    """
    # ``m2m_field_name()`` belongs to Django's internal API.
    through_field_name = m2m_field.m2m_field_name()
    return through_field_name
def get_m2m_reverse_field_name(m2m_field: ManyToManyField) -> str:
    """
    Return the name of the field on an M2M field's through model that points to
    the model the M2M field references.

    E.g. for a ``votes`` M2M field on a ``Poll`` model that references a ``Vote``
    model (and with a default-generated through model), the result is ``"vote"``.
    """
    # ``m2m_reverse_field_name()`` belongs to Django's internal API.
    through_field_name = m2m_field.m2m_reverse_field_name()
    return through_field_name
def bulk_create_with_history(
    objs,
    model,
    batch_size=None,
    ignore_conflicts=False,
    default_user=None,
    default_change_reason=None,
    default_date=None,
    custom_historical_attrs=None,
):
    """
    Bulk create the objects specified by objs while also bulk creating
    their history.

    When ``bulk_create`` returns objects that carry a primary key (and
    ``ignore_conflicts`` is not set), the history is created in the same
    transaction. Primary keys are not provided after ``bulk_create`` on any DB
    except Postgres
    (https://docs.djangoproject.com/en/2.2/ref/models/querysets/#bulk-create),
    so otherwise the created rows are fetched back by their field values and the
    history is created in a second transaction.

    :param objs: List of objs (not yet saved to the db) of type model
    :param model: Model class that should be created
    :param batch_size: Number of objects that should be created in each batch
    :param ignore_conflicts: Passed through to ``bulk_create``; when true, the
        second-transaction path is always taken
    :param default_user: Optional user to specify as the history_user in each
        historical record
    :param default_change_reason: Optional change reason to specify as the
        change_reason in each historical record
    :param default_date: Optional date to specify as the history_date in each
        historical record
    :param custom_historical_attrs: Optional dict of field `name`:`value` to
        specify values for custom fields
    :return: List of objs with IDs (the re-fetched objects when the second
        transaction was needed)
    """
    # ManyToManyFields are excluded because they end up as invalid kwargs to
    # model.objects.filter(...) further down.
    exclude_fields = [
        field.name
        for field in model._meta.get_fields()
        if isinstance(field, ManyToManyField)
    ]
    history_manager = get_history_manager_for_model(model)
    model_manager = model._default_manager
    # Keyword arguments shared by both ``bulk_history_create`` call sites.
    history_kwargs = {
        "batch_size": batch_size,
        "default_user": default_user,
        "default_change_reason": default_change_reason,
        "default_date": default_date,
        "custom_historical_attrs": custom_historical_attrs,
    }

    with transaction.atomic(savepoint=False):
        objs_with_id = model_manager.bulk_create(
            objs, batch_size=batch_size, ignore_conflicts=ignore_conflicts
        )
        pks_available = bool(
            objs_with_id and objs_with_id[0].pk and not ignore_conflicts
        )
        if pks_available:
            history_manager.bulk_history_create(objs_with_id, **history_kwargs)

    if pks_available:
        return objs_with_id

    with transaction.atomic(savepoint=False):
        # Build one combined query to avoid n+1 selections
        # https://github.com/jazzband/django-simple-history/issues/974
        combined_q = None
        ordering_cases = []
        for position, obj in enumerate(objs_with_id):
            field_values = model_to_dict(obj, exclude=exclude_fields)
            lookup = {
                name: value
                for name, value in field_values.items()
                if value is not None
            }
            obj_q = Q(**lookup)
            if combined_q:
                combined_q = combined_q | obj_q
            else:
                combined_q = obj_q
            # https://stackoverflow.com/a/49625179/1960509
            # DEV: If an attribute has `then` as a key
            #   then they'll also run into issues with `bulk_update`
            #   due to shared implementation
            #   https://github.com/django/django/blob/4.0.4/django/db/models/query.py#L624-L638
            ordering_cases.append(When(**lookup, then=position))

        if objs_with_id:
            # Order the fetched rows by each object's position in the input.
            matched = model_manager.filter(combined_q).order_by(
                Case(*ordering_cases)
            )
            obj_list = list(matched)
        else:
            obj_list = []
        history_manager.bulk_history_create(obj_list, **history_kwargs)
    return obj_list
def bulk_update_with_history(
    objs,
    model,
    fields,
    batch_size=None,
    default_user=None,
    default_change_reason=None,
    default_date=None,
    manager=None,
    custom_historical_attrs=None,
):
    """
    Bulk update the objects specified by objs while also bulk creating
    their history (all in one transaction).

    :param objs: List of objs of type model to be updated
    :param model: Model class that should be updated
    :param fields: The fields that are updated. If empty, no model objects will
        be changed, but history records will still be created.
    :param batch_size: Number of objects that should be updated in each batch
    :param default_user: Optional user to specify as the history_user in each
        historical record
    :param default_change_reason: Optional change reason to specify as the
        change_reason in each historical record
    :param default_date: Optional date to specify as the history_date in each
        historical record
    :param manager: Optional model manager to use for the model instead of the
        default manager
    :param custom_historical_attrs: Optional dict of field `name`:`value` to
        specify values for custom fields
    :return: The number of model rows updated, not including any history objects
    :raises AlternativeManagerError: If the manager's model is not ``model``
    """
    history_manager = get_history_manager_for_model(model)
    model_manager = manager or model._default_manager
    if model_manager.model is not model:
        raise AlternativeManagerError("The given manager does not belong to the model.")

    history_kwargs = {
        "batch_size": batch_size,
        "update": True,
        "default_user": default_user,
        "default_change_reason": default_change_reason,
        "default_date": default_date,
        "custom_historical_attrs": custom_historical_attrs,
    }
    with transaction.atomic(savepoint=False):
        # Passing no fields is allowed so that callers can bulk-create history
        # records only - e.g. with `custom_historical_attrs` provided.
        # (Calling `bulk_update()` with no fields would have raised an error)
        rows_updated = (
            model_manager.bulk_update(objs, fields, batch_size=batch_size)
            if fields
            else 0
        )
        history_manager.bulk_history_create(objs, **history_kwargs)
    return rows_updated
def get_change_reason_from_object(obj):
    """
    Return the change reason stored on ``obj``, if any.

    :param obj: Any object; its ``_change_reason`` attribute is read if present
    :return: The value of ``obj._change_reason``, or ``None`` when the attribute
        does not exist
    """
    # A single lookup with a default replaces the hasattr()/getattr() pair.
    return getattr(obj, "_change_reason", None)
|