File: serializer.py

package info (click to toggle)
intake 0.6.6-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 6,552 kB
  • sloc: python: 12,408; makefile: 37; sh: 14
file content (143 lines) | stat: -rw-r--r-- 4,208 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#-----------------------------------------------------------------------------
# Copyright (c) 2012 - 2018, Anaconda, Inc. and Intake contributors
# All rights reserved.
#
# The full license is in the LICENSE file, distributed with this software.
#-----------------------------------------------------------------------------

from collections import OrderedDict
import gzip
import io
import pickle

import msgpack

from ..compat import pack_kwargs

class NoneCompressor(object):
    """Pass-through codec: payloads are returned exactly as received."""

    name = 'none'

    def compress(self, data):
        """Return ``data`` itself; no compression is applied."""
        return data

    def decompress(self, data):
        """Return ``data`` itself; nothing needs to be undone."""
        return data


class GzipCompressor(object):
    """Gzip codec working entirely on in-memory byte strings."""

    name = 'gzip'

    def compress(self, data):
        """Return ``data`` wrapped in a gzip stream.

        The stream is written with ``compresslevel=1``.
        """
        sink = io.BytesIO()
        with gzip.GzipFile(fileobj=sink, mode='wb', compresslevel=1) as writer:
            writer.write(data)
        # The gzip trailer is only flushed once the writer has been closed,
        # so the buffer is read after leaving the ``with`` block.
        return sink.getvalue()

    def decompress(self, data):
        """Return the bytes stored in the gzip stream ``data``."""
        source = io.BytesIO(data)
        with gzip.GzipFile(fileobj=source) as reader:
            payload = reader.read()
        return payload


# msgpack_numpy is an optional dependency. When it cannot be imported the
# name is bound to None, so code further down can simply test its truthiness
# to decide whether the numpy-aware msgpack path is available.
try:
    import msgpack_numpy
except ImportError:
    msgpack_numpy = None


def check_pyarrow():
    """Import pyarrow on demand and return the module.

    Returns
    -------
    module
        The imported ``pyarrow`` module.

    Raises
    ------
    ImportError
        If pyarrow cannot be imported; the message states that it is
        needed for serializing DataFrames.
    """
    try:
        import pyarrow as arrow_module
    except ImportError:
        raise ImportError("Serializing DataFrames requires pyarrow.")
    else:
        return arrow_module



class MsgPackSerializer(object):
    """Serializer registered as ``'msgpack'``; codec depends on the container.

    * ``'ndarray'`` / ``'xarray'`` use msgpack with the numpy-aware keyword
      arguments from ``..compat``, but only when ``msgpack_numpy`` was
      importable.
    * ``'dataframe'`` uses pyarrow's serialization context.
    * Everything else (including arrays when ``msgpack_numpy`` is missing)
      uses msgpack with the default keyword arguments from ``..compat``.
    """
    # TODO: This is ugly, should maybe transition to
    #  distributed.protocol.serialize
    name = 'msgpack'

    def encode(self, obj, container):
        """Serialize ``obj`` according to its ``container`` type.

        Parameters
        ----------
        obj : object
            The data to serialize.
        container : str
            Container name; ``'ndarray'``, ``'xarray'`` and ``'dataframe'``
            select specialised code paths.

        Returns
        -------
        bytes or memoryview
            The result of ``msgpack.packb`` for the msgpack paths, or a
            ``memoryview`` over the pyarrow buffer for DataFrames.
        """
        from ..compat import np_pack_kwargs

        is_array = container in ('ndarray', 'xarray')
        if is_array and msgpack_numpy:
            return msgpack.packb(obj, **np_pack_kwargs)

        if container == 'dataframe':
            # Use pyarrow for serializing DataFrames, rather than
            # msgpack: https://github.com/intake/intake/issues/460
            # NOTE(review): newer pyarrow releases deprecate the
            # serialization-context API used here -- confirm against the
            # pyarrow versions this package supports.
            arrow = check_pyarrow()
            ctx = arrow.default_serialization_context()
            arrow_buffer = ctx.serialize(obj).to_buffer()
            # This eventually goes to msgpack.packb, which doesn't
            # directly accept PyArrow Buffer objects. Need to wrap
            # it in a memoryview to avoid a TypeError.
            return memoryview(arrow_buffer)

        return msgpack.packb(obj, **pack_kwargs)

    def decode(self, bytestr, container):
        """Rebuild an object from ``bytestr``; the inverse of ``encode``.

        Parameters
        ----------
        bytestr : bytes-like
            Serialized payload.
        container : str
            Container name, selecting the code path the same way as in
            ``encode``.

        Returns
        -------
        object
            The deserialized data.
        """
        from ..compat import unpack_kwargs

        is_array = container in ('ndarray', 'xarray')
        if is_array and msgpack_numpy:
            # Imported only on this path, as the kwargs are only needed here.
            from ..compat import np_unpack_kwargs
            return msgpack.unpackb(bytestr, **np_unpack_kwargs)

        if container == 'dataframe':
            arrow = check_pyarrow()
            ctx = arrow.default_serialization_context()
            return ctx.deserialize(bytestr)

        return msgpack.unpackb(bytestr, **unpack_kwargs)


class PickleSerializer(object):
    """Serializer backed by :mod:`pickle` at a fixed protocol level.

    Parameters
    ----------
    protocol_level : int
        Pickle protocol passed to ``pickle.dumps``; it is also embedded in
        the serializer's ``name`` (for example ``'pickle2'``).
    """

    def __init__(self, protocol_level):
        self.name = 'pickle%d' % protocol_level
        self._protocol_level = protocol_level

    def encode(self, obj, container):
        """Pickle ``obj`` with the configured protocol.

        ``container`` is accepted for interface compatibility and ignored.
        """
        return pickle.dumps(obj, self._protocol_level)

    def decode(self, bytestr, container):
        """Unpickle ``bytestr`` and return the resulting object.

        ``container`` is accepted for interface compatibility and ignored.
        """
        # NOTE: pickle.loads can execute arbitrary code while loading, so
        # this must only ever be fed data from a trusted peer.
        return pickle.loads(bytestr)


class ComboSerializer(object):
    """Pair a format encoder with a compressor behind one interface.

    Parameters
    ----------
    format_encoder : object
        Provides ``name``, ``encode(obj, container)`` and
        ``decode(bytestr, container)``.
    compressor : object
        Provides ``name``, ``compress(data)`` and ``decompress(data)``.
    """

    def __init__(self, format_encoder, compressor):
        self.format_name = format_encoder.name
        self.compressor_name = compressor.name
        self._format_encoder = format_encoder
        self._compressor = compressor

    def encode(self, obj, container):
        """Encode ``obj`` with the format encoder, then compress the result."""
        serialized = self._format_encoder.encode(obj, container)
        return self._compressor.compress(serialized)

    def decode(self, bytestr, container):
        """Decompress ``bytestr``, then decode it with the format encoder."""
        serialized = self._compressor.decompress(bytestr)
        return self._format_encoder.decode(serialized, container)


# Compressors that are always available. Snappy is optional and, when it can
# be imported, is placed at the front of the list.
compressors = [GzipCompressor(), NoneCompressor()]
try:
    import snappy
except ImportError:
    # No snappy module installed: only the compressors above are offered.
    pass
else:
    class SnappyCompressor(object):
        """Codec delegating to ``snappy.compress`` / ``snappy.decompress``."""

        name = 'snappy'

        def compress(self, data):
            """Return ``data`` compressed by snappy."""
            return snappy.compress(data)

        def decompress(self, data):
            """Return ``data`` decompressed by snappy."""
            return snappy.decompress(data)

    compressors.insert(0, SnappyCompressor())


# Insert in preference order
picklers = [PickleSerializer(2), PickleSerializer(1)]
serializers = [MsgPackSerializer()]
serializers.extend(picklers)
# Both registries map each codec's ``name`` to its instance and keep the
# list order above.
format_registry = OrderedDict((fmt.name, fmt) for fmt in serializers)
compression_registry = OrderedDict((comp.name, comp) for comp in compressors)