File: codecs.py

package info (click to toggle)
python-avro 1.10.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,624 kB
  • sloc: python: 11,437; xml: 4,061; sh: 752; java: 386; makefile: 21
file content (204 lines) | stat: -rw-r--r-- 6,671 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/env python

##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Contains Codecs for Python Avro.

Note that the word "codecs" means "compression/decompression algorithms" in the
Avro world (https://avro.apache.org/docs/current/spec.html#Object+Container+Files),
so don't confuse it with the Python's "codecs", which is a package mainly for
converting charsets (https://docs.python.org/3/library/codecs.html).
"""

from __future__ import absolute_import, division, print_function

import io
import struct
import sys
import zlib
from abc import ABCMeta, abstractmethod
from binascii import crc32
from struct import Struct

import avro.io
from avro.schema import AvroException

#
# Constants
#
STRUCT_CRC32 = Struct('>I')  # big-endian unsigned int


try:
    import bz2
    has_bzip2 = True
except ImportError:
    has_bzip2 = False
try:
    import snappy
    has_snappy = True
except ImportError:
    has_snappy = False
try:
    import zstandard as zstd
    has_zstandard = True
except ImportError:
    has_zstandard = False


class Codec:
    """Abstract base class for all Avro codec classes."""
    __metaclass__ = ABCMeta

    @abstractmethod
    def compress(self, data):
        """Compress the passed data.

        :param data: a byte string to be compressed
        :type data: str

        :rtype: tuple
        :return: compressed data and its length
        """
        pass

    @abstractmethod
    def decompress(self, readers_decoder):
        """Read compressed data via the passed BinaryDecoder and decompress it.

        :param readers_decoder: a BinaryDecoder object currently being used for
                                reading an object container file
        :type readers_decoder: avro.io.BinaryDecoder

        :rtype: avro.io.BinaryDecoder
        :return: a newly instantiated BinaryDecoder object that contains the
                 decompressed data which is wrapped by a StringIO
        """
        pass


class NullCodec(Codec):
    def compress(self, data):
        return data, len(data)

    def decompress(self, readers_decoder):
        readers_decoder.skip_long()
        return readers_decoder


class DeflateCodec(Codec):
    def compress(self, data):
        # The first two characters and last character are zlib
        # wrappers around deflate data.
        compressed_data = zlib.compress(data)[2:-1]
        return compressed_data, len(compressed_data)

    def decompress(self, readers_decoder):
        # Compressed data is stored as (length, data), which
        # corresponds to how the "bytes" type is encoded.
        data = readers_decoder.read_bytes()
        # -15 is the log of the window size; negative indicates
        # "raw" (no zlib headers) decompression.  See zlib.h.
        uncompressed = zlib.decompress(data, -15)
        return avro.io.BinaryDecoder(io.BytesIO(uncompressed))


if has_bzip2:
    class BZip2Codec(Codec):
        def compress(self, data):
            compressed_data = bz2.compress(data)
            return compressed_data, len(compressed_data)

        def decompress(self, readers_decoder):
            length = readers_decoder.read_long()
            data = readers_decoder.read(length)
            uncompressed = bz2.decompress(data)
            return avro.io.BinaryDecoder(io.BytesIO(uncompressed))


if has_snappy:
    class SnappyCodec(Codec):
        def compress(self, data):
            compressed_data = snappy.compress(data)
            # A 4-byte, big-endian CRC32 checksum
            compressed_data += STRUCT_CRC32.pack(crc32(data) & 0xffffffff)
            return compressed_data, len(compressed_data)

        def decompress(self, readers_decoder):
            # Compressed data includes a 4-byte CRC32 checksum
            length = readers_decoder.read_long()
            data = readers_decoder.read(length - 4)
            uncompressed = snappy.decompress(data)
            checksum = readers_decoder.read(4)
            self.check_crc32(uncompressed, checksum)
            return avro.io.BinaryDecoder(io.BytesIO(uncompressed))

        def check_crc32(self, bytes, checksum):
            checksum = STRUCT_CRC32.unpack(checksum)[0]
            if crc32(bytes) & 0xffffffff != checksum:
                raise schema.AvroException("Checksum failure")


if has_zstandard:
    class ZstandardCodec(Codec):
        def compress(self, data):
            compressed_data = zstd.ZstdCompressor().compress(data)
            return compressed_data, len(compressed_data)

        def decompress(self, readers_decoder):
            length = readers_decoder.read_long()
            data = readers_decoder.read(length)
            uncompressed = bytearray()
            dctx = zstd.ZstdDecompressor()
            with dctx.stream_reader(io.BytesIO(data)) as reader:
                while True:
                    chunk = reader.read(16384)
                    if not chunk:
                        break
                    uncompressed.extend(chunk)
            return avro.io.BinaryDecoder(io.BytesIO(uncompressed))


class Codecs(object):
    @staticmethod
    def get_codec(codec_name):
        codec_name = codec_name.lower()
        if codec_name == "null":
            return NullCodec()
        elif codec_name == "deflate":
            return DeflateCodec()
        elif codec_name == "bzip2" and has_bzip2:
            return BZip2Codec()
        elif codec_name == "snappy" and has_snappy:
            return SnappyCodec()
        elif codec_name == "zstandard" and has_zstandard:
            return ZstandardCodec()
        else:
            raise ValueError("Unsupported codec: %r" % codec_name)

    @staticmethod
    def supported_codec_names():
        codec_names = ['null', 'deflate']
        if has_bzip2:
            codec_names.append('bzip2')
        if has_snappy:
            codec_names.append('snappy')
        if has_zstandard:
            codec_names.append('zstandard')
        return codec_names