File: _compression.py

package info (click to toggle)
python-scrapy 2.13.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,664 kB
  • sloc: python: 52,028; xml: 199; makefile: 25; sh: 7
file content (124 lines) | stat: -rw-r--r-- 4,295 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import contextlib
import zlib
from io import BytesIO
from warnings import warn

from scrapy.exceptions import ScrapyDeprecationWarning

# Optional Brotli support. The ``brotli`` package is tried first and
# ``brotlicffi`` second; when neither can be imported, nothing is defined here
# (neither ``brotli`` nor ``_brotli_decompress`` exists in this module).
try:
    try:
        import brotli
    except ImportError:
        import brotlicffi as brotli
except ImportError:
    pass
else:
    try:
        # Feature probe: the bare attribute access is the whole test. When
        # ``Decompressor`` has no ``process`` attribute, the imported module is
        # treated as the deprecated brotlipy package.
        brotli.Decompressor.process
    except AttributeError:
        warn(
            (
                "You have brotlipy installed, and Scrapy will use it, but "
                "Scrapy support for brotlipy is deprecated and will stop "
                "working in a future version of Scrapy. brotlipy itself is "
                "deprecated, it has been superseded by brotlicffi. Please, "
                "uninstall brotlipy and install brotli or brotlicffi instead. "
                "brotlipy has the same import name as brotli, so keeping both "
                "installed is strongly discouraged."
            ),
            ScrapyDeprecationWarning,
        )

        def _brotli_decompress(decompressor, data):
            """Pass *data* to *decompressor* through its ``decompress()`` method."""
            return decompressor.decompress(data)

    else:

        def _brotli_decompress(decompressor, data):
            """Pass *data* to *decompressor* through its ``process()`` method."""
            return decompressor.process(data)


with contextlib.suppress(ImportError):
    import zstandard


# Granularity, in bytes, at which the decompression helpers in this module
# process data, so that the decompressed-size limit can be checked between
# steps instead of only once at the end.
_CHUNK_SIZE = 65536  # 64 KiB


class _DecompressionMaxSizeExceeded(ValueError):
    """Raised when decompressed data grows beyond the requested maximum size."""


def _inflate(data: bytes, *, max_size: int = 0) -> bytes:
    """Decompress deflate-encoded *data* and return the decompressed bytes.

    *data* is first treated as a zlib stream. If the very first decompression
    step raises :exc:`zlib.error`, decoding restarts with a raw deflate
    decompressor (``wbits=-15``).

    The input is consumed in slices of ``_CHUNK_SIZE`` bytes until it is
    exhausted or the end of the compressed stream is reached; a slice that
    happens to yield no output does not stop the loop. Every decompression
    step is limited to ``_CHUNK_SIZE`` bytes of output, so the *max_size*
    check runs after at most that many newly decompressed bytes.

    :param data: compressed payload.
    :param max_size: maximum number of decompressed bytes allowed; ``0``
        disables the limit.
    :raises _DecompressionMaxSizeExceeded: if more than *max_size* bytes have
        been decompressed so far.
    :raises zlib.error: if decompression fails and the raw deflate fallback
        does not apply or fails as well.
    """
    decompressor = zlib.decompressobj()
    input_stream = BytesIO(data)
    output_stream = BytesIO()
    decompressed_size = 0
    # The raw deflate fallback only makes sense before anything was decoded:
    # a fresh decompressor cannot resume in the middle of a stream.
    may_fall_back = True
    while True:
        pending = input_stream.read(_CHUNK_SIZE)
        input_exhausted = not pending
        # Drain the current slice in output-bounded steps.
        while True:
            try:
                output_chunk = decompressor.decompress(
                    pending, max_length=_CHUNK_SIZE
                )
            except zlib.error:
                if not may_fall_back:
                    raise
                # ugly hack to work with raw deflate content that may
                # be sent by microsoft servers. For more information, see:
                # http://carsten.codimi.de/gzip.yaws/
                # http://www.port80software.com/200ok/archive/2005/10/31/868.aspx
                # http://www.gzip.org/zlib/zlib_faq.html#faq38
                decompressor = zlib.decompressobj(wbits=-15)
                output_chunk = decompressor.decompress(
                    pending, max_length=_CHUNK_SIZE
                )
            may_fall_back = False
            decompressed_size += len(output_chunk)
            if max_size and decompressed_size > max_size:
                raise _DecompressionMaxSizeExceeded(
                    f"The number of bytes decompressed so far "
                    f"({decompressed_size} B) exceed the specified maximum "
                    f"({max_size} B)."
                )
            output_stream.write(output_chunk)
            # Input that did not fit in the output cap must be fed again.
            pending = decompressor.unconsumed_tail
            # A full-sized chunk may mean more output is still buffered inside
            # the decompressor even though all input was consumed, so only
            # stop once a step returns less than the cap.
            if not pending and len(output_chunk) < _CHUNK_SIZE:
                break
        if input_exhausted or decompressor.eof:
            break
    return output_stream.getvalue()


def _unbrotli(data: bytes, *, max_size: int = 0) -> bytes:
    """Decompress Brotli-encoded *data* and return the decompressed bytes.

    The input is fed to the decompressor in slices of ``_CHUNK_SIZE`` bytes
    until it is exhausted, followed by one final empty slice. A slice that
    yields no output does not end the loop, so output produced by later
    slices is never dropped.

    :param data: compressed payload.
    :param max_size: maximum number of decompressed bytes allowed; ``0``
        disables the limit.
    :raises _DecompressionMaxSizeExceeded: if more than *max_size* bytes have
        been decompressed so far.
    """
    decompressor = brotli.Decompressor()
    input_stream = BytesIO(data)
    output_stream = BytesIO()
    decompressed_size = 0
    while True:
        input_chunk = input_stream.read(_CHUNK_SIZE)
        # NOTE(review): the size check below only runs after a whole input
        # slice has been expanded; the output of a single step is not capped.
        output_chunk = _brotli_decompress(decompressor, input_chunk)
        decompressed_size += len(output_chunk)
        if max_size and decompressed_size > max_size:
            raise _DecompressionMaxSizeExceeded(
                f"The number of bytes decompressed so far "
                f"({decompressed_size} B) exceed the specified maximum "
                f"({max_size} B)."
            )
        output_stream.write(output_chunk)
        # Stop on input exhaustion rather than on an empty output chunk.
        if not input_chunk:
            break
    return output_stream.getvalue()


def _unzstd(data: bytes, *, max_size: int = 0) -> bytes:
    """Decompress Zstandard-encoded *data* and return the decompressed bytes.

    Decompressed output is pulled from a stream reader in reads of at most
    ``_CHUNK_SIZE`` bytes until a read returns nothing.

    :param data: compressed payload.
    :param max_size: maximum number of decompressed bytes allowed; ``0``
        disables the limit.
    :raises _DecompressionMaxSizeExceeded: if more than *max_size* bytes have
        been decompressed so far.
    """
    reader = zstandard.ZstdDecompressor().stream_reader(BytesIO(data))
    pieces = []
    decompressed_size = 0
    while True:
        piece = reader.read(_CHUNK_SIZE)
        decompressed_size += len(piece)
        if max_size and decompressed_size > max_size:
            raise _DecompressionMaxSizeExceeded(
                f"The number of bytes decompressed so far "
                f"({decompressed_size} B) exceed the specified maximum "
                f"({max_size} B)."
            )
        if not piece:
            # An empty read marks the end of the decompressed stream.
            break
        pieces.append(piece)
    return b"".join(pieces)