File: gunzip.py

package info (click to toggle)
python-bitarray 3.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,288 kB
  • sloc: python: 11,456; ansic: 7,657; makefile: 73; sh: 6
file content (159 lines) | stat: -rw-r--r-- 4,709 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import sys
import datetime
import zlib
from pprint import pprint

from bitarray import bitarray

from puff import Puff


class GunZip(Puff):
    """Gzip container handling (header and footer) on top of Puff.

    The methods here only use self.read_uint() and
    self.align_byte_boundary(), which are not defined in this class and
    are expected to be provided by the Puff base class.
    """

    # names for the "operating system" byte of the gzip header
    operating_system = {
        0: "FAT",      1: "Amiga",          2: "VMS",      3: "Unix",
        4: "VM/CMS",   5: "Atari TOS",      6: "HPFS",     7: "Macintosh",
        8: "Z-System", 9: "CP/M",          10: "TOPS-20", 11: "NTFS",
        12: "QDOS",   13: "Acorn RISCOS", 255: "Unknown",
    }

    def read_nul_terminated_string(self) -> str:
        """Read bytes up to the next NUL byte and return them as a string.

        The terminating NUL byte is consumed but not part of the result.
        The bytes are decoded as UTF-8 when possible.  RFC 1952 specifies
        ISO 8859-1 for the file name and comment fields, so bytes which
        are not valid UTF-8 are decoded as ISO 8859-1 (which accepts any
        byte sequence) instead of raising UnicodeDecodeError.
        """
        a = bytearray()
        while True:
            b: int = self.read_uint(8)
            if b == 0:
                break
            a.append(b)

        try:
            return a.decode("UTF-8")
        except UnicodeDecodeError:
            return a.decode("ISO-8859-1")

    def read_header(self, verbose=False) -> None:
        """Read and validate the gzip header.

        All header fields are consumed, including the optional ones, so
        that reading can continue with the compressed data afterwards.

        Parameters:
            verbose: when true, print the decoded header fields.

        Raises:
            ValueError: when the magic number is wrong, or the compression
                method is not 8.
        """

        def vprint(txt):
            if verbose:
                print(txt)

        if self.read_uint(16) != 0x8b1f:
            raise ValueError("Invalid GZIP magic number")

        cmeth = self.read_uint(8)
        if cmeth != 8:
            raise ValueError(f"Unsupported compression method: {str(cmeth)}")

        # reserved flags
        flags: int = self.read_uint(8)
        if flags & 0xe0 != 0:
            vprint("Reserved flags are set")

        # modification time
        mtime = self.read_uint(32)
        if mtime != 0:
            dt = datetime.datetime.fromtimestamp(mtime, datetime.timezone.utc)
            vprint(f"Last modified: {dt}")
        else:
            vprint("Last modified: N/A")

        # extra flags
        extraflags = self.read_uint(8)
        if extraflags == 2:
            vprint("Extra flags: Maximum compression")
        elif extraflags == 4:
            vprint("Extra flags: Fastest compression")
        else:
            vprint(f"Extra flags: Unknown ({extraflags})")

        osbyte = self.read_uint(8)
        osstr: str = self.operating_system.get(osbyte, "Really unknown")
        vprint(f"Operating system: {osstr}")

        # Handle assorted flags.  The optional fields have to be read in
        # the order in which RFC 1952 stores them: extra field, file name,
        # comment and finally the header CRC-16.  Each field is always
        # read (even when not verbose), as it has to be skipped.
        if flags & 0x01:
            vprint("Flag: Text")
        if flags & 0x04:
            vprint("Flag: Extra")
            count: int = self.read_uint(16)
            for _ in range(count):  # Skip extra data
                self.read_uint(8)
        if flags & 0x08:
            fname = self.read_nul_terminated_string()
            vprint(f"File name: {fname}")
        if flags & 0x10:
            comment = self.read_nul_terminated_string()
            vprint(f"Comment: {comment}")
        if flags & 0x02:
            hcrc = self.read_uint(16)
            vprint(f"Header CRC-16: {hcrc:04X}")

    def check_footer(self, decomp):
        """Read the gzip footer and verify the decompressed data against it.

        Parameters:
            decomp: the decompressed data (bytes-like).

        Raises:
            ValueError: when the size or the CRC-32 stored in the footer
                does not match the decompressed data.
        """
        self.align_byte_boundary()

        crc = self.read_uint(32)
        size = self.read_uint(32)

        # Check decompressed data's length and CRC.  The size field is
        # only 32 bits wide and holds the length modulo 2**32 (RFC 1952),
        # so the actual length has to be reduced before comparing.
        if size != len(decomp) & 0xffffffff:
            raise ValueError(f"Size mismatch: expected={size}, "
                             f"actual={len(decomp)}")

        actualcrc = zlib.crc32(decomp) & 0xffffffff
        if crc != actualcrc:
            raise ValueError(f"CRC-32 mismatch: expected={crc:08X}, "
                             f"actual={actualcrc:08X}")

def print_dot(*args):
    """Write a single dot to stdout and flush it right away.

    Any positional arguments are accepted and ignored, such that the
    function can be used as a callback regardless of what it is called
    with.
    """
    out = sys.stdout
    out.write('.')
    out.flush()

def decompress_file(infile, outfile, opts):
    """Decompress the gzip file `infile` and write the result to `outfile`.

    The whole input is read into memory, and the output file is only
    written after the header, the blocks and the footer have been
    processed without raising an exception.

    Parameters:
        infile: path of the gzip file to read.
        outfile: path of the file the decompressed data is written to.
        opts: object with the attributes `verbose` (passed on to
            GunZip.read_header), `progress` (print a dot through the
            process_blocks callback) and `stats` (pretty-print whatever
            process_blocks returned).
    """
    # slurp the compressed file into a little endian bitarray
    compressed = bitarray(0, 'little')
    with open(infile, "rb") as src:
        compressed.fromfile(src)

    # the decoder is handed this bytearray to accumulate the output in
    decompressed = bytearray()
    decoder = GunZip(compressed, decompressed)
    decoder.read_header(verbose=opts.verbose)
    callback = print_dot if opts.progress else None
    block_stats = decoder.process_blocks(callback)
    decoder.check_footer(decompressed)

    # terminate the line of progress dots
    if opts.progress:
        sys.stdout.write('\n')
    if opts.stats:
        pprint(block_stats)

    # everything checked out, store the result
    with open(outfile, "wb") as dst:
        dst.write(decompressed)


def main():
    """Command line entry point: parse the arguments and decompress SRC.

    When no output filename is given with -o/--out, it is derived from
    SRC: a trailing '.gz' is removed, and a trailing '.tgz' is replaced
    by '.tar'.  For any other SRC the argument parser reports an error
    and exits.
    """
    from argparse import ArgumentParser

    p = ArgumentParser()

    p.add_argument('-p', '--progress', action="store_true",
                   help="show progress while decoding")

    p.add_argument('-s', '--stats', action="store_true",
                   help="show block statistics")

    p.add_argument('-v', '--verbose', action="store_true")

    p.add_argument('-o', '--out', action="store", dest='dst',
                   help='output filename')

    p.add_argument(dest='src', metavar='SRC')

    args = p.parse_args()

    if args.dst is None:
        if args.src.endswith('.gz'):
            args.dst = args.src[:-3]
        elif args.src.endswith('.tgz'):
            args.dst = '%s.tar' % args.src[:-4]
        else:
            # the long option is registered as --out (two dashes), so
            # that is the spelling the message has to show
            p.error('cannot guess uncompressed filename from %r, '
                    'please provide -o/--out option' % args.src)

    # the parsed namespace also carries the progress/stats/verbose options
    decompress_file(args.src, args.dst, args)


# run the command line interface only when executed as a script
if __name__ == "__main__":
    main()