File: common.py

package info (click to toggle)
zchunk 1.5.2%2Bds1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,704 kB
  • sloc: ansic: 13,244; python: 457; sh: 260; makefile: 13
file content (296 lines) | stat: -rw-r--r-- 8,156 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
"""Common routines for the Python zchunk tests."""

from __future__ import annotations

import argparse
import dataclasses
import functools
import os
import subprocess  # noqa: S404
import sys
import typing

import pyparsing as pyp

from pychunk import defs


if typing.TYPE_CHECKING:
    import pathlib
    from typing import Final


@dataclasses.dataclass(frozen=True)
class Config:
    """Common runtime configuration settings.

    Instances are immutable: the dataclass is declared frozen.
    """

    # Directory containing the zchunk tools; a tool name is appended to it
    # to build the program path (e.g. `bindir / "zck"`).
    bindir: pathlib.Path
    # Environment variables handed to the tools when they are run.
    env: dict[str, str]

    # The original file that gets compressed.
    orig: pathlib.Path
    # The output file that the `zck` tool is asked to create.
    compressed: pathlib.Path


@dataclasses.dataclass(frozen=True)
class Chunk:
    """A single chunk descriptor.

    The `c`-prefixed fields describe the chunk's position within
    the compressed file; the unprefixed ones describe its position within
    the uncompressed data. Each `*end` value equals the corresponding
    `*start` value of the chunk that follows.
    """

    # Offset of the chunk within the compressed file.
    cstart: int
    # Offset of the chunk within the uncompressed data.
    start: int
    # Size of the chunk as stored in the compressed file.
    csize: int
    # Size of the chunk once uncompressed.
    size: int
    # Offset just past the chunk within the compressed file.
    cend: int
    # Offset just past the chunk within the uncompressed data.
    end: int


def get_runenv() -> dict[str, str]:
    """Build the environment used when running the zchunk programs.

    The result is a copy of the current process environment with the locale
    forced to `C.UTF-8` and `LANGUAGE` cleared; `os.environ` itself is
    left untouched.
    """
    locale_overrides = {"LC_ALL": "C.UTF-8", "LANGUAGE": ""}
    return {**os.environ, **locale_overrides}


def base_parser(prog: str) -> argparse.ArgumentParser:
    """Build an argument parser preloaded with the options all tests share.

    Both options are mandatory and are stored as plain strings; callers may
    add their own options to the returned parser.
    """
    # (short flag, long flag, help text) for each of the common options.
    common_options = (
        ("-d", "--bindir", "path to the directory containing the zchunk tools"),
        ("-f", "--filename", "path to the filename to compress"),
    )

    result = argparse.ArgumentParser(prog=prog)
    for short_flag, long_flag, description in common_options:
        result.add_argument(
            short_flag,
            long_flag,
            type=str,
            required=True,
            help=description,
        )
    return result


def do_compress(cfg: Config, orig_size: int) -> int:
    """Compress the original file and sanity-check the result.

    Runs the `zck` tool found in `cfg.bindir` to compress `cfg.orig` into
    `cfg.compressed`, then checks that the output file was created, that it
    is smaller than the original, and that it starts with the expected
    magic bytes.

    Args:
        cfg: the runtime configuration (tool directory, environment, paths).
        orig_size: the size of the original file, compared against the size
            of the compressed one.

    Returns:
        The size of the compressed file in bytes.

    Raises:
        SystemExit: if the output file already exists, is not created by
            `zck`, is not smaller than the original, or does not start with
            the expected magic bytes.
        subprocess.CalledProcessError: if `zck` exits with a non-zero status.
    """
    print(f"About to compress {cfg.orig} to {cfg.compressed}")
    # Refuse to run if the output is already there: otherwise we could not
    # tell whether it was this `zck` invocation that produced it.
    if cfg.compressed.exists():
        sys.exit(f"Did not expect {cfg.compressed} to exist")
    subprocess.check_call(
        [cfg.bindir / "zck", "-o", cfg.compressed, "--", cfg.orig],
        shell=False,
        env=cfg.env,
    )
    if not cfg.compressed.is_file():
        sys.exit(f"zck did not create the {cfg.compressed} file")
    comp_size: Final = cfg.compressed.stat().st_size
    print(f"{cfg.compressed} size is {comp_size} bytes long")
    if comp_size >= orig_size:
        sys.exit(f"sizeof({cfg.compressed}) == {comp_size} : sizeof({cfg.orig}) == {orig_size}")
    # Use a context manager so the file handle is closed right away instead
    # of being left for the garbage collector.
    with cfg.compressed.open(mode="rb") as compf:
        start: Final = compf.read(5)
    print(f"{cfg.compressed} starts with {start!r}")
    if start != defs.MAGIC:
        sys.exit(f"{cfg.compressed} does not start with {defs.MAGIC!r}: {start!r}")

    return comp_size


@dataclasses.dataclass(frozen=True)
class PChunk:
    """A description of a single chunk.

    The fields mirror the columns of the chunks table ("Chunk", "Checksum",
    "Start", "Comp size", "Size") as parsed from one table line, before any
    validation or offset calculations.
    """

    # The value of the "Chunk" column.
    idx: int
    # The value of the "Checksum" column: a string of lowercase hex digits.
    cksum: str
    # The value of the "Start" column.
    start: int
    # The value of the "Comp size" column.
    comp_size: int
    # The value of the "Size" column.
    size: int


@dataclasses.dataclass(frozen=True)
class PChunks:
    """All the parsed chunks, still in `PChunk` format.

    Wrapping the list in a single object lets the chunks parse action hand
    the whole table over as one token.
    """

    # The chunk lines in the order in which they were parsed.
    chunks: list[PChunk]


# Zero or more space, tab, form feed, or backspace characters; whatever is
# matched is dropped from the parse results.
# NOTE(review): "\b" in a non-raw string literal is the backspace character;
# possibly "\v" (vertical tab) was meant - confirm.
_p_ws = pyp.Char(" \t\f\b")[...].suppress()
"""Skip whitespace within a line."""

# A "Data size:" label, an integer, and a newline. Only the integer is kept
# in the parse results (under the name "data_size"); the label, the padding,
# and the newline are all suppressed.
_p_total_size = (
    pyp.Literal("Data size:").suppress()
    + _p_ws.suppress()
    + pyp.common.integer("data_size")
    + _p_ws.suppress()
    + pyp.Char("\n").suppress()
)
"""Match the header line specifying the total size of the compressed data."""

# A "Chunk count:" label, an integer, and a newline. Only the integer is
# kept in the parse results (under the name "count"); the label, the padding,
# and the newline are all suppressed.
_p_chunk_count = (
    pyp.Literal("Chunk count:").suppress()
    + _p_ws.suppress()
    + pyp.common.integer("count")
    + _p_ws.suppress()
    + pyp.Char("\n").suppress()
)
"""Match the header line specifying the number of chunks."""

# The column titles, in order, separated by in-line whitespace and followed
# by a newline. The titles themselves are not suppressed here; the whole
# expression is suppressed where it is used in `_p_all_chunks`.
_p_chunks_header = (
    _p_ws.suppress()
    + pyp.Literal("Chunk")
    + _p_ws.suppress()
    + pyp.Literal("Checksum")
    + _p_ws.suppress()
    + pyp.Literal("Start")
    + _p_ws.suppress()
    + pyp.Literal("Comp size")
    + _p_ws.suppress()
    + pyp.Literal("Size")
    + _p_ws.suppress()
    + pyp.Char("\n").suppress()
)
"""Match the header line of the chunks table itself."""

# One table row: five whitespace-separated fields followed by a newline.
# Each field gets a results name matching the `PChunk` attribute it feeds;
# the checksum is a run of lowercase hexadecimal digits, the rest are
# integers. The `_parse_chunk` parse action is attached right below.
_p_chunk: Final[pyp.ParserElement] = (
    _p_ws.suppress()
    + pyp.common.integer("idx")
    + _p_ws.suppress()
    + pyp.Word("0123456789abcdef")("cksum")
    + _p_ws.suppress()
    + pyp.common.integer("start")
    + _p_ws.suppress()
    + pyp.common.integer("comp_size")
    + _p_ws.suppress()
    + pyp.common.integer("size")
    + _p_ws.suppress()
    + pyp.Char("\n").suppress()
)
"""Match a single chunk line within the chunks table."""


@_p_chunk.set_parse_action
def _parse_chunk(tokens: pyp.ParseResults) -> PChunk:
    """Build a `PChunk` out of the named fields of one matched chunk line."""
    # The results names in the grammar are the same as the `PChunk` fields.
    columns = ("idx", "cksum", "start", "comp_size", "size")
    return PChunk(**{column: tokens[column] for column in columns})


# One or more consecutive chunk lines. The `_parse_chunks` parse action is
# attached right below.
# NOTE(review): calling `_p_chunk()` without a name presumably yields a copy
# that keeps the parse action already set on `_p_chunk` - confirm against
# the pyparsing documentation.
_p_chunks: Final[pyp.ParserElement] = _p_chunk()[1, ...]
"""Match all the chunks in the chunks table."""


@_p_chunks.set_parse_action
def _parse_chunks(tokens: pyp.ParseResults) -> PChunks:
    """Gather all the parsed chunk lines into a single `PChunks` object.

    Raises `ValueError` listing the offending tokens if any of them is not
    a `PChunk`, i.e. if a chunk line somehow escaped its parse action.
    """
    parsed: Final[list[PChunk]] = tokens.as_list()
    unexpected: Final = list(filter(lambda item: not isinstance(item, PChunk), parsed))
    if not unexpected:
        return PChunks(chunks=parsed)
    raise ValueError(repr(unexpected))


# The two header lines and the chunks table, in that order. Each `...`
# skips over whatever text comes before the next expression. The header
# values and the chunks are exposed as "total_size", "chunk_count", and
# "chunks"; the table's title line is matched but dropped. The
# `_parse_all_chunks` parse action is attached right below.
_p_all_chunks: Final[pyp.ParserElement] = (
    ...
    + _p_total_size("total_size")
    + ...
    + _p_chunk_count("chunk_count")
    + ...
    + _p_chunks_header.suppress()
    + _p_chunks("chunks")
)
"""Match all the chunks along with the data from the header."""


@_p_all_chunks.set_parse_action
def _parse_all_chunks(tokens: pyp.ParseResults) -> list[Chunk]:
    """Turn the parsed header values and chunk lines into `Chunk` objects.

    The first table line is treated as a pseudo-chunk for the file header:
    it must have a positive start offset, zero sizes, and an all-zero
    checksum. Every following line must start exactly where the previous
    one ended in the compressed file; the uncompressed offsets are
    accumulated from the sizes along the way.

    Raises `ValueError` if the number of lines differs from the announced
    chunk count, if the header line looks wrong, if there is a gap or
    an overlap between two chunks, or if the end of the last chunk does not
    match the announced total data size.
    """
    expected_total: Final[int] = tokens["total_size"].as_list()[0]
    expected_count: Final[int] = tokens["chunk_count"].as_list()[0]
    parsed: Final[PChunks] = tokens["chunks"]
    if len(parsed.chunks) != expected_count:
        raise ValueError(repr((parsed.chunks, expected_count)))

    # The first fake chunk should always represent the header, right?
    header: Final = parsed.chunks[0]
    header_ok: Final = (
        header.start >= 1
        and header.size == 0
        and header.comp_size == 0
        and all(char == "0" for char in header.cksum)
    )
    if not header_ok:
        raise ValueError(repr(header))

    # Seed the result with the header pseudo-chunk: it takes up the first
    # `header.start` bytes of the compressed file and no uncompressed data.
    result: Final[list[Chunk]] = [
        Chunk(
            cstart=0,
            start=0,
            csize=0,
            size=0,
            cend=header.start,
            end=0,
        ),
    ]
    for item in parsed.chunks[1:]:
        previous = result[-1]
        # Chunks must be laid out back to back within the compressed file.
        if item.start != previous.cend:
            raise ValueError(repr((result, item)))
        result.append(
            Chunk(
                cstart=item.start,
                start=previous.end,
                csize=item.comp_size,
                size=item.size,
                cend=previous.cend + item.comp_size,
                end=previous.end + item.size,
            ),
        )

    if len(result) != expected_count or result[-1].cend != header.start + expected_total:
        raise ValueError(repr((expected_count, expected_total, parsed, result)))
    return result


# The grammar matches in-line padding and newlines explicitly, so
# pyparsing's own whitespace handling is switched off for it.
# NOTE(review): `leave_whitespace()` presumably modifies `_p_all_chunks`
# itself and returns it rather than a copy - confirm against the pyparsing
# documentation.
_p_all_chunks_complete: Final = _p_all_chunks.leave_whitespace()
"""Parse the full output of the `zck_read_header` utility."""


def read_chunks(cfg: Config, orig_size: int, comp_size: int) -> Chunk:
    """Run `zck_read_header` on the compressed file and examine its chunks.

    The tool's output is parsed into a list of `Chunk` objects. The last
    chunk must end exactly at `orig_size` in the uncompressed data and at
    `comp_size` in the compressed file. The list must hold at least three
    entries: an empty header pseudo-chunk, a non-empty first chunk, and
    a second chunk that starts where the first one ends.

    Returns the second real chunk, i.e. the third entry of the list.

    Raises `ValueError` if any of the checks fails.
    """
    command = [cfg.bindir / "zck_read_header", "-c", "--", cfg.compressed]
    listing: Final = subprocess.check_output(command, encoding="UTF-8", env=cfg.env)
    parsed: Final[list[Chunk]] = _p_all_chunks_complete.parse_string(
        listing,
        parse_all=True,
    ).as_list()
    if any(not isinstance(item, Chunk) for item in parsed):
        raise ValueError(repr(parsed))

    # The chunks taken together must cover both files completely.
    final_chunk = parsed[-1]
    if (final_chunk.end, final_chunk.cend) != (orig_size, comp_size):
        raise ValueError(repr(parsed))

    try:
        header, first, second = parsed[:3]
    except ValueError as err:
        raise ValueError(repr(parsed)) from err

    layout_ok = (
        header.size == 0  # noqa: PLR0916  # we do need to check all of those
        and header.end == 0
        and first.start == header.end
        and first.size != 0
        and first.end != 0
        and second.start == first.end
    )
    if not layout_ok:
        raise ValueError(repr(parsed))

    return second