1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
#-----------------------------------------------------------------------------
# Copyright (c) 2012 - 2018, Anaconda, Inc. and Intake contributors
# All rights reserved.
#
# The full license is in the LICENSE file, distributed with this software.
#-----------------------------------------------------------------------------
from collections import OrderedDict
import gzip
import io
import pickle
import msgpack
from ..compat import pack_kwargs
class NoneCompressor(object):
    """Pass-through compressor used when no compression is wanted."""
    name = 'none'

    def compress(self, data):
        """Return *data* unchanged."""
        return data

    # Decompression is the same identity operation.
    decompress = compress
class GzipCompressor(object):
    """Compressor backed by the stdlib ``gzip`` module (compression level 1)."""
    name = 'gzip'

    def compress(self, data):
        """Return *data* gzip-compressed at level 1 (fastest setting)."""
        sink = io.BytesIO()
        with gzip.GzipFile(fileobj=sink, mode='wb', compresslevel=1) as gz:
            gz.write(data)
        # Read the buffer only after the GzipFile is closed so the gzip
        # trailer has been flushed into it.
        return sink.getvalue()

    def decompress(self, data):
        """Return the decompressed bytes of the gzip stream *data*."""
        source = io.BytesIO(data)
        with gzip.GzipFile(fileobj=source) as gz:
            payload = gz.read()
        return payload
# msgpack_numpy is optional. When it is missing, this name stays None and
# MsgPackSerializer falls back to plain msgpack for ndarray/xarray containers.
msgpack_numpy = None
try:
    import msgpack_numpy
except ImportError:
    pass
def check_pyarrow():
    """Import and return the ``pyarrow`` module.

    Raises
    ------
    ImportError
        If pyarrow cannot be imported; DataFrame serialization needs it.
    """
    try:
        import pyarrow as pa
    except ImportError:
        raise ImportError("Serializing DataFrames requires pyarrow.")
    else:
        return pa
class MsgPackSerializer(object):
    """Serialize objects with msgpack, choosing an encoding per container.

    * ``'ndarray'`` / ``'xarray'``: msgpack with the numpy-aware kwargs from
      ``..compat``, but only when ``msgpack_numpy`` was importable.
    * ``'dataframe'``: pyarrow serialization instead of msgpack
      (https://github.com/intake/intake/issues/460).
    * anything else: plain msgpack.
    """
    # TODO: This is ugly, should maybe transition to
    # distributed.protocol.serialize
    name = 'msgpack'

    def encode(self, obj, container):
        """Serialize *obj* according to *container*.

        Parameters
        ----------
        obj : object
            Value to serialize.
        container : str
            Container type name; selects the encoding (see class docstring).

        Returns
        -------
        bytes or memoryview
            ``msgpack.packb`` output, or a memoryview over a pyarrow buffer
            when ``container == 'dataframe'``.

        Raises
        ------
        ImportError
            For ``'dataframe'`` when pyarrow is not installed.
        """
        if container in ('ndarray', 'xarray') and msgpack_numpy is not None:
            # Import the numpy-specific kwargs only on the branch that uses
            # them, mirroring decode(). Other containers then never depend on
            # this compat name being importable.
            from ..compat import np_pack_kwargs
            return msgpack.packb(obj, **np_pack_kwargs)
        elif container == 'dataframe':
            # Use pyarrow for serializing DataFrames, rather than
            # msgpack: https://github.com/intake/intake/issues/460
            pa = check_pyarrow()
            # NOTE(review): pyarrow's SerializationContext API is deprecated
            # (since pyarrow 2.0) and absent from newer releases. Migrating
            # changes the wire format, so peers must be updated together.
            context = pa.default_serialization_context()
            # This eventually goes to msgpack.packb, which doesn't
            # directly accept PyArrow Buffer objects. Need to wrap
            # it in a memoryview to avoid a TypeError.
            return memoryview(context.serialize(obj).to_buffer())
        else:
            return msgpack.packb(obj, **pack_kwargs)

    def decode(self, bytestr, container):
        """Inverse of :meth:`encode` for the same *container*.

        Parameters
        ----------
        bytestr : bytes-like
            Serialized payload produced by :meth:`encode`.
        container : str
            Container type name used when the payload was encoded.

        Returns
        -------
        object
            The deserialized value.

        Raises
        ------
        ImportError
            For ``'dataframe'`` when pyarrow is not installed.
        """
        if container in ('ndarray', 'xarray') and msgpack_numpy is not None:
            from ..compat import np_unpack_kwargs
            return msgpack.unpackb(bytestr, **np_unpack_kwargs)
        elif container == 'dataframe':
            pa = check_pyarrow()
            context = pa.default_serialization_context()
            return context.deserialize(bytestr)
        else:
            # Imported here (not at method top) so each branch only pulls
            # in the compat names it actually uses.
            from ..compat import unpack_kwargs
            return msgpack.unpackb(bytestr, **unpack_kwargs)
class PickleSerializer(object):
    """Serialize objects with :mod:`pickle` at a fixed protocol level.

    The serializer's ``name`` encodes the protocol, e.g. ``'pickle2'``.

    Security: ``decode`` calls ``pickle.loads``, which can execute arbitrary
    code while unpickling. Only feed it payloads from a trusted peer.
    """

    def __init__(self, protocol_level):
        self.name = 'pickle%d' % protocol_level
        self._protocol_level = protocol_level

    def encode(self, obj, container):
        """Pickle *obj*; *container* is accepted for interface parity only."""
        level = self._protocol_level
        return pickle.dumps(obj, protocol=level)

    def decode(self, bytestr, container):
        """Unpickle *bytestr*; *container* is ignored."""
        # NOTE(security): never call this on untrusted input.
        return pickle.loads(bytestr)
class ComboSerializer(object):
    """Chain a format encoder with a compressor.

    Encoding serializes first and then compresses. Decoding decompresses
    first and then deserializes.
    """

    def __init__(self, format_encoder, compressor):
        self.format_name = format_encoder.name
        self.compressor_name = compressor.name
        self._format_encoder = format_encoder
        self._compressor = compressor

    def encode(self, obj, container):
        """Serialize *obj* for *container*, then compress the result."""
        serialized = self._format_encoder.encode(obj, container)
        return self._compressor.compress(serialized)

    def decode(self, bytestr, container):
        """Decompress *bytestr*, then deserialize it for *container*."""
        raw = self._compressor.decompress(bytestr)
        return self._format_encoder.decode(raw, container)
# Available compressors, most preferred first. gzip and "none" are always
# present; a snappy compressor is prepended when the optional ``snappy``
# module can be imported.
compressors = [GzipCompressor(), NoneCompressor()]

try:
    import snappy
except ImportError:
    pass
else:
    class SnappyCompressor(object):
        """Compressor delegating to ``snappy.compress``/``snappy.decompress``."""
        name = 'snappy'

        def compress(self, data):
            return snappy.compress(data)

        def decompress(self, data):
            return snappy.decompress(data)

    compressors.insert(0, SnappyCompressor())
# Serializers in preference order: msgpack, then pickle protocol 2, then
# pickle protocol 1. The registries preserve that order, keyed by name.
picklers = [PickleSerializer(level) for level in (2, 1)]
serializers = [MsgPackSerializer()]
serializers.extend(picklers)
format_registry = OrderedDict((s.name, s) for s in serializers)
compression_registry = OrderedDict((c.name, c) for c in compressors)
|