import contextlib
import zlib
from io import BytesIO
from warnings import warn
from scrapy.exceptions import ScrapyDeprecationWarning
# Resolve an optional brotli implementation: prefer the "brotli" package and
# fall back to "brotlicffi". If neither import succeeds, no `brotli` name is
# bound and brotli decompression is simply unavailable.
try:
    try:
        import brotli
    except ImportError:
        import brotlicffi as brotli
except ImportError:
    pass
else:
    try:
        # The deprecated brotlipy package shares the "brotli" import name but
        # its Decompressor lacks a .process attribute — probe to tell them apart.
        brotli.Decompressor.process
    except AttributeError:
        warn(
            (
                "You have brotlipy installed, and Scrapy will use it, but "
                "Scrapy support for brotlipy is deprecated and will stop "
                "working in a future version of Scrapy. brotlipy itself is "
                "deprecated, it has been superseded by brotlicffi. Please, "
                "uninstall brotlipy and install brotli or brotlicffi instead. "
                "brotlipy has the same import name as brotli, so keeping both "
                "installed is strongly discouraged."
            ),
            ScrapyDeprecationWarning,
        )

        def _brotli_decompress(decompressor, data: bytes) -> bytes:
            # brotlipy exposes incremental decompression as .decompress().
            return decompressor.decompress(data)

    else:

        def _brotli_decompress(decompressor, data: bytes) -> bytes:
            # brotli / brotlicffi expose incremental decompression as .process().
            return decompressor.process(data)


# zstandard is optional too; _unzstd() is only usable when this import worked.
with contextlib.suppress(ImportError):
    import zstandard

# Input is consumed (and output read back) in chunks of this size so that the
# max_size limit can be enforced incrementally while decompressing.
_CHUNK_SIZE = 65536  # 64 KiB
class _DecompressionMaxSizeExceeded(ValueError):
    """Raised by the decompression helpers below when the number of
    decompressed bytes exceeds the caller-supplied ``max_size`` limit."""

    pass
def _inflate(data: bytes, *, max_size: int = 0) -> bytes:
    """Decompress deflate-encoded *data* incrementally.

    Starts with a regular zlib-wrapped deflate decompressor and, if that
    fails on the first error, retries the offending chunk with a raw
    (headerless) deflate decompressor.

    Raises :class:`_DecompressionMaxSizeExceeded` as soon as the running
    total of decompressed bytes exceeds *max_size* (0 disables the limit).
    """
    zlib_decompressor = zlib.decompressobj()
    # ugly hack to work with raw deflate content that may
    # be sent by microsoft servers. For more information, see:
    # http://carsten.codimi.de/gzip.yaws/
    # http://www.port80software.com/200ok/archive/2005/10/31/868.aspx
    # http://www.gzip.org/zlib/zlib_faq.html#faq38
    raw_deflate_decompressor = zlib.decompressobj(wbits=-15)
    active = zlib_decompressor
    source = BytesIO(data)
    sink = BytesIO()
    total_decompressed = 0
    out = b"."  # non-empty sentinel so the loop body runs at least once
    while out:
        piece = source.read(_CHUNK_SIZE)
        try:
            out = active.decompress(piece)
        except zlib.error:
            if active is raw_deflate_decompressor:
                # Already on the raw-deflate fallback; genuinely bad data.
                raise
            active = raw_deflate_decompressor
            out = active.decompress(piece)
        total_decompressed += len(out)
        # Enforce the cap before buffering the chunk.
        if max_size and total_decompressed > max_size:
            raise _DecompressionMaxSizeExceeded(
                f"The number of bytes decompressed so far "
                f"({total_decompressed} B) exceed the specified maximum "
                f"({max_size} B)."
            )
        sink.write(out)
    return sink.getvalue()
def _unbrotli(data: bytes, *, max_size: int = 0) -> bytes:
    """Decompress brotli-encoded *data* incrementally.

    Raises :class:`_DecompressionMaxSizeExceeded` as soon as the running
    total of decompressed bytes exceeds *max_size* (0 disables the limit).
    """
    decoder = brotli.Decompressor()
    source = BytesIO(data)
    sink = BytesIO()
    total_decompressed = 0
    out = b"."  # non-empty sentinel so the loop body runs at least once
    while out:
        out = _brotli_decompress(decoder, source.read(_CHUNK_SIZE))
        total_decompressed += len(out)
        # Enforce the cap before buffering the chunk.
        if max_size and total_decompressed > max_size:
            raise _DecompressionMaxSizeExceeded(
                f"The number of bytes decompressed so far "
                f"({total_decompressed} B) exceed the specified maximum "
                f"({max_size} B)."
            )
        sink.write(out)
    return sink.getvalue()
def _unzstd(data: bytes, *, max_size: int = 0) -> bytes:
    """Decompress zstd-encoded *data* incrementally.

    Raises :class:`_DecompressionMaxSizeExceeded` as soon as the running
    total of decompressed bytes exceeds *max_size* (0 disables the limit).
    """
    reader = zstandard.ZstdDecompressor().stream_reader(BytesIO(data))
    sink = BytesIO()
    total_decompressed = 0
    while True:
        out = reader.read(_CHUNK_SIZE)
        if not out:
            break
        total_decompressed += len(out)
        # Enforce the cap before buffering the chunk.
        if max_size and total_decompressed > max_size:
            raise _DecompressionMaxSizeExceeded(
                f"The number of bytes decompressed so far "
                f"({total_decompressed} B) exceed the specified maximum "
                f"({max_size} B)."
            )
        sink.write(out)
    return sink.getvalue()