File: installation.py

package info (click to toggle)
python-django-pgtrigger 4.15.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 956 kB
  • sloc: python: 4,412; makefile: 114; sh: 8; sql: 2
file content (159 lines) | stat: -rw-r--r-- 5,193 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""
The primary functional API for pgtrigger
"""

import logging
from typing import List, Tuple, Union

from django.db import DEFAULT_DB_ALIAS, connections

from pgtrigger import features, registry, utils

# The core pgtrigger logger. Every function in this module reports the
# trigger operations it performs through this logger at INFO level.
LOGGER = logging.getLogger("pgtrigger")


def install(*uris: str, database: Union[str, None] = None) -> None:
    """
    Install registered triggers, optionally pruning orphaned ones afterwards.

    Args:
        *uris: URIs of triggers to install. When none are given, every
            registered trigger is installed and, if
            `features.prune_on_install()` is truthy, orphaned triggers
            are pruned.
        database: The database. Defaults to the "default" database.
    """
    # Only used for the log message; the raw `database` value is what gets
    # forwarded to the trigger and to `prune`.
    db_label = database or DEFAULT_DB_ALIAS
    targets = registry.registered(*uris)

    for model, trigger in targets:
        table_name = model._meta.db_table
        LOGGER.info(
            "pgtrigger: Installing %s trigger for %s table on %s database.",
            trigger,
            table_name,
            db_label,
        )
        trigger.install(model, database=database)

    # Pruning is only considered for a full install (no explicit URIs).
    should_prune = not uris and features.prune_on_install()
    if should_prune:  # pragma: no branch
        prune(database=database)


def prunable(database: Union[str, None] = None) -> List[Tuple[str, str, bool, str]]:
    """Collect installed pgtrigger triggers that no registered model accounts for

    Args:
        database: The database. Defaults to the "default" database.

    Returns:
        A list of `(table, trigger ID, enabled, database alias)` tuples, one
        per trigger found in the database whose quoted table name and
        trigger ID do not match any registered trigger. The list is empty
        when `utils.is_postgres(database)` is falsy.
    """
    if not utils.is_postgres(database):
        return []

    # Build the (quoted table, trigger ID) pairs of everything in the registry
    # so database rows can be checked for membership below.
    known = set()
    for model, trigger in registry.registered():
        known.add((utils.quote(model._meta.db_table), trigger.get_pgid(model)))

    with utils.connection(database).cursor() as cursor:
        # The tgparentid filter is only added for major version 13 and up
        # (presumably because the column is unavailable before that - confirm
        # against the pg_trigger catalog docs).
        if utils.pg_maj_version(cursor) >= 13:
            parent_filter = "tgparentid = 0 AND"
        else:
            parent_filter = ""

        # Only select triggers that are in the current search path. We accomplish
        # this by parsing the tgrelid and only selecting triggers that don't have
        # a schema name in their path
        cursor.execute(
            f"""
            SELECT tgrelid::regclass, tgname, tgenabled
                FROM pg_trigger
                WHERE tgname LIKE 'pgtrigger_%%' AND
                      {parent_filter}
                      array_length(parse_ident(tgrelid::regclass::varchar), 1) = 1
            """
        )
        installed = set(cursor.fetchall())

    db_alias = database or DEFAULT_DB_ALIAS
    orphans = []
    for table, pgid, enabled_flag in installed:
        if (utils.quote(table), pgid) in known:
            continue
        # A tgenabled value of "O" is reported as enabled; anything else is not.
        orphans.append((table, pgid, enabled_flag == "O", db_alias))
    return orphans


def prune(database: Union[str, None] = None) -> None:
    """
    Drop every pgtrigger trigger reported by `prunable` for the database.

    This cleans up triggers left behind in the database after a model or
    a trigger definition has been removed from the code.

    Args:
        database: The database. Defaults to the "default" database.
    """
    for table, pgid, _enabled, db_alias in prunable(database=database):
        LOGGER.info(
            "pgtrigger: Pruning trigger %s for table %s on %s database.",
            pgid,
            table,
            db_alias,
        )

        # Use the alias carried in the prunable entry to pick the connection.
        target_connection = connections[db_alias]
        drop_statement = utils.render_uninstall(table, pgid)
        with target_connection.cursor() as cursor:
            cursor.execute(drop_statement)


def enable(*uris: str, database: Union[str, None] = None) -> None:
    """
    Enable registered triggers, logging each one as it is processed.

    Args:
        *uris: URIs of triggers to enable. When none are given,
            all registered triggers are enabled.
        database: The database. Defaults to the "default" database.
    """
    # Label for the log message only; `database` itself is passed through as-is.
    db_label = database or DEFAULT_DB_ALIAS
    log_template = "pgtrigger: Enabling %s trigger for %s table on %s database."

    for model, trigger in registry.registered(*uris):
        LOGGER.info(log_template, trigger, model._meta.db_table, db_label)
        trigger.enable(model, database=database)


def uninstall(*uris: str, database: Union[str, None] = None) -> None:
    """
    Uninstall registered triggers, optionally pruning orphaned ones afterwards.

    Args:
        *uris: URIs of triggers to uninstall. When none are given, every
            registered trigger is uninstalled and, if
            `features.prune_on_install()` is truthy, orphaned triggers
            are pruned.
        database: The database. Defaults to the "default" database.
    """
    db_label = database or DEFAULT_DB_ALIAS

    for model, trigger in registry.registered(*uris):
        table_name = model._meta.db_table
        LOGGER.info(
            "pgtrigger: Uninstalling %s trigger for %s table on %s database.",
            trigger,
            table_name,
            db_label,
        )
        trigger.uninstall(model, database=database)

    # Explicit URIs mean a targeted uninstall: never prune in that case.
    if uris:
        return

    # Pruning is governed by the same feature flag that install() consults.
    if features.prune_on_install():
        prune(database=database)


def disable(*uris: str, database: Union[str, None] = None) -> None:
    """
    Disable registered triggers, logging each one as it is processed.

    Args:
        *uris: URIs of triggers to disable. When none are given,
            all registered triggers are disabled.
        database: The database. Defaults to the "default" database.
    """
    # Label for the log message only; `database` itself is passed through as-is.
    db_label = database or DEFAULT_DB_ALIAS
    log_template = "pgtrigger: Disabling %s trigger for %s table on %s database."

    for model, trigger in registry.registered(*uris):
        LOGGER.info(log_template, trigger, model._meta.db_table, db_label)
        trigger.disable(model, database=database)