File: base.py

package info (click to toggle)
python-clickhouse-driver 0.2.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,516 kB
  • sloc: python: 10,950; pascal: 42; makefile: 31; sh: 3
file content (87 lines) | stat: -rw-r--r-- 2,434 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from io import BytesIO

from ..reader import read_binary_uint32
from ..writer import write_binary_uint8, write_binary_uint32
from .. import errors

try:
    from clickhouse_cityhash.cityhash import CityHash128
except ImportError:
    raise RuntimeError(
        'Package clickhouse-cityhash is required to use compression'
    )


class BaseCompressor(object):
    """
    Write-only buffer that collects raw bytes and packs them into a block.

    Callers feed uncompressed bytes through ``write``; ``get_compressed_data``
    then drains the buffer, runs ``compress_data`` over it and prefixes the
    result with two uint32 size fields. ``compress_data`` must be provided
    by a subclass. ``method``/``method_byte`` are left as ``None`` here and
    are presumably set by concrete subclasses.
    """
    method = None
    method_byte = None

    def __init__(self):
        # Pending uncompressed bytes; emptied by get_value().
        self.data = BytesIO()
        super(BaseCompressor, self).__init__()

    def get_value(self):
        """Return every byte written so far and leave the buffer empty."""
        buf = self.data
        collected = buf.getvalue()
        # Rewind, then cut everything after position 0 so the same BytesIO
        # object can be reused for the next block.
        buf.seek(0)
        buf.truncate()
        return collected

    def write(self, p_str):
        """Append ``p_str`` to the pending buffer (returns ``None``)."""
        self.data.write(p_str)

    def compress_data(self, data):
        """Compress ``data`` and return the result; subclasses implement."""
        raise NotImplementedError

    def get_compressed_data(self, extra_header_size):
        """
        Drain the buffer, compress it and prepend the size fields.

        :param extra_header_size: number of additional header bytes that the
            reported total size must account for (presumably the bytes the
            caller writes in front of this block, e.g. a method byte -
            confirm against callers).
        :return: bytes laid out as
            ``uint32 total_size | uint32 uncompressed_size | payload`` where
            ``total_size = extra_header_size + 4 + 4 + len(payload)``.
        """
        raw = self.get_value()
        payload = self.compress_data(raw)

        # Two uint32 size fields (4 bytes each) are part of the header.
        header_len = extra_header_size + 4 + 4
        total_len = header_len + len(payload)

        out = BytesIO()
        write_binary_uint32(total_len, out)
        write_binary_uint32(len(raw), out)
        out.write(payload)
        return out.getvalue()


class BaseDecompressor(object):
    """
    Reads one compressed block from a stream, verifies its checksum and
    passes the payload to ``decompress_data``.

    ``decompress_data`` must be provided by a subclass. ``method``/
    ``method_byte`` are left as ``None`` here and are presumably set by
    concrete subclasses.
    """
    method = None
    method_byte = None

    def __init__(self, real_stream):
        # Source of block bytes; read via read_binary_uint32() and .read(n).
        self.stream = real_stream
        super(BaseDecompressor, self).__init__()

    def decompress_data(self, data, uncompressed_size):
        """Return ``data`` decompressed; subclasses implement."""
        raise NotImplementedError

    def check_hash(self, compressed_data, compressed_hash):
        """
        Verify ``compressed_data`` against ``compressed_hash``.

        :raises errors.ChecksumDoesntMatchError: if ``CityHash128`` of
            ``compressed_data`` differs from ``compressed_hash``.
        """
        actual = CityHash128(compressed_data)
        if actual != compressed_hash:
            raise errors.ChecksumDoesntMatchError()

    def get_decompressed_data(self, method_byte, compressed_hash,
                              extra_header_size):
        """
        Read, verify and decompress the next block from ``self.stream``.

        :param method_byte: method byte included in the checksummed bytes
            (presumably already consumed from the stream by the caller).
        :param compressed_hash: expected checksum of
            ``method_byte | uint32 size | block body``.
        :param extra_header_size: header bytes counted in the announced
            size but not read here.
        :return: whatever ``decompress_data`` returns.
        :raises errors.ChecksumDoesntMatchError: on checksum mismatch.
        """
        src = self.stream
        size_with_header = read_binary_uint32(src)
        # The announced size includes the extra header and the 4-byte size
        # field just read; the remainder is the block body.
        body_len = size_with_header - extra_header_size - 4
        body = BytesIO(src.read(body_len)).getvalue()

        # Checksum input is method byte + size field + body, in that order.
        hashed = BytesIO()
        write_binary_uint8(method_byte, hashed)
        write_binary_uint32(size_with_header, hashed)
        hashed.write(body)
        self.check_hash(hashed.getvalue(), compressed_hash)

        # Body = uint32 uncompressed size followed by the compressed payload.
        body_reader = BytesIO(body)
        uncompressed_size = read_binary_uint32(body_reader)
        payload = body_reader.read(body_len - 4)

        return self.decompress_data(payload, uncompressed_size)