1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
import collections
from typing import TYPE_CHECKING, Callable, List, Tuple
from pgtrigger import features
_unset = object()
if TYPE_CHECKING:
from django.db.models import Model
from pgtrigger.core import Trigger
# All registered triggers for each model
class _Registry(collections.UserDict):
@property
def pg_function_names(self):
"""
The postgres function names of all registered triggers
"""
return {trigger.get_pgid(model) for model, trigger in self.values()}
@property
def by_db_table(self):
"""
Return the registry keys by db_table, name
"""
return {(model._meta.db_table, trigger.name): trigger for model, trigger in self.values()}
def __getitem__(self, key):
assert isinstance(key, str)
if len(key.split(":")) == 1:
raise ValueError(
'Trigger URI must be in the format of "app_label.model_name:trigger_name"'
)
elif key not in _registry:
raise KeyError(f'URI "{key}" not found in pgtrigger registry')
return super().__getitem__(key)
def __setitem__(self, key, value):
assert isinstance(key, str)
model, trigger = value
assert f"{model._meta.label}:{trigger.name}" == key
found_trigger = self.by_db_table.get((model._meta.db_table, trigger.name))
if not found_trigger or found_trigger != trigger:
if found_trigger:
raise KeyError(
f'Trigger name "{trigger.name}" already'
f' used for model "{model._meta.label}"'
f' table "{model._meta.db_table}".'
)
if trigger.get_pgid(model) in self.pg_function_names:
raise KeyError(
f'Trigger "{trigger.name}" on model "{model._meta.label}"'
" has Postgres function name that's already in use."
" Use a different name for the trigger."
)
# Add the trigger to Meta.triggers.
# Note, pgtrigger's App.ready() method auto-registers any
# triggers in Meta already, meaning the trigger may already exist. If so, ignore it
if features.migrations(): # pragma: no branch
if trigger not in getattr(model._meta, "triggers", []):
model._meta.triggers = list(getattr(model._meta, "triggers", [])) + [trigger]
if trigger not in model._meta.original_attrs.get("triggers", []):
model._meta.original_attrs["triggers"] = list(
model._meta.original_attrs.get("triggers", [])
) + [trigger]
return super().__setitem__(key, value)
def __delitem__(self, key):
model, trigger = self[key]
super().__delitem__(key)
# If we support migration integration, remove from Meta triggers
if features.migrations(): # pragma: no branch
model._meta.triggers.remove(trigger)
# If model._meta.triggers and the original_attrs triggers are the same,
# we don't need to remove it from the original_attrs
if trigger in model._meta.original_attrs["triggers"]: # pragma: no branch
model._meta.original_attrs["triggers"].remove(trigger)
_registry = _Registry()
def set(uri: str, *, model: "Model", trigger: "Trigger") -> None:
"""Set a trigger in the registry
Args:
uri: The trigger URI
model: The trigger model
trigger: The trigger object
"""
_registry[uri] = (model, trigger)
def delete(uri: str) -> None:
"""Delete a trigger from the registry.
Args:
uri: The trigger URI
"""
del _registry[uri]
def registered(*uris: str) -> List[Tuple["Model", "Trigger"]]:
"""
Get registered trigger objects.
Args:
*uris: URIs of triggers to get. If none are provided,
all triggers are returned. URIs are in the format of
`{app_label}.{model_name}:{trigger_name}`.
Returns:
Matching trigger objects.
"""
uris = uris or _registry.keys()
return [_registry[uri] for uri in uris]
def register(*triggers: "Trigger") -> Callable:
"""
Register the given triggers with wrapped Model class.
Args:
*triggers: Trigger classes to register.
Example:
Register by decorating a model:
@pgtrigger.register(
pgtrigger.Protect(
name="append_only",
operation=(pgtrigger.Update | pgtrigger.Delete)
)
)
class MyModel(models.Model):
pass
Example:
Register by calling functionally:
pgtrigger.register(trigger_object)(MyModel)
"""
def _model_wrapper(model_class):
for trigger in triggers:
trigger.register(model_class)
return model_class
return _model_wrapper
|