File: codecs.py

package info (click to toggle)
python-avro 1.12.0+dfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 2,180 kB
  • sloc: python: 7,734; sh: 771; xml: 738; java: 386; makefile: 28
file content (201 lines) | stat: -rw-r--r-- 6,666 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
#!/usr/bin/env python3

##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Contains Codecs for Python Avro.

Note that the word "codecs" means "compression/decompression algorithms" in the
Avro world (https://avro.apache.org/docs/current/spec.html#Object+Container+Files),
so don't confuse it with the Python's "codecs", which is a package mainly for
converting charsets (https://docs.python.org/3/library/codecs.html).
"""

import abc
import binascii
import io
import struct
import zlib
from typing import Dict, Tuple, Type

import avro.errors
import avro.io

#
# Constants
#
STRUCT_CRC32 = struct.Struct(">I")  # 4-byte big-endian unsigned int, used for CRC32 trailers


def _check_crc32(bytes_: bytes, checksum: bytes) -> None:
    """Verify that *checksum* is the CRC32 of *bytes_*.

    :param bytes_: the data whose checksum is being verified
    :param checksum: the expected CRC32, packed as 4 big-endian bytes
    :raises avro.errors.AvroException: if the computed and expected values differ
    """
    # Masking keeps the value in the unsigned 32-bit range (on Python 3
    # binascii.crc32 already returns an unsigned value, so this is a no-op there).
    computed = binascii.crc32(bytes_) & 0xFFFFFFFF
    (expected,) = STRUCT_CRC32.unpack(checksum)
    if computed != expected:
        raise avro.errors.AvroException("Checksum failure")


# Optional compression libraries. Each has_* flag records whether the
# corresponding module could be imported; the matching codec class further
# down is only defined when its flag is True.
try:
    import bz2
except ImportError:
    has_bzip2 = False
else:
    has_bzip2 = True

try:
    import snappy
except ImportError:
    has_snappy = False
else:
    has_snappy = True

try:
    import zstandard as zstd
except ImportError:
    has_zstandard = False
else:
    has_zstandard = True


class Codec(abc.ABC):
    """Abstract base class for all Avro codec classes.

    Concrete subclasses defined in this module whose class names end in
    ``Codec`` are collected into ``KNOWN_CODECS`` under the lower-cased name
    with that suffix removed (e.g. ``DeflateCodec`` -> ``"deflate"``).
    """

    @staticmethod
    @abc.abstractmethod
    def compress(data: bytes) -> Tuple[bytes, int]:
        """Compress the passed data.

        :param data: a byte string to be compressed
        :type data: bytes

        :rtype: tuple
        :return: compressed data and its length
        """

    @staticmethod
    @abc.abstractmethod
    def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
        """Read compressed data via the passed BinaryDecoder and decompress it.

        :param readers_decoder: a BinaryDecoder object currently being used for
                                reading an object container file
        :type readers_decoder: avro.io.BinaryDecoder

        :rtype: avro.io.BinaryDecoder
        :return: a BinaryDecoder from which the decompressed block data can be
                 read. Implementations may return ``readers_decoder`` itself
                 (when the data is stored uncompressed) or a new BinaryDecoder
                 wrapping an in-memory ``io.BytesIO`` buffer.
        """


class NullCodec(Codec):
    """The "null" codec: block data is stored without compression."""

    @staticmethod
    def compress(data: bytes) -> Tuple[bytes, int]:
        # Nothing to do: the payload is its own "compressed" form.
        size = len(data)
        return data, size

    @staticmethod
    def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
        # Step over the leading long (the block's byte count, which the other
        # codecs read with read_long) and let the caller keep reading the
        # uncompressed data straight from the original decoder.
        readers_decoder.skip_long()
        return readers_decoder


class DeflateCodec(Codec):
    """The "deflate" codec: block data is a raw DEFLATE (RFC 1951) stream."""

    @staticmethod
    def compress(data: bytes) -> Tuple[bytes, int]:
        """Compress *data* into a raw DEFLATE stream.

        :param data: the uncompressed block bytes
        :return: the compressed bytes and their length
        """
        # A negative wbits tells zlib to emit raw deflate data, with neither
        # the 2-byte zlib header nor the 4-byte Adler-32 trailer (RFC 1950).
        # Slicing zlib.compress() output with [2:-1] would leave 3 trailer
        # bytes dangling after the deflate stream.
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        compressed_data = compressor.compress(data) + compressor.flush()
        return compressed_data, len(compressed_data)

    @staticmethod
    def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
        """Read one length-prefixed raw DEFLATE block and decompress it.

        :param readers_decoder: decoder positioned at the start of the block
        :return: a new BinaryDecoder over the decompressed bytes
        """
        # Compressed data is stored as (length, data), which
        # corresponds to how the "bytes" type is encoded.
        data = readers_decoder.read_bytes()
        # -15 is the log of the window size; negative indicates
        # "raw" (no zlib headers) decompression.  See zlib.h.
        # zlib.decompress stops at the end of the deflate stream and ignores
        # anything after it, so blocks written by earlier versions of this
        # codec (which carried 3 trailing checksum bytes) still decode.
        uncompressed = zlib.decompress(data, -15)
        return avro.io.BinaryDecoder(io.BytesIO(uncompressed))


if has_bzip2:

    class BZip2Codec(Codec):
        """The "bzip2" codec, backed by the standard-library ``bz2`` module."""

        @staticmethod
        def compress(data: bytes) -> Tuple[bytes, int]:
            packed = bz2.compress(data)
            return packed, len(packed)

        @staticmethod
        def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
            # The block is a long byte count followed by that many compressed bytes.
            block = readers_decoder.read(readers_decoder.read_long())
            return avro.io.BinaryDecoder(io.BytesIO(bz2.decompress(block)))


if has_snappy:

    class SnappyCodec(Codec):
        """The "snappy" codec: Snappy-compressed data plus a CRC32 trailer.

        The trailer is the CRC32 of the *uncompressed* bytes, packed as
        4 big-endian bytes.
        """

        @staticmethod
        def compress(data: bytes) -> Tuple[bytes, int]:
            body = snappy.compress(data)
            trailer = STRUCT_CRC32.pack(binascii.crc32(data) & 0xFFFFFFFF)
            framed = body + trailer
            return framed, len(framed)

        @staticmethod
        def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
            # The recorded length covers the compressed payload plus the
            # 4-byte checksum that follows it.
            total = readers_decoder.read_long()
            payload = readers_decoder.read(total - 4)
            restored = snappy.decompress(payload)
            _check_crc32(restored, readers_decoder.read(4))
            return avro.io.BinaryDecoder(io.BytesIO(restored))


if has_zstandard:

    class ZstandardCodec(Codec):
        """The "zstandard" codec, backed by the ``zstandard`` package."""

        @staticmethod
        def compress(data: bytes) -> Tuple[bytes, int]:
            compressor = zstd.ZstdCompressor()
            packed = compressor.compress(data)
            return packed, len(packed)

        @staticmethod
        def decompress(readers_decoder: avro.io.BinaryDecoder) -> avro.io.BinaryDecoder:
            frame = readers_decoder.read(readers_decoder.read_long())
            output = bytearray()
            # Pull the decompressed data out of a stream reader in 16 KiB
            # pieces; an empty read marks the end of the stream.
            with zstd.ZstdDecompressor().stream_reader(io.BytesIO(frame)) as reader:
                for piece in iter(lambda: reader.read(16384), b""):
                    output.extend(piece)
            return avro.io.BinaryDecoder(io.BytesIO(output))


# Registry of available codecs: every Codec subclass defined at module level
# whose name ends in "Codec", keyed by that name minus the suffix, lower-cased
# (e.g. "DeflateCodec" -> "deflate"). Optional codecs appear only if their
# class was defined above. A comprehension (not a loop) is used so that no
# names are bound in globals() while it is being iterated.
KNOWN_CODECS: Dict[str, Type[Codec]] = {
    obj_name[: -len("Codec")].lower(): obj
    for obj_name, obj in globals().items()
    if isinstance(obj, type) and issubclass(obj, Codec) and obj is not Codec and obj_name.endswith("Codec")
}


def get_codec(codec_name: str) -> Type[Codec]:
    """Return the codec class registered under *codec_name*.

    :param codec_name: an Avro codec name as used as a key in KNOWN_CODECS
                       (e.g. ``"null"`` or ``"deflate"``)
    :raises avro.errors.UnsupportedCodec: if no codec of that name is
        registered, which includes optional codecs whose library could not
        be imported
    """
    try:
        return KNOWN_CODECS[codec_name]
    except KeyError:
        # The lookup KeyError adds nothing beyond this message; suppress it so
        # the traceback doesn't show "During handling of the above exception".
        raise avro.errors.UnsupportedCodec(f"Unsupported codec: {codec_name}. (Is it installed?)") from None