1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
from io import BytesIO
try:
from clickhouse_cityhash.cityhash import CityHash128
except ImportError:
raise RuntimeError(
'Package clickhouse-cityhash is required to use compression'
)
from .native import BlockOutputStream, BlockInputStream
from ..bufferedreader import CompressedBufferedReader
from ..bufferedwriter import CompressedBufferedWriter
from ..compression import get_decompressor_cls
from ..defines import BUFFER_SIZE
from ..reader import read_binary_uint8, read_binary_uint128
from ..writer import write_binary_uint8, write_binary_uint128
class CompressedBlockOutputStream(BlockOutputStream):
    """Output stream that compresses its data before writing it out.

    The parent class is given a ``CompressedBufferedWriter`` (built around
    a fresh compressor instance) as its output.  ``finalize`` then writes
    the CityHash128 checksum of the compressed payload, followed by the
    payload itself, to the raw output passed to the constructor.
    """

    def __init__(self, compressor_cls, compress_block_size, fout, context):
        """Create the compressor and the buffered writer wrapped around it.

        :param compressor_cls: compressor class; instantiated without
            arguments.
        :param compress_block_size: maximum number of bytes handed to a
            single ``write`` call on ``fout`` during ``finalize``.
        :param fout: raw output receiving the checksum and the payload.
        :param context: passed through unchanged to the parent class.
        """
        self.compressor_cls = compressor_cls
        self.compress_block_size = compress_block_size
        self.raw_fout = fout

        self.compressor = self.compressor_cls()
        self.fout = CompressedBufferedWriter(self.compressor, BUFFER_SIZE)
        super(CompressedBlockOutputStream, self).__init__(self.fout, context)

    def get_compressed_hash(self, data):
        """Return the CityHash128 checksum of ``data``."""
        return CityHash128(data)

    def finalize(self):
        """Flush buffered data, then write checksum and compressed payload.

        The payload is written to the raw output in slices of at most
        ``compress_block_size`` bytes, after which the raw output is
        flushed.

        :raises ValueError: if ``compress_block_size`` is not positive.
        """
        block_size = self.compress_block_size
        if block_size <= 0:
            # A zero or negative step can never advance past the end of
            # the payload, so chunked writing would never terminate.
            # Fail before anything is written to the raw output.
            raise ValueError(
                'compress_block_size must be positive, got {!r}'.format(
                    block_size
                )
            )

        self.fout.flush()

        compressed = self.get_compressed()
        compressed_size = len(compressed)

        # The checksum goes first and covers the whole compressed payload.
        compressed_hash = self.get_compressed_hash(compressed)
        write_binary_uint128(compressed_hash, self.raw_fout)

        for start in range(0, compressed_size, block_size):
            self.raw_fout.write(compressed[start:start + block_size])

        self.raw_fout.flush()

    def get_compressed(self):
        """Return the compressed payload as bytes.

        When the compressor defines a method byte, it is emitted first and
        the compressor is told that one extra header byte precedes its
        data; otherwise the extra header size is zero.
        """
        compressed = BytesIO()

        method_byte = self.compressor.method_byte
        if method_byte is not None:
            write_binary_uint8(method_byte, compressed)
            extra_header_size = 1  # method
        else:
            extra_header_size = 0

        data = self.compressor.get_compressed_data(extra_header_size)
        compressed.write(data)

        return compressed.getvalue()
class CompressedBlockInputStream(BlockInputStream):
    """Input stream that reads compressed data from a raw input.

    The parent class is given a ``CompressedBufferedReader`` constructed
    with ``read_block`` and ``BUFFER_SIZE``; presumably the reader invokes
    ``read_block`` whenever it needs more decompressed data (confirm
    against ``CompressedBufferedReader``).
    """

    def __init__(self, fin, context):
        """Keep the raw input and hand a buffered reader to the parent.

        :param fin: raw input that ``read_block`` reads from.
        :param context: passed through unchanged to the parent class.
        """
        self.raw_fin = fin
        buffered_reader = CompressedBufferedReader(
            self.read_block, BUFFER_SIZE
        )
        super(CompressedBlockInputStream, self).__init__(
            buffered_reader, context
        )

    def get_compressed_hash(self, data):
        """Return the CityHash128 checksum of ``data``."""
        checksum = CityHash128(data)
        return checksum

    def read_block(self):
        """Read one compressed block from the raw input and decompress it.

        Reads a 128-bit hash and a method byte, selects the decompressor
        class for that method byte, and returns whatever the
        decompressor's ``get_decompressed_data`` returns.
        """
        expected_hash = read_binary_uint128(self.raw_fin)
        method = read_binary_uint8(self.raw_fin)

        decompressor = get_decompressor_cls(method)(self.raw_fin)

        # One extra header byte (the method) when the decompressor
        # declares a method byte, none otherwise.
        header_extra = 0 if decompressor.method_byte is None else 1

        return decompressor.get_decompressed_data(
            method, expected_hash, header_extra
        )
|