File: registry.py

package info (click to toggle)
python-django-pgtrigger 4.15.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 956 kB
  • sloc: python: 4,412; makefile: 114; sh: 8; sql: 2
file content (162 lines) | stat: -rw-r--r-- 4,977 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import collections
from typing import TYPE_CHECKING, Callable, List, Tuple

from pgtrigger import features

_unset = object()


if TYPE_CHECKING:
    from django.db.models import Model

    from pgtrigger.core import Trigger


# All registered triggers for each model
class _Registry(collections.UserDict):
    @property
    def pg_function_names(self):
        """
        The postgres function names of all registered triggers
        """
        return {trigger.get_pgid(model) for model, trigger in self.values()}

    @property
    def by_db_table(self):
        """
        Return the registry keys by db_table, name
        """
        return {(model._meta.db_table, trigger.name): trigger for model, trigger in self.values()}

    def __getitem__(self, key):
        assert isinstance(key, str)
        if len(key.split(":")) == 1:
            raise ValueError(
                'Trigger URI must be in the format of "app_label.model_name:trigger_name"'
            )
        elif key not in _registry:
            raise KeyError(f'URI "{key}" not found in pgtrigger registry')

        return super().__getitem__(key)

    def __setitem__(self, key, value):
        assert isinstance(key, str)
        model, trigger = value
        assert f"{model._meta.label}:{trigger.name}" == key

        found_trigger = self.by_db_table.get((model._meta.db_table, trigger.name))

        if not found_trigger or found_trigger != trigger:
            if found_trigger:
                raise KeyError(
                    f'Trigger name "{trigger.name}" already'
                    f' used for model "{model._meta.label}"'
                    f' table "{model._meta.db_table}".'
                )

            if trigger.get_pgid(model) in self.pg_function_names:
                raise KeyError(
                    f'Trigger "{trigger.name}" on model "{model._meta.label}"'
                    " has Postgres function name that's already in use."
                    " Use a different name for the trigger."
                )

        # Add the trigger to Meta.triggers.
        # Note, pgtrigger's App.ready() method auto-registers any
        # triggers in Meta already, meaning the trigger may already exist. If so, ignore it
        if features.migrations():  # pragma: no branch
            if trigger not in getattr(model._meta, "triggers", []):
                model._meta.triggers = list(getattr(model._meta, "triggers", [])) + [trigger]

            if trigger not in model._meta.original_attrs.get("triggers", []):
                model._meta.original_attrs["triggers"] = list(
                    model._meta.original_attrs.get("triggers", [])
                ) + [trigger]

        return super().__setitem__(key, value)

    def __delitem__(self, key):
        model, trigger = self[key]

        super().__delitem__(key)

        # If we support migration integration, remove from Meta triggers
        if features.migrations():  # pragma: no branch
            model._meta.triggers.remove(trigger)
            # If model._meta.triggers and the original_attrs triggers are the same,
            # we don't need to remove it from the original_attrs
            if trigger in model._meta.original_attrs["triggers"]:  # pragma: no branch
                model._meta.original_attrs["triggers"].remove(trigger)


_registry = _Registry()


def set(uri: str, *, model: "Model", trigger: "Trigger") -> None:
    """Set a trigger in the registry

    Args:
        uri: The trigger URI
        model: The trigger model
        trigger: The trigger object
    """
    _registry[uri] = (model, trigger)


def delete(uri: str) -> None:
    """Delete a trigger from the registry.

    Args:
        uri: The trigger URI
    """
    del _registry[uri]


def registered(*uris: str) -> List[Tuple["Model", "Trigger"]]:
    """
    Get registered trigger objects.

    Args:
        *uris: URIs of triggers to get. If none are provided,
            all triggers are returned. URIs are in the format of
            `{app_label}.{model_name}:{trigger_name}`.

    Returns:
        Matching trigger objects.
    """
    uris = uris or _registry.keys()
    return [_registry[uri] for uri in uris]


def register(*triggers: "Trigger") -> Callable:
    """
    Register the given triggers with wrapped Model class.

    Args:
        *triggers: Trigger classes to register.

    Example:
        Register by decorating a model:

            @pgtrigger.register(
                pgtrigger.Protect(
                    name="append_only",
                    operation=(pgtrigger.Update | pgtrigger.Delete)
                )
            )
            class MyModel(models.Model):
                pass

    Example:
        Register by calling functionally:

            pgtrigger.register(trigger_object)(MyModel)
    """

    def _model_wrapper(model_class):
        for trigger in triggers:
            trigger.register(model_class)

        return model_class

    return _model_wrapper