1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
|
try:
# Check if deflate is available.
import deflate
import io
except ImportError:
print("SKIP")
raise SystemExit
try:
# Check there's enough memory to deflate gzip streams.
# zlib.compress(b'', wbits=25)
empty_gzip = (
b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
deflate.DeflateIO(io.BytesIO(empty_gzip)).read()
except MemoryError:
print("SKIP")
raise SystemExit
# zlib.compress(b'micropython hello world hello world micropython', wbits=-9)
data_raw = b'\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00'
# zlib.compress(b'micropython hello world hello world micropython', wbits=9)
data_zlib = b'\x18\x95\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00\xbc\xfa\x12\x91'
# zlib.compress(b'micropython hello world hello world micropython', wbits=25)
data_gzip = b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00"\xeb\xc4\x98/\x00\x00\x00'
# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 5)
data_wbits_5 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\x95\xcfH\xcd\xc9\xc9\x07\x00"
# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 6)
data_wbits_6 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\xd5\x9f\x91\x9a\x93\x93\x0f\x00"
# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 8)
data_wbits_8 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\xf5\x7fFjNN>\x00"
# compress(b'hello' + bytearray(2000) + b'hello', format=deflate.RAW, 10)
data_wbits_10 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08Fz\x18\x00\xc3`\xa4'\x03`2\x18\xe99\x01\x98\x13Fz\xfe\x07\xe6\xff\x91\x9e\xff\x81\xf9\x7f\xa4\xe7\x7f`\xfe\x1f\xba\xf9?#5''\x1f\x00"
def decompress(data, *args):
buf = io.BytesIO(data)
with deflate.DeflateIO(buf, *args) as g:
return g.read()
def decompress_error(data, *args):
try:
decompress(data, *args)
except OSError:
print("OSError")
except EOFError:
print("EOFError")
except ValueError:
print("ValueError")
# Basic handling of format and detection.
print(decompress(data_raw, deflate.RAW))
print(decompress(data_zlib, deflate.ZLIB))
print(decompress(data_gzip, deflate.GZIP))
print(decompress(data_zlib)) # detect zlib/gzip.
print(decompress(data_gzip)) # detect zlib/gzip.
decompress_error(data_raw) # cannot detect zlib/gzip from raw stream
decompress_error(data_raw, deflate.ZLIB)
decompress_error(data_raw, deflate.GZIP)
decompress_error(data_zlib, deflate.RAW)
decompress_error(data_zlib, deflate.GZIP)
decompress_error(data_gzip, deflate.RAW)
decompress_error(data_gzip, deflate.ZLIB)
# Invalid data stream.
decompress_error(b"abcef", deflate.RAW)
# Invalid block type. final-block, block-type=3.
decompress_error(b"\x07", deflate.RAW)
# Truncated stream.
decompress_error(data_raw[:10], deflate.RAW)
# Partial reads.
buf = io.BytesIO(data_zlib)
with deflate.DeflateIO(buf) as g:
print(buf.seek(0, 1)) # verify stream is not read until first read of the DeflateIO stream.
print(g.read(1))
print(buf.seek(0, 1)) # verify that only the minimal amount is read from the source
print(g.read(1))
print(buf.seek(0, 1))
print(g.read(2))
print(buf.seek(0, 1))
print(g.read())
print(buf.seek(0, 1))
print(g.read(1))
print(buf.seek(0, 1))
print(g.read())
# Invalid zlib checksum (+ length for gzip). Note: only checksum errors are
# currently detected, see the end of uzlib_uncompress_chksum().
decompress_error(data_zlib[:-4] + b"\x00\x00\x00\x00")
decompress_error(data_gzip[:-8] + b"\x00\x00\x00\x00\x00\x00\x00\x00")
decompress_error(data_zlib[:-4] + b"\x00\x00\x00\x00", deflate.ZLIB)
decompress_error(data_gzip[:-8] + b"\x00\x00\x00\x00\x00\x00\x00\x00", deflate.GZIP)
# Reading from a closed underlying stream.
b = io.BytesIO(data_raw)
g = deflate.DeflateIO(b, deflate.RAW)
g.read(4)
b.close()
try:
g.read(4)
except ValueError:
print("ValueError")
# Reading from a closed DeflateIO.
b = io.BytesIO(data_raw)
g = deflate.DeflateIO(b, deflate.RAW)
g.read(4)
g.close()
try:
g.read(4)
except OSError:
print("OSError")
# Gzip header with extra flags (FCOMMENT FNAME FEXTRA FHCRC) enabled.
data_gzip_header_extra = b"\x1f\x8b\x08\x1e}\x9a\x9bd\x02\x00\x00\x00\x00\x00\x00\xff\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcf\x03\x00\xf2KF>\x0b\x00\x00\x00"
print(decompress(data_gzip_header_extra))
# Test patterns.
PATTERNS_ZLIB = [
# Packed results produced by CPy's zlib.compress()
(b"0", b"x\x9c3\x00\x00\x001\x001"),
(b"a", b"x\x9cK\x04\x00\x00b\x00b"),
(b"0" * 100, b"x\x9c30\xa0=\x00\x00\xb3q\x12\xc1"),
(
bytes(range(64)),
b"x\x9cc`dbfaec\xe7\xe0\xe4\xe2\xe6\xe1\xe5\xe3\x17\x10\x14\x12\x16\x11\x15\x13\x97\x90\x94\x92\x96\x91\x95\x93WPTRVQUS\xd7\xd0\xd4\xd2\xd6\xd1\xd5\xd370426153\xb7\xb0\xb4\xb2\xb6\xb1\xb5\xb3\x07\x00\xaa\xe0\x07\xe1",
),
(b"hello", b"x\x01\x01\x05\x00\xfa\xffhello\x06,\x02\x15"), # compression level 0
# adaptive/dynamic huffman tree
(
b"13371813150|13764518736|12345678901",
b"x\x9c\x05\xc1\x81\x01\x000\x04\x04\xb1\x95\\\x1f\xcfn\x86o\x82d\x06Qq\xc8\x9d\xc5X}<e\xb5g\x83\x0f\x89X\x07\xab",
),
# dynamic Huffman tree with "case 17" (repeat code for 3-10 times)
(
b">I}\x00\x951D>I}\x00\x951D>I}\x00\x951D>I}\x00\x951D",
b"x\x9c\x05\xc11\x01\x00\x00\x00\x010\x95\x14py\x84\x12C_\x9bR\x8cV\x8a\xd1J1Z)F\x1fw`\x089",
),
]
for unpacked, packed in PATTERNS_ZLIB:
print(decompress(packed) == unpacked)
print(decompress(packed, deflate.ZLIB) == unpacked)
# Older version's of CPython's zlib module still included the checksum and length (as if it were a zlib/gzip stream).
# Make sure there're no problem decompressing this.
data_raw_with_footer = data_raw + b"\x00\x00\x00\x00\x00\x00\x00\x00"
print(decompress(data_raw_with_footer, deflate.RAW))
# Valid wbits values.
decompress_error(data_wbits_5, deflate.RAW, -1)
print(len(decompress(data_wbits_5, deflate.RAW, 0)))
decompress_error(data_wbits_5, deflate.RAW, 1)
decompress_error(data_wbits_5, deflate.RAW, 4)
for i in range(5, 16):
print(len(decompress(data_wbits_5, deflate.RAW, i)))
decompress_error(data_wbits_5, deflate.RAW, 16)
# Invalid values for format.
decompress_error(data_raw, -1)
decompress_error(data_raw, 5)
# Data that requires a higher wbits value.
decompress_error(data_wbits_6, deflate.RAW, 5)
print(len(decompress(data_wbits_6, deflate.RAW, 6)))
print(len(decompress(data_wbits_6, deflate.RAW, 7)))
decompress_error(data_wbits_8, deflate.RAW, 7)
print(len(decompress(data_wbits_8, deflate.RAW, 8)))
print(len(decompress(data_wbits_8, deflate.RAW, 9)))
decompress_error(data_wbits_10, deflate.RAW)
decompress_error(data_wbits_10, deflate.RAW, 9)
print(len(decompress(data_wbits_10, deflate.RAW, 10)))
# zlib header sets the size, so works with wbits unset or wbits >= 10.
data_wbits_10_zlib = b"(\x91\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08Fz\x18\x00\xc3`\xa4'\x03`2\x18\xe99\x01\x98\x13Fz\xfe\x07\xe6\xff\x91\x9e\xff\x81\xf9\x7f\xa4\xe7\x7f`\xfe\x1f\xba\xf9?#5''\x1f\x00[\xbc\x04)"
print(len(decompress(data_wbits_10_zlib, deflate.ZLIB)))
decompress_error(data_wbits_10_zlib, deflate.ZLIB, 9)
print(len(decompress(data_wbits_10_zlib, deflate.ZLIB, 10)))
print(len(decompress(data_wbits_10_zlib)))
|