File: journey.rst

Journey of a Task
=================

We follow a single task through the user interface, scheduler, worker nodes,
and back.  Hopefully this helps to illustrate the inner workings of the system.

User code
---------

A user computes the addition of two variables already on the cluster, then pulls the result back to the local process.

.. code-block:: python

   from operator import add            # any plain Python function works here
   from dask.distributed import Client

   client = Client('host:port')
   x = client.submit(...)
   y = client.submit(...)

   z = client.submit(add, x, y)  # we follow z

   print(z.result())


Step 1: Client
--------------

``z`` begins its life when the ``Client.submit`` function sends the following
message to the ``Scheduler``::

    {'op': 'update-graph',
     'tasks': {'z': (add, x, y)},
     'keys': ['z']}

The client then creates a ``Future`` object with the key ``'z'`` and returns
that object back to the user.  This happens even before the message has been
received by the scheduler.  The status of the future says ``'pending'``.
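
On the client side this is visible on the returned ``Future``.  A small
illustration, assuming the ``client``, ``add``, ``x``, and ``y`` from the user
code above:

.. code-block:: python

   z = client.submit(add, x, y)
   z.status   # 'pending' until the scheduler reports the task finished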


Step 2: Arrive in the Scheduler
-------------------------------

A few milliseconds later, the scheduler receives this message on an open socket.

The scheduler updates its state with this little graph that shows how to compute
``z``::

    scheduler.update_graph(tasks=msg['tasks'], keys=msg['keys'])

The scheduler also updates *a lot* of other state.  Notably, it has to identify
that ``x`` and ``y`` are themselves variables, and connect all of those
dependencies.  This is a long and detail-oriented process that involves
updating roughly 10 sets and dictionaries.  Interested readers should
investigate ``distributed/scheduler.py::update_graph()``.  While this is fairly
complex and tedious to describe, rest assured that it all happens in constant
time and in about a millisecond.
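
As a rough sketch of the kind of bookkeeping involved, using illustrative
dictionaries rather than the scheduler's real attributes:

.. code-block:: python

   # Illustrative stand-ins only -- the real state lives on task-state
   # objects inside distributed/scheduler.py
   dependencies = {"z": {"x", "y"}}        # what z needs before it can run
   dependents = {"x": {"z"}, "y": {"z"}}   # who needs x and y once they finish
   waiting_on = {"z": {"x", "y"}}          # emptied as dependencies complete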


Step 3: Select a Worker
-----------------------

Once the latter of ``x`` and ``y`` finishes, the scheduler notices that all of
``z``'s dependencies are in memory and that ``z`` itself may now run.  Which worker
should ``z`` select?  We consider a sequence of criteria:

1.  First, we quickly downselect to only those workers that have either ``x``
    or ``y`` in local memory.
2.  Then, we select the worker that would have to gather the least number of
    bytes in order to get both ``x`` and ``y`` locally.  E.g. if two different
    workers have ``x`` and ``y`` and if ``y`` takes up more bytes than ``x``
    then we select the machine that holds ``y`` so that we don't have to
    communicate as much.
3.  If there are multiple workers that require the minimum number of
    communication bytes, then we select the worker that is the least busy.

``z`` considers the workers and chooses one based on the above criteria.  In the
common case the choice is pretty obvious after step 1.  ``z`` waits on a stack
associated with the chosen worker.  The worker may still be busy though, so ``z``
may wait a while.
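
A self-contained sketch of these criteria, with made-up workers, sizes, and
occupancies (the real logic lives in the scheduler's worker-selection code and
has changed across releases):

.. code-block:: python

   workers = {
       "w1": {"keys": {"x"}, "occupancy": 0.2},   # seconds of queued work
       "w2": {"keys": {"y"}, "occupancy": 0.1},
   }
   nbytes = {"x": 1_000, "y": 8_000}
   deps = {"x", "y"}

   # 1. only consider workers already holding at least one dependency
   candidates = [w for w, state in workers.items() if state["keys"] & deps]

   # 2. fewest bytes to gather, then 3. least busy
   def cost(w):
       missing = deps - workers[w]["keys"]
       return (sum(nbytes[k] for k in missing), workers[w]["occupancy"])

   best = min(candidates, key=cost)   # "w2": it already holds the larger value, y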

*Note: This policy is under flux and this part of this document is quite
possibly out of date.*

Step 4: Transmit to the Worker
------------------------------

Eventually the worker finishes a task, has a spare core, and ``z`` finds itself at
the top of the stack (note that this may be some time after the last section
if other tasks placed themselves on top of the worker's stack in the meantime).

We place ``z`` into a ``worker_queue`` associated with that worker and a
``worker_core`` coroutine pulls it out.  ``z``'s function, the keys associated
to its arguments, and the locations of workers that hold those keys are packed
up into a message that looks like this::

    {'op': 'compute',
     'function': execute_task,
     'args': ((add, 'x', 'y'),),
     'who_has': {'x': {(worker_host, port)},
                 'y': {(worker_host, port), (worker_host, port)}},
     'key': 'z'}

This message is serialized and sent across a TCP socket to the worker.
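
Schematically, the queue-and-coroutine handoff described above looks roughly
like this (an asyncio sketch with illustrative names, not the actual worker
handling code):

.. code-block:: python

   import asyncio

   async def worker_core(worker_queue: asyncio.Queue):
       while True:
           key, spec = await worker_queue.get()   # e.g. ('z', (add, 'x', 'y'))
           # ... pack up a 'compute' message like the one above and send it
           #     to the worker over its TCP connection
           worker_queue.task_done()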


Step 5: Execute on the Worker
-----------------------------

The worker unpacks the message and notices that it needs to have both ``x``
and ``y``.  If the worker does not already have both of these then it gathers
them from the workers listed in the ``who_has`` dictionary, which is also
included in the message.  For each key that it doesn't have, it selects a
valid worker from ``who_has`` at random and gathers the data from it.
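
A sketch of that selection, with made-up addresses standing in for the
``who_has`` contents (the actual fetch happens over the worker's communication
layer, which is omitted here):

.. code-block:: python

   import random

   data = {"y": 2}                                  # say y is already local
   who_has = {"x": {("192.168.1.1", 8786)},
              "y": {("192.168.1.2", 8786), ("192.168.1.3", 8786)}}

   for key in ("x", "y"):
       if key not in data:
           peer = random.choice(sorted(who_has[key]))   # pick one holder at random
           # ... fetch `key` from `peer` over the worker's comms (omitted)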

After this exchange, the worker has both the value for ``x`` and the value for
``y``.  So it launches the computation ``add(x, y)`` in a local
``ThreadPoolExecutor`` and waits on the result.
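
Conceptually, that execution step looks like the following simplification (the
real worker drives this from its event loop and its own state machine):

.. code-block:: python

   from concurrent.futures import ThreadPoolExecutor
   from operator import add

   data = {"x": 1, "y": 2}                  # both dependencies now in local memory

   with ThreadPoolExecutor() as executor:
       future = executor.submit(add, data["x"], data["y"])   # runs in a thread
       result = future.result()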

*In the meantime the worker repeats this process concurrently for other tasks.
Nothing blocks.*

Eventually the computation completes.  The Worker stores this result in its
local memory::

    data['z'] = ...

It then transmits back a success message, along with the number of bytes of the
result::

    Worker: Hey Scheduler, 'z' worked great.
            I'm holding onto it.
            It takes up 64 bytes.

The worker does not transmit back the actual value for ``z``.

Step 6:  Scheduler Aftermath
----------------------------

The scheduler receives this message and does a few things:

1.  It notes that the worker has a free core, and sends it another task if
    one is available.
2.  If ``x`` or ``y`` are no longer needed then it sends a message out to the
    relevant workers to delete them from local memory (sketched below).
3.  It sends a message to all of the clients that ``z`` is ready, so that any
    client ``Future`` objects currently waiting on it can wake up.  In
    particular, this wakes up the ``z.result()`` call executed by the user
    originally.
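
A toy sketch of the cleanup in item 2, with small dictionaries standing in for
the scheduler's real state:

.. code-block:: python

   dependents = {"x": {"z"}, "y": {"z"}}    # tasks that still need each key
   wanted = {"z"}                           # keys a client holds a Future for

   def on_task_finished(finished, deps):
       for dep in deps:
           dependents[dep].discard(finished)
           if not dependents[dep] and dep not in wanted:
               print(f"ask workers to delete {dep!r}")   # stand-in for the message

   on_task_finished("z", deps=("x", "y"))   # x and y are no longer needed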


Step 7:  Gather
---------------

When the user calls ``z.result()`` they wait both on the completion of the
computation and for the result to be sent back over the wire to the local
process.  Usually this isn't necessary: often you don't want to move data back
to the local process, but instead want to keep it on the cluster.

But perhaps the user really wanted to know this value, so they called
``z.result()``.

The scheduler checks who has ``z`` and sends them a message asking for the result.
This message doesn't wait in a queue or for other jobs to complete; it starts
instantly.  The value gets serialized, sent over TCP, and then deserialized and
returned to the user (passing through a queue or two on the way).
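
For reference, several results can be pulled at once with ``Client.gather``,
which follows the same path for each key:

.. code-block:: python

   value = z.result()                 # one result at a time
   values = client.gather([x, y, z])  # several at once, over the same mechanism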


Step 8:  Garbage Collection
---------------------------

The user leaves this part of their code and the local variable ``z`` goes out
of scope.  The Python garbage collector cleans it up.  This triggers a
decrement of the reference count on the client (we didn't mention this, but when
we created the ``Future`` we also started a reference count).  If this is the
only instance of a ``Future`` pointing to ``z`` then we send a message up to the
scheduler that it is OK to release ``z``.  The user no longer requires it to
persist.
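
The same release path can be triggered without waiting for the garbage
collector by dropping the reference explicitly:

.. code-block:: python

   del z   # once no local Future points at 'z', the client tells the
           # scheduler that it may release the key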

The scheduler receives this message and, if there are no computations that
might depend on ``z`` in the immediate future, it removes the entries for this
key from local scheduler state and adds the key to a list of keys to be deleted
periodically.  Every 500 ms a message goes out to relevant workers telling them
which keys they can delete from their local memory.  The graph/recipe to create
the result of ``z`` persists in the scheduler for all time.

Overhead
--------

The user experiences this entire journey in about 10 milliseconds, depending on
network latency.