File: current_time_strategy.py

package info (click to toggle)
python-django-postgres-extra 2.0.9-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,096 kB
  • sloc: python: 9,057; makefile: 17; sh: 7; sql: 1
file content (80 lines) | stat: -rw-r--r-- 2,486 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from datetime import datetime, timezone
from typing import Generator, Optional

from dateutil.relativedelta import relativedelta

from .range_strategy import PostgresRangePartitioningStrategy
from .time_partition import PostgresTimePartition
from .time_partition_size import PostgresTimePartitionSize


class PostgresCurrentTimePartitioningStrategy(
    PostgresRangePartitioningStrategy
):
    """Implments a time based partitioning strategy where each partition
    contains values for a specific time period.

    All buckets will be equal in size and start at the start of the
    unit. With monthly partitioning, partitions start on the 1st and
    with weekly partitioning, partitions start on monday.
    """

    def __init__(
        self,
        size: PostgresTimePartitionSize,
        count: int,
        max_age: Optional[relativedelta] = None,
        name_format: Optional[str] = None,
    ) -> None:
        """Initializes a new instance of :see:PostgresTimePartitioningStrategy.

        Arguments:
            size:
                The size of each partition.

            count:
                The amount of partitions to create ahead
                from the current date/time.

            max_age:
                Maximum age of a partition. Partitions
                older than this are deleted during
                auto cleanup.
        """

        self.size = size
        self.count = count
        self.max_age = max_age
        self.name_format = name_format

    def to_create(self) -> Generator[PostgresTimePartition, None, None]:
        current_datetime = self.size.start(self.get_start_datetime())

        for _ in range(self.count):
            yield PostgresTimePartition(
                start_datetime=current_datetime,
                size=self.size,
                name_format=self.name_format,
            )

            current_datetime += self.size.as_delta()

    def to_delete(self) -> Generator[PostgresTimePartition, None, None]:
        if not self.max_age:
            return

        current_datetime = self.size.start(
            self.get_start_datetime() - self.max_age
        )

        while True:
            yield PostgresTimePartition(
                start_datetime=current_datetime,
                size=self.size,
                name_format=self.name_format,
            )

            current_datetime -= self.size.as_delta()

    def get_start_datetime(self) -> datetime:
        return datetime.now(timezone.utc)