File: protocol.rst

package info (click to toggle)
dask.distributed 2024.12.1%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,588 kB
  • sloc: python: 96,954; javascript: 1,549; sh: 390; makefile: 220
file content (306 lines) | stat: -rw-r--r-- 11,199 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
.. _protocol:

Protocol
========

The scheduler, workers, and clients pass messages between each other.
Semantically these messages encode commands, status updates, and data, like the
following:

*  Please compute the function ``sum`` on the data ``x`` and store in ``y``
*  The computation ``y`` has been completed
*  Be advised that a new worker named ``alice`` is available for use
*  Here is the data for the keys ``'x'``, and ``'y'``

In practice we represent these messages with dictionaries/mappings::

   {'op': 'compute',
    'function': ...
    'args': ['x']}

   {'op': 'task-complete',
    'key': 'y',
    'nbytes': 26}

   {'op': 'register-worker',
    'address': '192.168.1.42',
    'name': 'alice',
    'nthreads': 4}

   {'x': b'...',
    'y': b'...'}

When we communicate these messages between nodes we need to serialize these
messages down to a string of bytes that can then be deserialized on the other
end to their in-memory dictionary form.  For simple cases several options exist
like JSON, MsgPack, Protobuffers, and Thrift.  The situation is made more
complex by concerns like serializing Python functions and Python objects,
optional compression, cross-language support, large messages, and efficiency.

This document describes the protocol used by ``dask.distributed`` today.  Be
advised that this protocol changes rapidly as we continue to optimize for
performance.


Overview
--------

We may split a single message into multiple message-part to suit different
protocols.  Generally small bits of data are encoded with MsgPack while large
bytestrings and complex datatypes are handled by a custom format.  Each
message-part gets its own header, which is always encoded as msgpack.  After
serializing all message parts we have a sequence of bytestrings or *frames*
which we send along the wire, prepended with length information.

The application doesn't know any of this, it just sends us Python dictionaries
with various datatypes and we produce a list of bytestrings that get written to
a socket.  This format is fast both for many frequent messages and for large
messages.


MsgPack for Messages
--------------------

Most messages are encoded with MsgPack_, a self describing semi-structured
serialization format that is very similar to JSON, but smaller, faster, not
human-readable, and supporting of bytestrings and (soon) timestamps.  We chose
MsgPack as a base serialization format for the following reasons:

*  It does not require separate headers, and so is easy and flexible to use
   which is particularly important in an early stage project like
   ``dask.distributed``
*  It is very fast, much faster than JSON, and there are nicely optimized
   implementations.  With few exceptions (described later) MsgPack does not come
   anywhere near being a bottleneck, even under heavy use.
*  Unlike JSON it supports bytestrings
*  It covers the standard set of types necessary to encode most information
*  It is widely implemented in a number of languages (see cross language
   section below)

However, MsgPack fails (correctly) in the following ways:

*  It does not provide any way for us to encode Python functions or user
   defined data types
*  It does not support bytestrings greater than 4GB and is generally
   inefficient for very large messages.

Because of these failings we supplement it with a language-specific protocol
and a special case for large bytestrings.


CloudPickle for Functions and Some Data
---------------------------------------

Pickle and CloudPickle are Python libraries to serialize almost any Python
object, including functions.  We use these libraries to transform the users'
functions and data into bytes before we include them in the dictionary/map that
we pass off to msgpack.  In the introductory example you may have noticed that
we skipped providing an example for the function argument::

   {'op': 'compute',
    'function': ...
    'args': ['x']}

That is because this value ``...`` will actually be the result of calling
``cloudpickle.dumps(myfunction)``.  Those bytes will then be included in the
dictionary that we send off to msgpack, which will only have to deal with
bytes rather than obscure Python functions.

*Note: we actually call some combination of pickle and cloudpickle, depending
on the situation.  This is for performance reasons.*

CloudPickle can serialize objects by both reference (referring to them by
their module and name) or by value (serializing the actual code for the object).
By default, it serializes by reference if it can, but starting with CloudPickle 2.0
you can register a module to be serialized by value. This can be useful if you
want to send an object in a module that doesn't exist on the receiving end::

   import mymodule
   cloudpickle.register_pickle_by_value(mymodule)

Cross Language Specialization
-----------------------------

The Client and Workers must agree on a language-specific serialization format.
In the standard ``dask.distributed`` client and worker objects this ends up
being the following::

   bytes = cloudpickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
   obj = cloudpickle.loads(bytes)

This varies between Python 2 and 3 and so your client and workers must match
their Python versions and software environments.

However, the Scheduler never uses the language-specific serialization and
instead only deals with MsgPack.  If the client sends a pickled function up to
the scheduler the scheduler will not unpack function but will instead keep it
as bytes.  Eventually those bytes will be sent to a worker, which will then
unpack the bytes into a proper Python function.  Because the Scheduler never
unpacks language-specific serialized bytes it may be in a different language.

**The client and workers must share the same language and software environment,
the scheduler may differ.**

This has a few advantages:

1.  The Scheduler is protected from unpickling unsafe code
2.  We could conceivably implement workers and clients for other languages
    (like R or Julia) and reuse the Python scheduler.  The worker and client
    code is fairly simple and much easier to reimplement than the scheduler,
    which is complex.
3.  The scheduler might some day be rewritten in more heavily optimized C or Go

Compression
-----------

Fast compression libraries like LZ4 or Snappy may increase effective bandwidth
by compressing data before sending and decompressing it after reception.  This
is especially valuable on lower-bandwidth networks.

If either of these libraries is available (we prefer LZ4 to Snappy) then for
every message greater than 1kB we try to compress the message and, if the
compression is at least a 10% improvement, we send the compressed bytes rather
than the original payload.  We record the compression used within the header as
a string like ``'lz4'`` or ``'snappy'``.

To avoid compressing large amounts of uncompressable data we first try to
compress a sample.  We take 10kB chunks from five locations in the dataset,
arrange them together, and try compressing the result.  If this doesn't result
in significant compression then we don't try to compress the full result.


Header
------

The header is a small dictionary encoded in msgpack that includes some metadata
about the message, such as compression.


Serializing Data
----------------

For administrative messages like updating status msgpack is sufficient.
However for large results or Python specific data, like NumPy arrays or Pandas Dataframes, or
for larger results we need to use something else to convert Python objects to
bytestrings.  Exactly how we do this is described more in the
:doc:`Serialization documentation <serialization>`.

The application code marks Python specific results with the ``to_serialize``
function:

.. code-block:: python

   >>> import numpy as np
   >>> x = np.ones(5)

   >>> from distributed.protocol import to_serialize
   >>> msg = {'status': 'OK', 'data': to_serialize(x)}
   >>> msg
   {'data': <Serialize: [ 1.  1.  1.  1.  1.]>, 'status': 'OK'}

We separate the message into two messages, one encoding all of the data to be
serialized and, and one encoding everything else::

   {'key': 'x', 'address': 'alice'}
   {'data': <Serialize: [ 1.  1.  1.  1.  1.]>}

The first message we pass normally with msgpack. The second we pass in multiple
parts, one part for each serialized piece of data (see :doc:`serialization
<serialization>`) and one header including types, compression, etc. used for each
value::

   {'keys': ['data'],
    'compression': ['lz4']}
   b'...'
   b'...'


Frames
------

At the end of the pipeline we have a sequence of bytestrings or frames.  We
need to tell the receiving end how many frames there are and how long each
these frames are.  We order the frames and lengths of frames as follows:

1.  The number of frames, stored as an 8 byte unsigned integer
2.  The length of each frame, each stored as an 8 byte unsigned integer
3.  Each of the frames

In the following sections we describe how we create these frames.


.. _MsgPack: http://msgpack.org/index.html


Technical Version
-----------------

A message is broken up into the following components:

1.  8 bytes encoding how many frames there are in the message (N) as a
    ``uint64``
2.  8 * N frames encoding the length of each frame as ``uint64`` s
3.  Header for the administrative message
4.  The administrative message, msgpack encoded, possibly compressed
5.  Header for all payload messages
6.  Payload messages

Header for Administrative Message
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The administrative message is arbitrary msgpack-encoded data.  Usually a
dictionary.  It may optionally be compressed.  If so the compression type will
be in the header.

Payload frames and Header
~~~~~~~~~~~~~~~~~~~~~~~~~

These frames are optional.

Payload frames are used to send large or language-specific data.  These values
will be inserted into the administrative message after they are decoded.  The
header is msgpack encoded and contains encoding and compression information for
the all subsequent payload messages.

A Payload may be spread across many frames.  Each frame may be separately
compressed.


Simple Example
~~~~~~~~~~~~~~

This simple example shows a minimal message.  There is only an empty header and
a small msgpack message.  There are no additional payload frames

Message: ``{'status': 'OK'}``

Frames:

*  Header: ``{}``
*  Administrative Message: ``{'status': 'OK'}``


Example with Custom Data
~~~~~~~~~~~~~~~~~~~~~~~~

This example contains a single payload message composed of a single frame.  It
uses a special serialization for NumPy arrays.

Message: ``{'op': 'get-data', 'data': np.ones(5)}``

Frames:

*  Header: ``{}``
*  Administrative Message: ``{'op': 'get-data'}``
*  Payload header: ::

      {'headers': [{'type': 'numpy.ndarray',
                    'compression': 'lz4',
                    'count': 1,
                    'lengths': [40],
                    'dtype': '<f8',
                    'strides': (8,),
                    'shape': (5,)}],
                   'keys': [('data',)]}

*  Payload Frame: ``b'(\x00\x00\x00\x11\x00\x01\x00!\xf0?\x07\x00\x0f\x08\x00\x03P\x00\x00\x00\xf0?'``