File: compression.py

package info (click to toggle)
python-fastparquet 2024.2.0-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 120,180 kB
  • sloc: python: 8,181; makefile: 187
file content (114 lines) | stat: -rw-r--r-- 3,691 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114

import cramjam
import numpy as np
from fastparquet import parquet_thrift

# TODO: use stream/direct-to-buffer conversions instead of memcopy

compressions = {
    'UNCOMPRESSED': lambda x: x
}
decompressions = {
    'UNCOMPRESSED': lambda x, y: x
}

# Gzip is present regardless
COMPRESSION_LEVEL = 6


def gzip_compress_v3(data, compresslevel=COMPRESSION_LEVEL):
    return cramjam.gzip.compress(data, level=compresslevel)


def gzip_decompress(data, uncompressed_size):
    return cramjam.gzip.decompress(data, output_len=uncompressed_size)


compressions['GZIP'] = gzip_compress_v3
decompressions['GZIP'] = gzip_decompress
compressions['SNAPPY'] = cramjam.snappy.compress_raw
decompressions['SNAPPY'] = cramjam.snappy.decompress_raw

try:
    import lzo
    def lzo_decompress(data, uncompressed_size):
        return lzo.decompress(data)
    compressions['LZO'] = lzo.compress
    decompressions['LZO'] = lzo_decompress
except ImportError:
    pass
compressions['BROTLI'] = cramjam.brotli.compress
decompressions['BROTLI'] = cramjam.brotli.decompress


def lz4_compress(data, **kwargs):
    kwargs['store_size'] = False
    return cramjam.lz4.compress_block(data, **kwargs)


def lz4_decomp(data, size):
    return cramjam.lz4.decompress_block(np.frombuffer(data, 'uint8'), size)


compressions['LZ4'] = lz4_compress
decompressions['LZ4'] = lz4_decomp

# LZ4 is actually LZ4 block, aka "raw", see
# https://github.com/apache/parquet-format/commit/7f06e838cbd1b7dbd722ff2580b9c2525e37fc46
compressions['LZ4_RAW'] = lz4_compress
decompressions['LZ4_RAW'] = lz4_decomp
compressions['ZSTD'] = cramjam.zstd.compress
decompressions['ZSTD'] = cramjam.zstd.decompress
decom_into = {
    "GZIP": cramjam.gzip.decompress_into,
    "SNAPPY": cramjam.snappy.decompress_raw_into,
    "ZSTD": cramjam.zstd.decompress_into,
    "BROTLI": cramjam.brotli.decompress_into
}

compressions = {k.upper(): v for k, v in compressions.items()}
decompressions = {k.upper(): v for k, v in decompressions.items()}

rev_map = {getattr(parquet_thrift.CompressionCodec, key): key for key in
           dir(parquet_thrift.CompressionCodec) if key in
           ['UNCOMPRESSED', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4', 'ZSTD', 'LZ4_RAW']}


def compress_data(data, compression='gzip'):
    if isinstance(compression, dict):
        algorithm = compression.get('type', 'gzip')
        if isinstance(algorithm, int):
            algorithm = rev_map[compression]
        args = compression.get('args', None)
    else:
        algorithm = compression
        args = None

    if isinstance(algorithm, int):
        algorithm = rev_map[compression]

    if algorithm.upper() not in compressions:
        raise RuntimeError("Compression '%s' not available.  Options: %s" %
                (algorithm, sorted(compressions)))
    if args is None:
        return compressions[algorithm.upper()](data)
    else:
        if not isinstance(args, dict):
            raise ValueError("args dict entry is not a dict")
        return compressions[algorithm.upper()](data, **args)


def decompress_data(data, uncompressed_size, algorithm='gzip'):
    if isinstance(algorithm, int):
        algorithm = rev_map[algorithm]
    if algorithm.upper() not in decompressions:
        raise RuntimeError(
            "Decompression '%s' not available.  Options: %s" %
            (algorithm.upper(), sorted(decompressions))
        )
    if algorithm.upper() in decom_into:
        # ensures writable buffer from cramjam
        x = np.empty(uncompressed_size, dtype='uint8')
        decom_into[algorithm.upper()](np.frombuffer(data, dtype=np.uint8), x)
        return x
    return decompressions[algorithm.upper()](data, uncompressed_size)