File: gz.py

package info (click to toggle)
python-scrapy 1.5.1-1%2Bdeb10u1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 4,404 kB
  • sloc: python: 25,793; xml: 199; makefile: 95; sh: 33
file content (65 lines) | stat: -rw-r--r-- 2,139 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import struct

try:
    from cStringIO import StringIO as BytesIO
except ImportError:
    from io import BytesIO
from gzip import GzipFile

import six
import re

# - Python>=3.5 GzipFile's read() has issues returning leftover
#   uncompressed data when input is corrupted
#   (regression or bug-fix compared to Python 3.4)
# - read1(), which fetches data before raising EOFError on next call
#   works here but is only available from Python>=3.3
# - scrapy does not support Python 3.2
# - Python 2.7 GzipFile works fine with standard read() + extrabuf
if not six.PY2:
    def read1(gzf, size=-1):
        """Python 3 variant: delegate to ``gzf.read1(size)``."""
        return gzf.read1(size)
else:
    def read1(gzf, size=-1):
        """Python 2 variant: delegate to the plain ``gzf.read(size)``."""
        return gzf.read(size)


def gunzip(data):
    """Gunzip the given data and return as much data as possible.

    This is resilient to CRC checksum errors: when the stream turns out to
    be corrupted or truncated, whatever was decompressed up to that point is
    returned instead of failing.

    ``data`` is the gzip-compressed byte string. The return value is the
    concatenation of every chunk that could be decompressed.

    IOError, EOFError or struct.error is re-raised only when decompression
    fails before any data at all could be recovered.
    """
    f = GzipFile(fileobj=BytesIO(data))
    output_list = []
    chunk = b'.'  # non-empty sentinel so the loop body runs at least once
    while chunk:
        try:
            chunk = read1(f, 8196)
            output_list.append(chunk)
        except (IOError, EOFError, struct.error):
            # complete only if there is some data, otherwise re-raise
            # see issue 87 about catching struct.error
            # some pages are quite small so output_list is empty and f.extrabuf
            # contains the whole page content
            extrabuf = getattr(f, 'extrabuf', None)
            if not (output_list or extrabuf):
                raise
            # The GzipFile object may not expose extrabuf/extrasize at all,
            # so probe for them explicitly rather than relying on a
            # ``finally: break`` that would also swallow unrelated
            # exceptions (KeyboardInterrupt included).
            extrasize = getattr(f, 'extrasize', None)
            if extrabuf and extrasize is not None:
                output_list.append(extrabuf[-extrasize:])
            break
    return b''.join(output_list)

# Bound ``search`` methods of the compiled, case-insensitive Content-Type
# patterns; each returns a match object or None for a bytes argument.
_is_gzipped = re.compile(br'^application/(x-)?gzip\b', re.I).search
_is_octetstream = re.compile(br'^(application|binary)/octet-stream\b', re.I).search


def is_gzipped(response):
    """Return True if the response is gzipped, or False otherwise.

    A response counts as gzipped when its Content-Type header is
    ``application/gzip`` or ``application/x-gzip``, or when the Content-Type
    is ``application/octet-stream`` / ``binary/octet-stream`` and the
    Content-Encoding header is ``gzip`` or ``x-gzip`` (compared
    case-insensitively).

    ``response`` must expose a ``headers`` mapping whose ``get`` returns
    bytes. The result is always a real ``bool``, never a match object or
    ``None``.
    """
    ctype = response.headers.get('Content-Type', b'')
    cenc = response.headers.get('Content-Encoding', b'').lower()
    if _is_gzipped(ctype):
        return True
    # bool() keeps a failed regex search (None) from leaking out as the result.
    return bool(_is_octetstream(ctype)) and cenc in (b'gzip', b'x-gzip')


def gzip_magic_number(response):
    """Return True if ``response.body`` starts with the bytes 1f 8b 08.

    Only the leading three bytes of the body are compared; a shorter body
    never matches.
    """
    magic = b'\x1f\x8b\x08'
    prefix = response.body[:len(magic)]
    return prefix == magic