File: manager.py

package info (click to toggle)
python-django-postgres-extra 2.0.9-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,096 kB
  • sloc: python: 9,057; makefile: 17; sh: 7; sql: 1
file content (139 lines) | stat: -rw-r--r-- 4,573 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from typing import List, Optional, Tuple, Type

from django.db import connections

from psqlextra.models import PostgresPartitionedModel

from .config import PostgresPartitioningConfig
from .constants import AUTO_PARTITIONED_COMMENT
from .error import PostgresPartitioningError
from .partition import PostgresPartition
from .plan import PostgresModelPartitioningPlan, PostgresPartitioningPlan

# Type alias: a list of tuples, each pairing a partitioned model with a list
# of partitions.
# NOTE(review): not referenced in the visible part of this module -- confirm
# whether other modules import it before changing or removing it.
PartitionList = List[Tuple[PostgresPartitionedModel, List[PostgresPartition]]]


class PostgresPartitioningManager:
    """Helps managing partitions by automatically creating new partitions and
    deleting old ones according to the configuration.

    The manager holds one partitioning config per model and can turn those
    configs into a plan (see :meth:`plan`) listing the partitions that
    should be created and deleted.
    """

    def __init__(self, configs: List[PostgresPartitioningConfig]) -> None:
        """Stores and validates the specified partitioning configs.

        Arguments:
            configs:
                The partitioning configs to manage. Each model
                may appear in at most one config.

        Raises:
            PostgresPartitioningError:
                If more than one config refers to the same model.
        """

        self.configs = configs
        self._validate_configs(self.configs)

    def plan(
        self,
        skip_create: bool = False,
        skip_delete: bool = False,
        using: Optional[str] = None,
    ) -> PostgresPartitioningPlan:
        """Plans which partitions should be deleted/created.

        Arguments:
            skip_create:
                If set to True, no partitions will be marked
                for creation, regardless of the configuration.

            skip_delete:
                If set to True, no partitions will be marked
                for deletion, regardless of the configuration.

            using:
                Optional name of the database connection to use.

        Returns:
            A plan describing what partitions would be created
            and deleted if the plan is applied.

        Raises:
            PostgresPartitioningError:
                If the table of one of the configured models
                cannot be found in the database.
        """

        model_plans = []

        for config in self.configs:
            model_plan = self._plan_for_config(
                config,
                skip_create=skip_create,
                skip_delete=skip_delete,
                using=using,
            )

            # Configs for which there is nothing to do are left
            # out of the resulting plan.
            if model_plan:
                model_plans.append(model_plan)

        return PostgresPartitioningPlan(model_plans)

    def find_config_for_model(
        self, model: Type[PostgresPartitionedModel]
    ) -> Optional[PostgresPartitioningConfig]:
        """Finds the partitioning config for the specified model.

        Arguments:
            model:
                The model class to look up.

        Returns:
            The first config whose model equals the specified
            model, or None if there is no such config.
        """

        return next(
            (config for config in self.configs if config.model == model), None
        )

    def _plan_for_config(
        self,
        config: PostgresPartitioningConfig,
        skip_create: bool = False,
        skip_delete: bool = False,
        using: Optional[str] = None,
    ) -> Optional[PostgresModelPartitioningPlan]:
        """Creates a partitioning plan for one partitioning config.

        Arguments:
            config:
                The config to create a plan for.

            skip_create:
                If set to True, no partitions are marked for creation.

            skip_delete:
                If set to True, no partitions are marked for deletion.

            using:
                Optional name of the database connection to use.
                Falls back to the "default" connection.

        Returns:
            The plan for the config's model, or None if there is
            nothing to create and nothing to delete.

        Raises:
            PostgresPartitioningError:
                If the model's table cannot be found in the database.
        """

        connection = connections[using or "default"]
        table = self._get_partitioned_table(connection, config.model)

        model_plan = PostgresModelPartitioningPlan(config)

        if not skip_create:
            for partition in config.strategy.to_create():
                # Partitions that already exist in the database
                # do not have to be created again.
                if table.partition_by_name(name=partition.name()):
                    continue

                model_plan.creations.append(partition)

        if not skip_delete:
            for partition in config.strategy.to_delete():
                introspected_partition = table.partition_by_name(
                    name=partition.name()
                )

                # Stop at the first candidate that does not exist in the
                # database. NOTE(review): presumably the strategy yields
                # deletion candidates in order, so that later candidates
                # cannot exist either -- confirm against the strategy
                # implementations before changing this to `continue`.
                if not introspected_partition:
                    break

                # Only partitions carrying the auto-partitioned comment
                # are planned for deletion; any other partition is skipped.
                if introspected_partition.comment != AUTO_PARTITIONED_COMMENT:
                    continue

                model_plan.deletions.append(partition)

        if not model_plan.creations and not model_plan.deletions:
            return None

        return model_plan

    @staticmethod
    def _get_partitioned_table(
        connection, model: Type[PostgresPartitionedModel]
    ):
        """Introspects the partitioned table that backs the specified model.

        Arguments:
            connection:
                The database connection to introspect with.

            model:
                The model whose table (`model._meta.db_table`)
                should be looked up.

        Returns:
            The table as returned by the connection's
            introspection `get_partitioned_table`.

        Raises:
            PostgresPartitioningError:
                If introspection returns nothing for the table.
        """

        with connection.cursor() as cursor:
            table = connection.introspection.get_partitioned_table(
                cursor, model._meta.db_table
            )

        if not table:
            raise PostgresPartitioningError(
                f"Model {model.__name__}, with table "
                f"{model._meta.db_table} does not exists in the "
                "database. Did you run `python manage.py migrate`?"
            )

        return table

    @staticmethod
    def _validate_configs(configs: List[PostgresPartitioningConfig]) -> None:
        """Ensures there is only one config per model.

        Models are compared as classes rather than by class name, so that
        two different models which merely share a class name (for example
        because they live in different apps) are not mistaken for
        duplicates. This is the same notion of equality that
        `find_config_for_model` relies on.

        Raises:
            PostgresPartitioningError:
                If two or more configs refer to the same model.
        """

        models = {config.model for config in configs}
        if len(models) != len(configs):
            raise PostgresPartitioningError(
                "Only one partitioning config per model is allowed"
            )