1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
|
"""Library for reading python tzdata files.
An rfc5545 calendar references timezones in an unambiguous way, defined
by a set of specific transitions. As a result, a calendar must have
complete information about the timezones it references.
Python's zoneinfo package does not expose a full representation of the
timezones supported by the system. It uses the system defined timezone data,
and may fall back to the tzdata package when that is not available.
This package uses the tzdata package as a definition for timezones, under
the assumption that it should be similar to existing python supported
timezones.
Note: This implementation is more verbose than the zoneinfo implementation
and contains more documentation and references to the file format to serve
as a resource for understanding the format. See rfc8536 for TZif file format.
"""
import enum
import io
import logging
import struct
from collections import namedtuple
from collections.abc import Callable
from dataclasses import dataclass
from functools import cache
from typing import Sequence
from .model import LeapSecond, TimezoneInfo, Transition
from .tz_rule import parse_tz_rule
_LOGGER = logging.getLogger(__name__)

# Struct layout of a single local time type record. Adjacent string literals
# are concatenated into one format string.
_LOCAL_TIME_TYPE_STRUCT_FORMAT = (
    ">"  # Big-endian byte order, standard sizes, no alignment padding
    "l"  # utoff (4 bytes): Number of seconds to add to UTC to determine local time
    "?"  # dst (1 byte): Indicates the time is DST (1) or standard (0)
    "B"  # idx (1 byte): Offset index into the time zone designation octets (0-charcnt-1)
)

# Number of bytes in one local time type record: utoff (4) + dst (1) + idx (1)
_LOCAL_TIME_RECORD_SIZE = 6
class _TZifVersion(enum.Enum):
    """TZif file format versions and how each one encodes time values.

    Every member value is a ``(version, time_size, time_format)`` tuple, which
    the enum machinery passes to ``__init__`` as positional arguments.
    """

    V1 = (b"\x00", 4, "l")  # v1 time values are 4 bytes wide
    V2 = (b"2", 8, "q")  # v2 and later time values are 8 bytes wide
    V3 = (b"3", 8, "q")

    def __init__(self, version: bytes, time_size: int, time_format: str):
        self._version, self._time_size, self._time_format = (
            version,
            time_size,
            time_format,
        )

    @property
    def version(self) -> bytes:
        """The version byte string as stored in the file header."""
        return self._version

    @property
    def time_size(self) -> int:
        """The TIME_SIZE, in bytes, used when parsing the data block."""
        return self._time_size

    @property
    def time_format(self) -> str:
        """The struct format character for one TIME_SIZE value."""
        return self._time_format
@dataclass
class _Header:
    """TZif header: the fixed size record that precedes a data block.

    The header carries the format version and the number of records of each
    kind that follow in the data block.
    """

    SIZE = 44  # Total size of the header to read
    STRUCT_FORMAT = "".join(
        [
            ">",  # Big-endian byte order, standard sizes, no alignment padding
            "4s",  # magic (4 bytes)
            "c",  # version (1 byte)
            "15x",  # unused
            "6l",  # isutccnt, isstdcnt, leapcnt, timecnt, typecnt, charcnt
        ]
    )
    MAGIC = b"TZif"

    version: bytes
    """The version of the file format."""

    isutccnt: int
    """The number of UTC/local indicators in the data block."""

    isstdcnt: int
    """The number of standard/wall indicators in the data block."""

    leapcnt: int
    """The number of leap second records in the data block."""

    timecnt: int
    """The number of time transitions in the data block."""

    typecnt: int
    """The number of local time type records in the data block."""

    charcnt: int
    """The number of characters for time zone designations in the data block."""

    @classmethod
    def from_bytes(cls, header_bytes: bytes) -> "_Header":
        """Parse the raw header bytes into a _Header.

        Args:
            header_bytes: Exactly ``SIZE`` bytes read from the start of a
                TZif header.

        Returns:
            The parsed header.

        Raises:
            ValueError: If the magic bytes are wrong, or if the number of
                UTC/local or standard/wall indicators is neither zero nor
                equal to the number of local time type records.
            struct.error: If ``header_bytes`` is not exactly ``SIZE`` bytes.
        """
        (
            magic,
            version,
            isutccnt,
            isstdcnt,
            leapcnt,
            timecnt,
            typecnt,
            charcnt,
        ) = struct.unpack(cls.STRUCT_FORMAT, header_bytes)
        if magic != cls.MAGIC:
            raise ValueError("zoneinfo file did not contain magic header")
        # Each indicator series is either absent or has one entry per local
        # time type record.
        if isutccnt not in (0, typecnt):
            raise ValueError(
                f"UTC/local indicators in datablock mismatched ({isutccnt}, {typecnt})"
            )
        if isstdcnt not in (0, typecnt):
            # Report isstdcnt here: it is the count that failed the check.
            raise ValueError(
                f"standard/wall indicators in datablock mismatched ({isstdcnt}, {typecnt})"
            )
        return cls(version, isutccnt, isstdcnt, leapcnt, timecnt, typecnt, charcnt)
# One transition as assembled from the data block, before its local time type
# index is resolved: the transition time, the index of its local time type
# record, and the standard/wall and UTC/local indicator values paired with it.
_TransitionBlock = namedtuple(
    "_TransitionBlock", "transition_time time_type isstdcnt isutccnt"
)

# A single record specifying a local time type:
#  - utoff (4 bytes): Number of seconds to add to UTC to determine local time
#  - dst (1 byte): Indicates the time is DST (1) or standard (0)
#  - idx (1 byte): Offset index into the time zone designation octets (0-charcnt-1)
_LocalTimeType = namedtuple("_LocalTimeType", "utoff dst idx")
def _new_transition(
    transition: _TransitionBlock,
    local_time_types: list[_LocalTimeType],
    get_tz_designations: Callable[[int], str],
) -> Transition:
    """Build a Transition by resolving the local time type of a transition.

    Args:
        transition: The transition time, local time type index and indicator
            values for one transition.
        local_time_types: The local time type records that the transition's
            type index refers into.
        get_tz_designations: Returns the time zone designation string for a
            local time type's designation index.

    Returns:
        A Transition holding the transition time, the utoff and dst values of
        the local time type, both indicator values and the designation.

    Raises:
        ValueError: If the type index is past the end of the local time type
            records, or the UTC/local indicator is set while the
            standard/wall indicator is not.
    """
    type_index = transition.time_type
    type_count = len(local_time_types)
    if type_index >= type_count:
        raise ValueError(
            f"transition_type out of bounds {type_index} >= {type_count}"
        )

    is_std = transition.isstdcnt
    is_utc = transition.isutccnt
    if is_utc and not is_std:
        raise ValueError("isutccnt was True but isstdcnt was False")

    utoff, dst, idx = local_time_types[type_index]
    designation = get_tz_designations(idx)
    return Transition(
        transition.transition_time, utoff, dst, is_std, is_utc, designation
    )
def _read_datablock(
    header: _Header, version: _TZifVersion, buf: io.BytesIO
) -> tuple[list[Transition], list[LeapSecond]]:
    """Read one TZif data block from the buffer.

    The records are read in the order they are stored: transition times,
    transition types, local time type records, time zone designations, leap
    second records, standard/wall indicators and UTC/local indicators.

    Args:
        header: The header holding the record counts for this data block.
        version: Selects the size and struct format of time values.
        buf: The buffer, positioned at the first byte of the data block.

    Returns:
        A tuple of the transitions and the leap second records in the block.

    Raises:
        ValueError: If a transition refers to a local time type record that
            does not exist, or its UTC/local indicator is set while its
            standard/wall indicator is not.
        struct.error: If the buffer ends before a series of records is
            complete.
    """
    # A series of transition times in sorted order
    transition_times = struct.unpack(
        f">{header.timecnt}{version.time_format}",
        buf.read(header.timecnt * version.time_size),
    )

    # A series of integers specifying the type of local time of the corresponding
    # transition time. These are zero-based indices into the array of local
    # time type records. (from 0 to typecnt-1)
    transition_types: Sequence[int] = struct.unpack(
        f">{header.timecnt}B", buf.read(header.timecnt)
    )

    local_time_types: list[_LocalTimeType] = [
        _LocalTimeType._make(
            struct.unpack(
                _LOCAL_TIME_TYPE_STRUCT_FORMAT, buf.read(_LOCAL_TIME_RECORD_SIZE)
            )
        )
        for _ in range(header.typecnt)
    ]

    # An array of NUL-terminated time zone designation strings
    tz_designations = buf.read(header.charcnt)

    @cache
    def get_tz_designations(idx: int) -> str:
        """Find the null terminated string starting at the specified index."""
        end = tz_designations.find(b"\x00", idx)
        if end == -1:
            # No terminator found: take the remaining octets. Slicing with -1
            # would silently drop the last character of the designation.
            end = len(tz_designations)
        return tz_designations[idx:end].decode("UTF-8")

    leap_seconds: list[LeapSecond] = [
        LeapSecond._make(
            struct.unpack(
                f">{version.time_format}l",
                buf.read(version.time_size + 4),  # occur + corr
            )
        )
        for _ in range(header.leapcnt)
    ]

    def read_indicators(count: int) -> list[bool]:
        """Read `count` one-octet indicators, padded with False to typecnt."""
        flags: list[bool] = []
        if count > 0:
            flags.extend(struct.unpack(f">{count}?", buf.read(count)))
        # An absent series means every local time type has the indicator unset
        flags.extend([False] * (header.typecnt - count))
        return flags

    def indicator(flags: list[bool], time_type: int) -> bool:
        """Return the indicator of a local time type, or False if out of range."""
        return flags[time_type] if time_type < len(flags) else False

    # Standard/wall indicators determine if the transition times are standard time (1)
    # or wall clock time (0). Each value belongs to the local time type record
    # at the same index, so it is looked up through the transition's type.
    isstd_by_type = read_indicators(header.isstdcnt)

    # UTC/local indicators determine if the transition times are UTC (1) or local time (0).
    # These are also indexed by local time type record.
    isutc_by_type = read_indicators(header.isutccnt)

    transitions = [
        _new_transition(
            _TransitionBlock(
                transition_time,
                time_type,
                # An out of range type gets False here and is then rejected
                # by _new_transition with a ValueError.
                indicator(isstd_by_type, time_type),
                indicator(isutc_by_type, time_type),
            ),
            local_time_types,
            get_tz_designations,
        )
        for transition_time, time_type in zip(transition_times, transition_types)
    ]
    return (transitions, leap_seconds)
def read_tzif(content: bytes) -> TimezoneInfo:
    """Parse the contents of a TZif file into timezone records.

    A version 1 file holds a single header and data block. Any other version
    is read as a version 1 header and data block, followed by a second header
    and data block with 8 byte time values, followed by a footer.

    Args:
        content: The complete contents of the TZif file.

    Returns:
        The transitions and leap seconds of the last data block read, plus
        the rule parsed from the footer when the footer has one.

    Raises:
        ValueError: If a header is invalid, the data block in use has no
            local time type records or no designation octets, or the footer
            does not consist of exactly two newline characters around the
            rule string.
    """
    stream = io.BytesIO(content)

    def require_records(block_header: _Header) -> None:
        """Reject a data block with no local time types or designations."""
        if block_header.typecnt == 0:
            raise ValueError("Local time records in block is zero")
        if block_header.charcnt == 0:
            raise ValueError("Total number of octets is zero")

    # V1 header and block. These are always present, but the counts are only
    # validated when the v1 block is the one being returned.
    first_header = _Header.from_bytes(stream.read(_Header.SIZE))
    is_v1 = first_header.version == _TZifVersion.V1.version
    if is_v1:
        require_records(first_header)
    transitions, leap_seconds = _read_datablock(
        first_header, _TZifVersion.V1, stream
    )
    if is_v1:
        return TimezoneInfo(transitions, leap_seconds)

    # V2+ header and block
    second_header = _Header.from_bytes(stream.read(_Header.SIZE))
    require_records(second_header)
    transitions, leap_seconds = _read_datablock(
        second_header, _TZifVersion.V2, stream
    )

    # V2+ footer: a newline, the rule string (possibly empty), a newline
    footer_parts = stream.read().decode("UTF-8").split("\n")
    if len(footer_parts) != 3:
        raise ValueError("Failed to read TZ footer")
    tz_string = footer_parts[1]
    rule = parse_tz_rule(tz_string) if tz_string else None
    return TimezoneInfo(transitions, leap_seconds, rule=rule)
|