File: tzif.py

package info (click to toggle)
python-ical 12.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,776 kB
  • sloc: python: 15,157; sh: 9; makefile: 5
file content (284 lines) | stat: -rw-r--r-- 9,621 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
"""Library for reading python tzdata files.

An rfc5545 calendar references timezones in an unambiguous way, defined
by a set of specific transitions. As a result, a calendar must have
complete information about the timezones it references.

Python's zoneinfo package does not expose a full representation of system
supported timezones. It will use the system defined timezone, but may also
fall back to the tzdata package.

This package uses the tzdata package as a definition for timezones, under
the assumption that it should be similar to existing python supported
timezones.

Note: This implementation is more verbose than the zoneinfo implementation
and contains more documentation and references to the file format to serve
as a resource for understanding the format. See rfc8536 for TZif file format.
"""

import enum
import io
import logging
import struct
from collections import namedtuple
from collections.abc import Callable
from dataclasses import dataclass
from functools import cache
from typing import Sequence

from .model import LeapSecond, TimezoneInfo, Transition
from .tz_rule import parse_tz_rule

_LOGGER = logging.getLogger(__name__)

# struct layout of a single local time type record. The adjacent string
# literals below are concatenated into one format string.
_LOCAL_TIME_TYPE_STRUCT_FORMAT = (
    ">"  # big-endian byte order, standard sizes, no padding
    "l"  # utoff (4 bytes): Number of seconds to add to UTC to determine local time
    "?"  # dst (1 byte): Indicates the time is DST (1) or standard (0)
    "B"  # idx (1 byte): Offset index into the time zone designation octets (0-charcnt-1)
)
# Number of bytes read from the data block for each local time type record.
_LOCAL_TIME_RECORD_SIZE = 6


class _TZifVersion(enum.Enum):
    """TZif format versions and how each one encodes time values.

    Every member value is a ``(version, time_size, time_format)`` tuple; the
    three parts are exposed through read-only properties of the same names.
    """

    V1 = (b"\x00", 4, "l")  # version 1 stores times as 32-bit values
    V2 = (b"2", 8, "q")  # version 2 and later store times as 64-bit values
    V3 = (b"3", 8, "q")

    def __init__(self, version: bytes, time_size: int, time_format: str):
        # Enum passes the member's tuple value as positional arguments.
        (self._version, self._time_size, self._time_format) = (
            version,
            time_size,
            time_format,
        )

    @property
    def time_format(self) -> str:
        """Return the struct format character for one TIME_SIZE value."""
        return self._time_format

    @property
    def time_size(self) -> int:
        """Return TIME_SIZE, the byte width of time values in the data block."""
        return self._time_size

    @property
    def version(self) -> bytes:
        """Return the version byte as it appears in the header."""
        return self._version


@dataclass
class _Header:
    """TZif header: magic, format version, and the data block record counts.

    The class attributes SIZE, STRUCT_FORMAT and MAGIC describe the packed
    layout of the header; the annotated fields hold the values parsed from
    one header by `from_bytes`.
    """

    SIZE = 44  # Total size of the header to read
    STRUCT_FORMAT = "".join(
        [
            ">",  # Big-endian byte order, standard sizes, no padding
            "4s",  # magic (4 bytes)
            "c",  # version (1 byte)
            "15x",  # unused
            "6l",  # isutccnt, isstdcnt, leapcnt, timecnt, typecnt, charcnt
        ]
    )
    MAGIC = b"TZif"

    version: bytes
    """The version of the files format."""

    isutccnt: int
    """The number of UTC/local indicators in the data block."""

    isstdcnt: int
    """The number of standard/wall indicators in the data block."""

    leapcnt: int
    """The number of leap second records in the data block."""

    timecnt: int
    """The number of time transitions in the data block."""

    typecnt: int
    """The number of local time type records in the data block."""

    charcnt: int
    """The number of characters for time zone designations in the data block."""

    @classmethod
    def from_bytes(cls, header_bytes: bytes) -> "_Header":
        """Parse and validate the packed header bytes.

        Args:
            header_bytes: The raw header; must be exactly SIZE bytes long.

        Returns:
            The parsed header.

        Raises:
            ValueError: If the magic does not match, or if either indicator
                count is neither zero nor equal to typecnt.
            struct.error: If header_bytes does not match STRUCT_FORMAT's size.
        """
        (
            magic,
            version,
            isutccnt,
            isstdcnt,
            leapcnt,
            timecnt,
            typecnt,
            charcnt,
        ) = struct.unpack(cls.STRUCT_FORMAT, header_bytes)
        if magic != cls.MAGIC:
            raise ValueError("zoneinfo file did not contain magic header")
        # Indicator arrays are either absent or hold one entry per local time type.
        if isutccnt not in (0, typecnt):
            raise ValueError(
                f"UTC/local indicators in datablock mismatched ({isutccnt}, {typecnt})"
            )
        if isstdcnt not in (0, typecnt):
            # Report isstdcnt here, the value that actually failed the check.
            raise ValueError(
                f"standard/wall indicators in datablock mismatched ({isstdcnt}, {typecnt})"
            )
        return cls(version, isutccnt, isstdcnt, leapcnt, timecnt, typecnt, charcnt)


# The raw values gathered for one transition: its transition time, the index
# of its local time type record, and the standard/wall and UTC/local
# indicator values paired with it.
_TransitionBlock = namedtuple(
    "_TransitionBlock", "transition_time time_type isstdcnt isutccnt"
)

# One local time type record:
#  - utoff (4 bytes): Number of seconds to add to UTC to determine local time
#  - dst (1 byte): Indicates the time is DST (1) or standard (0)
#  - idx (1 byte): Offset index into the time zone designation octets (0-charcnt-1)
_LocalTimeType = namedtuple("_LocalTimeType", "utoff dst idx")


def _new_transition(
    transition: _TransitionBlock,
    local_time_types: list[_LocalTimeType],
    get_tz_designations: Callable[[int], str],
) -> Transition:
    """Build a Transition from a raw transition block.

    The block's time_type is used as an index into local_time_types, and the
    designation string is looked up by calling get_tz_designations with the
    selected record's idx.

    Raises ValueError when time_type is past the end of local_time_types, or
    when the block's isutccnt is set while its isstdcnt is not.
    """
    type_index = transition.time_type
    type_count = len(local_time_types)
    if type_index >= type_count:
        raise ValueError(
            f"transition_type out of bounds {type_index} >= {type_count}"
        )
    if transition.isutccnt and not transition.isstdcnt:
        raise ValueError("isutccnt was True but isstdcnt was False")

    utoff, dst, idx = local_time_types[type_index]
    designation = get_tz_designations(idx)
    return Transition(
        transition.transition_time,
        utoff,
        dst,
        transition.isstdcnt,
        transition.isutccnt,
        designation,
    )


def _read_datablock(
    header: _Header, version: _TZifVersion, buf: io.BytesIO
) -> tuple[list[Transition], list[LeapSecond]]:
    """Read one data block from the buffer.

    The block is consumed sequentially in this order: transition times,
    transition types, local time type records, time zone designations, leap
    second records, standard/wall indicators, UTC/local indicators.

    Args:
        header: The header preceding this data block; supplies record counts.
        version: Selects the size and struct format of time values.
        buf: Buffer positioned at the first byte of the data block.

    Returns:
        A tuple of (transitions, leap seconds) read from the block.

    Raises:
        ValueError: Propagated from `_new_transition` for an invalid transition.
        struct.error: If the buffer holds fewer bytes than the counts require.
    """
    # A series of transition time values in sorted order
    transition_times = struct.unpack(
        f">{header.timecnt}{version.time_format}",
        buf.read(header.timecnt * version.time_size),
    )

    # A series of integers specifying the type of local time of the corresponding
    # transition time. These are zero-based indices into the array of local
    # time type records. (from 0 to typecnt-1)
    transition_types: Sequence[int] = []
    if header.timecnt > 0:
        transition_types = struct.unpack(
            f">{header.timecnt}B", buf.read(header.timecnt)
        )

    local_time_types: list[_LocalTimeType] = [
        _LocalTimeType._make(
            struct.unpack(
                _LOCAL_TIME_TYPE_STRUCT_FORMAT, buf.read(_LOCAL_TIME_RECORD_SIZE)
            )
        )
        for _ in range(header.typecnt)
    ]

    # An array of NUL-terminated time zone designation strings
    tz_designations = buf.read(header.charcnt)

    @cache
    def get_tz_designations(idx: int) -> str:
        """Find the null terminated string starting at the specified index."""
        end = tz_designations.find(b"\x00", idx)
        if end == -1:
            # No terminator after idx: take the remaining octets instead of
            # slicing with -1, which would silently drop the final octet.
            end = len(tz_designations)
        return tz_designations[idx:end].decode("UTF-8")

    leap_seconds: list[LeapSecond] = [
        LeapSecond._make(
            struct.unpack(
                f">{version.time_format}l",
                buf.read(version.time_size + 4),  # occur + corr
            )
        )
        for _ in range(header.leapcnt)
    ]

    # Standard/wall indicators determine if the transition times are standard time (1)
    # or wall clock time (0). The header allows either zero of them or one per
    # local time type, so they are indexed by local time type below.
    isstd_by_type: Sequence[bool] = ()
    if header.isstdcnt > 0:
        isstd_by_type = struct.unpack(
            f">{header.isstdcnt}?", buf.read(header.isstdcnt)
        )

    # UTC/local indicators determine if the transition times are UTC (1) or local time (0).
    # Like the standard/wall indicators there is one per local time type.
    isut_by_type: Sequence[bool] = ()
    if header.isutccnt > 0:
        isut_by_type = struct.unpack(
            f">{header.isutccnt}?", buf.read(header.isutccnt)
        )

    def indicator_for(flags: Sequence[bool], type_index: int) -> bool:
        """Return the indicator for a local time type, False when absent."""
        # An out-of-range type index yields False here; _new_transition is
        # responsible for rejecting it with a ValueError.
        return type_index < len(flags) and flags[type_index]

    transitions = [
        _new_transition(
            _TransitionBlock(
                transition_time,
                time_type,
                indicator_for(isstd_by_type, time_type),
                indicator_for(isut_by_type, time_type),
            ),
            local_time_types,
            get_tz_designations,
        )
        for transition_time, time_type in zip(transition_times, transition_types)
    ]

    return (transitions, leap_seconds)


def read_tzif(content: bytes) -> TimezoneInfo:
    """Parse the bytes of a TZif file into a TimezoneInfo.

    The first header and data block are always read using the V1 layout. When
    the header's version is V1 the result is built from that block alone.
    Otherwise a second header and data block are read using the V2 layout,
    followed by a footer whose middle line, when non-empty, is parsed as a
    TZ rule.

    Raises ValueError when a validated header reports zero local time type
    records or zero designation octets, or when the footer does not split
    into exactly three newline separated parts.
    """

    def _require_records(hdr: _Header) -> None:
        """Reject a header that declares no local time types or designations."""
        if hdr.typecnt == 0:
            raise ValueError("Local time records in block is zero")
        if hdr.charcnt == 0:
            raise ValueError("Total number of octets is zero")

    stream = io.BytesIO(content)

    # V1 header and block. The block is read even for V2+ files so that the
    # stream ends up positioned at the second header.
    v1_header = _Header.from_bytes(stream.read(_Header.SIZE))
    is_v1 = v1_header.version == _TZifVersion.V1.version
    if is_v1:
        _require_records(v1_header)
    transitions, leap_seconds = _read_datablock(v1_header, _TZifVersion.V1, stream)
    if is_v1:
        return TimezoneInfo(transitions, leap_seconds)

    # V2+ header and block replace the values read from the V1 block.
    v2_header = _Header.from_bytes(stream.read(_Header.SIZE))
    _require_records(v2_header)
    transitions, leap_seconds = _read_datablock(v2_header, _TZifVersion.V2, stream)

    # V2+ footer: everything left in the stream.
    footer_parts = stream.read().decode("UTF-8").split("\n")
    if len(footer_parts) != 3:
        raise ValueError("Failed to read TZ footer")
    tz_string = footer_parts[1]
    rule = parse_tz_rule(tz_string) if tz_string else None
    return TimezoneInfo(transitions, leap_seconds, rule=rule)