1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
|
import enum
from datetime import date, datetime
from typing import Optional, Union
from dateutil.relativedelta import relativedelta
from .error import PostgresPartitioningError
@enum.unique
class PostgresTimePartitionUnit(enum.Enum):
    """Calendar units in which a time-based partition size is expressed.

    Members are declared from the coarsest unit to the finest one.
    """

    # Calendar-aligned units.
    YEARS = "years"
    MONTHS = "months"

    # Fixed-length units.
    WEEKS = "weeks"
    DAYS = "days"
class PostgresTimePartitionSize:
    """Size of a time-based range partition table.

    A size is a single positive amount of exactly one unit (years,
    months, weeks or days). ``unit`` holds the chosen
    :class:`PostgresTimePartitionUnit` and ``value`` the amount.
    """

    unit: PostgresTimePartitionUnit
    value: int

    def __init__(
        self,
        years: Optional[int] = None,
        months: Optional[int] = None,
        weeks: Optional[int] = None,
        days: Optional[int] = None,
    ) -> None:
        """Create a partition size from exactly one unit.

        Arguments left at ``None`` or set to ``0`` count as "not
        specified".

        Raises:
            PostgresPartitioningError: If no unit is specified, if a
                negative size is given, or if more than one unit is
                specified.
        """
        candidates = [
            (PostgresTimePartitionUnit.YEARS, years),
            (PostgresTimePartitionUnit.MONTHS, months),
            (PostgresTimePartitionUnit.WEEKS, weeks),
            (PostgresTimePartitionUnit.DAYS, days),
        ]

        # Truthiness drops both None and 0, mirroring how an omitted
        # unit and a zero-sized unit are treated the same way.
        specified = [(unit, size) for unit, size in candidates if size]

        if not specified:
            raise PostgresPartitioningError("Partition cannot be 0 in size.")

        # A negative size would yield a negative delta, i.e. a partition
        # that ends before it starts.
        if any(size < 0 for _, size in specified):
            raise PostgresPartitioningError(
                "Partition size cannot be negative."
            )

        if len(specified) > 1:
            raise PostgresPartitioningError(
                "Partition can only have one size unit."
            )

        unit, size = specified[0]
        self.unit = unit
        self.value = size

    def as_delta(self) -> relativedelta:
        """Return this size as a ``relativedelta``.

        Raises:
            PostgresPartitioningError: If ``self.unit`` is not one of
                the known units.
        """
        if self.unit == PostgresTimePartitionUnit.YEARS:
            return relativedelta(years=self.value)

        if self.unit == PostgresTimePartitionUnit.MONTHS:
            return relativedelta(months=self.value)

        if self.unit == PostgresTimePartitionUnit.WEEKS:
            return relativedelta(weeks=self.value)

        if self.unit == PostgresTimePartitionUnit.DAYS:
            return relativedelta(days=self.value)

        raise PostgresPartitioningError(
            "Unsupported time partitioning unit: %s" % self.unit
        )

    def start(self, dt: Union[date, datetime]) -> datetime:
        """Return the start of the unit-sized period that contains ``dt``.

        Years snap to January 1st, months to the 1st of the month and
        weeks to the Monday of that week (``weekday() == 0``); days keep
        the calendar day. In every case the result carries only the
        year, month and day of the computed date.
        """
        if self.unit == PostgresTimePartitionUnit.YEARS:
            return self._ensure_datetime(dt.replace(month=1, day=1))

        if self.unit == PostgresTimePartitionUnit.MONTHS:
            return self._ensure_datetime(dt.replace(day=1))

        if self.unit == PostgresTimePartitionUnit.WEEKS:
            # weekday() is 0 on Monday, so this steps back to Monday.
            return self._ensure_datetime(dt - relativedelta(days=dt.weekday()))

        return self._ensure_datetime(dt)

    @staticmethod
    def _ensure_datetime(dt: Union[date, datetime]) -> datetime:
        """Build a ``datetime`` from the year, month and day of ``dt``.

        Only those three fields are copied, so any time-of-day or
        timezone information on ``dt`` is not carried over.
        """
        return datetime(year=dt.year, month=dt.month, day=dt.day)

    def __repr__(self) -> str:
        """Return a debug representation showing the unit and value."""
        return "PostgresTimePartitionSize<%s, %s>" % (self.unit, self.value)
# Public names exported by a star-import of this module.
__all__ = [
    "PostgresTimePartitionUnit",
    "PostgresTimePartitionSize",
]
|