File: time_partition_size.py

package info (click to toggle)
python-django-postgres-extra 2.0.9-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,096 kB
  • sloc: python: 9,057; makefile: 17; sh: 7; sql: 1
file content (95 lines) | stat: -rw-r--r-- 2,927 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import enum

from datetime import date, datetime
from typing import Optional, Union

from dateutil.relativedelta import relativedelta

from .error import PostgresPartitioningError


class PostgresTimePartitionUnit(enum.Enum):
    YEARS = "years"
    MONTHS = "months"
    WEEKS = "weeks"
    DAYS = "days"


class PostgresTimePartitionSize:
    """Size of a time-based range partition table."""

    unit: PostgresTimePartitionUnit
    value: int

    def __init__(
        self,
        years: Optional[int] = None,
        months: Optional[int] = None,
        weeks: Optional[int] = None,
        days: Optional[int] = None,
    ) -> None:
        sizes = [years, months, weeks, days]

        if not any(sizes):
            raise PostgresPartitioningError("Partition cannot be 0 in size.")

        if len([size for size in sizes if size and size > 0]) > 1:
            raise PostgresPartitioningError(
                "Partition can only have on size unit."
            )

        if years:
            self.unit = PostgresTimePartitionUnit.YEARS
            self.value = years
        elif months:
            self.unit = PostgresTimePartitionUnit.MONTHS
            self.value = months
        elif weeks:
            self.unit = PostgresTimePartitionUnit.WEEKS
            self.value = weeks
        elif days:
            self.unit = PostgresTimePartitionUnit.DAYS
            self.value = days
        else:
            raise PostgresPartitioningError(
                "Unsupported time partitioning unit"
            )

    def as_delta(self) -> relativedelta:
        if self.unit == PostgresTimePartitionUnit.YEARS:
            return relativedelta(years=self.value)

        if self.unit == PostgresTimePartitionUnit.MONTHS:
            return relativedelta(months=self.value)

        if self.unit == PostgresTimePartitionUnit.WEEKS:
            return relativedelta(weeks=self.value)

        if self.unit == PostgresTimePartitionUnit.DAYS:
            return relativedelta(days=self.value)

        raise PostgresPartitioningError(
            "Unsupported time partitioning unit: %s" % self.unit
        )

    def start(self, dt: datetime) -> datetime:
        if self.unit == PostgresTimePartitionUnit.YEARS:
            return self._ensure_datetime(dt.replace(month=1, day=1))

        if self.unit == PostgresTimePartitionUnit.MONTHS:
            return self._ensure_datetime(dt.replace(day=1))

        if self.unit == PostgresTimePartitionUnit.WEEKS:
            return self._ensure_datetime(dt - relativedelta(days=dt.weekday()))

        return self._ensure_datetime(dt)

    @staticmethod
    def _ensure_datetime(dt: Union[date, datetime]) -> datetime:
        return datetime(year=dt.year, month=dt.month, day=dt.day)

    def __repr__(self) -> str:
        return "PostgresTimePartitionSize<%s, %s>" % (self.unit, self.value)


__all__ = ["PostgresTimePartitionUnit", "PostgresTimePartitionSize"]