Journey of a Task
=================
We follow a single task through the user interface, scheduler, worker nodes,
and back. This walkthrough illustrates the inner workings of the system.
User code
---------
A user adds two values that already live on the cluster, then pulls the result
back to the local process.

.. code-block:: python

   from operator import add  # assumed here; any two-argument function works
   from distributed import Client

   client = Client('host:port')
   x = client.submit(...)
   y = client.submit(...)
   z = client.submit(add, x, y)  # we follow z
   print(z.result())

Step 1: Client
----------------
``z`` begins its life when the ``Client.submit`` function sends the following
message to the ``Scheduler``::

    {'op': 'update-graph',
     'tasks': {'z': (add, x, y)},
     'keys': ['z']}

The client then creates a ``Future`` object with the key ``'z'`` and returns
it to the user. This happens even before the scheduler has received the
message. At this point the status of the future is ``'pending'``.
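The ordering described above (queue the message, hand back a pending future right away) can be sketched in a few lines. This is only an illustration: ``SketchFuture``, ``sketch_submit``, and the ``outbox`` queue are hypothetical names and are not the library's classes.

```python
from collections import deque


class SketchFuture:
    """Hypothetical stand-in for a client-side future (not the library's class)."""

    def __init__(self, key):
        self.key = key
        self.status = "pending"  # the scheduler has not answered yet


def sketch_submit(outbox, key, task):
    """Queue an update-graph message and return a future immediately."""
    outbox.append({"op": "update-graph", "tasks": {key: task}, "keys": [key]})
    # We do not wait for the scheduler; the future is returned right away.
    return SketchFuture(key)


outbox = deque()  # stands in for the socket to the scheduler
z = sketch_submit(outbox, "z", ("add", "x", "y"))
```

Here ``z.status`` is ``"pending"`` while the message is still sitting in ``outbox``, which mirrors the fact that the user gets the future before the scheduler has seen anything.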
Step 2: Arrive in the Scheduler
-------------------------------
A few milliseconds later, the scheduler receives this message on an open socket.
The scheduler updates its state with this little graph that shows how to compute
``z``::

    scheduler.update_graph(tasks=msg['tasks'], keys=msg['keys'])

The scheduler also updates *a lot* of other state. Notably, it has to identify
that ``x`` and ``y`` are themselves variables, and connect all of those
dependencies. This is a long and detail-oriented process that involves
updating roughly 10 sets and dictionaries. Interested readers should
investigate ``distributed/scheduler.py::update_graph()``. While this is fairly
complex and tedious to describe, rest assured that it all happens in constant
time and in about a millisecond.
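To give a feel for the kind of bookkeeping involved, here is a heavily simplified sketch that tracks only five of those sets and dictionaries. The state layout and the name ``sketch_update_graph`` are assumptions made for illustration; the real ``update_graph`` maintains considerably more state.

```python
def sketch_update_graph(state, tasks, keys):
    """Merge new tasks into scheduler-like state (illustrative only)."""
    state["tasks"].update(tasks)
    for key in tasks:
        state["dependents"].setdefault(key, set())
    for key, task in tasks.items():
        # Treat every string argument that names a known task as a dependency.
        deps = {a for a in task[1:]
                if isinstance(a, str) and a in state["tasks"] and a != key}
        state["dependencies"][key] = deps
        for dep in deps:
            state["dependents"][dep].add(key)
        # The task waits only on dependencies that are not yet in memory.
        state["waiting"][key] = {d for d in deps if d not in state["who_has"]}
    # Remember which keys a client explicitly asked for.
    state["wanted"].update(keys)


state = {
    "tasks": {"x": ("inc", 1), "y": ("inc", 2)},
    "dependencies": {"x": set(), "y": set()},
    "dependents": {"x": set(), "y": set()},
    "waiting": {},
    "who_has": {"x": {"worker-a"}},  # x is finished, y is still running
    "wanted": set(),
}
sketch_update_graph(state, {"z": ("add", "x", "y")}, ["z"])
```

After the call, ``z`` depends on both ``x`` and ``y`` but waits only on ``y``, because ``x`` is already in memory on a worker.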
Step 3: Select a Worker
-----------------------
Once the later of ``x`` and ``y`` finishes, the scheduler notices that all of
``z``'s dependencies are in memory and that ``z`` itself may now run. Which worker
should ``z`` select? We consider a sequence of criteria:

1.  First, we quickly downselect to only those workers that have either ``x``
    or ``y`` in local memory.
2.  Then, we select the worker that would have to gather the fewest
    bytes in order to get both ``x`` and ``y`` locally. E.g. if two different
    workers have ``x`` and ``y`` and if ``y`` takes up more bytes than ``x``
    then we select the machine that holds ``y`` so that we don't have to
    communicate as much.
3.  If there are multiple workers that require the minimum number of
    communication bytes then we select the worker that is the least busy.

``z`` considers the workers and chooses one based on the above criteria. In the
common case the choice is pretty obvious after step 1. ``z`` waits on a stack
associated with the chosen worker. The worker may still be busy though, so ``z``
may wait a while.
*Note: This policy is in flux, and this part of the document is quite
possibly out of date.*
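The three criteria above can be sketched as a single cost function. This is a minimal illustration under stated assumptions: ``sketch_decide_worker`` and the ``occupancy`` mapping (a number that grows with how busy a worker is) are hypothetical, and the sketch assumes ``z`` has at least one dependency in memory somewhere.

```python
def sketch_decide_worker(deps, who_has, nbytes, occupancy):
    """Pick a worker for a task whose dependencies are all in memory."""
    # Criterion 1: only workers that already hold at least one dependency.
    candidates = set().union(*(who_has[d] for d in deps))

    def cost(worker):
        # Criterion 2: bytes this worker would still have to gather.
        missing = sum(nbytes[d] for d in deps if worker not in who_has[d])
        # Criterion 3: break ties by how busy the worker is.
        return (missing, occupancy[worker])

    # Sorting first makes the result deterministic when costs are equal.
    return min(sorted(candidates), key=cost)


who_has = {"x": {"a"}, "y": {"b", "c"}}
nbytes = {"x": 100, "y": 1000}
occupancy = {"a": 0, "b": 5, "c": 2}
chosen = sketch_decide_worker(["x", "y"], who_has, nbytes, occupancy)
```

In this made-up scenario ``y`` is the larger value, so workers ``b`` and ``c`` both need to move only 100 bytes, and ``c`` wins because it is less busy.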
Step 4: Transmit to the Worker
------------------------------
Eventually the worker finishes a task, has a spare core, and ``z`` finds itself at
the top of the stack (note that this may be some time after the last section
if other tasks placed themselves on top of the worker's stack in the meantime).
We place ``z`` into a ``worker_queue`` associated with that worker and a
``worker_core`` coroutine pulls it out. ``z``'s function, the keys associated
with its arguments, and the locations of workers that hold those keys are packed
up into a message that looks like this::

    {'op': 'compute',
     'function': execute_task,
     'args': ((add, 'x', 'y'),),
     'who_has': {'x': {(worker_host, port)},
                 'y': {(worker_host, port), (worker_host, port)}},
     'key': 'z'}

This message is serialized and sent across a TCP socket to the worker.
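As a rough sketch of this step, the following pops the task on top of a worker's stack and packs a compute message for it. The helper ``sketch_next_message`` and the worker addresses are made up, and the message is simplified: it carries the function and argument keys directly rather than wrapping them in ``execute_task`` as the real message does.

```python
from operator import add


def sketch_next_message(stack, tasks, who_has):
    """Pop the task on top of a worker's stack and pack a compute message."""
    if not stack:
        return None  # nothing is waiting for this worker
    key = stack.pop()  # top of the stack: the most recently added task
    func, *args = tasks[key]
    # Tell the worker where each argument key currently lives.
    holders = {a: set(who_has[a])
               for a in args if isinstance(a, str) and a in who_has}
    return {"op": "compute", "key": key, "function": func,
            "args": tuple(args), "who_has": holders}


tasks = {"z": (add, "x", "y")}
who_has = {"x": {("worker-a", 8000)},
           "y": {("worker-a", 8000), ("worker-b", 8000)}}
stack = ["z"]
msg = sketch_next_message(stack, tasks, who_has)
```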
Step 5: Execute on the Worker
-----------------------------
The worker unpacks the message and notices that it needs both ``x`` and ``y``.
If the worker does not already have both of these, it gathers them from the
workers listed in the ``who_has`` dictionary included in the message. For each
key that it doesn't have, it selects a valid worker from ``who_has`` at random
and gathers data from it.
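The random choice of a source for each missing key might look something like the sketch below. ``sketch_plan_gather`` is a hypothetical helper and the addresses are invented; the actual network transfer is left out entirely.

```python
import random


def sketch_plan_gather(who_has, local_data, rng=random):
    """For each key we lack, choose one holder at random to fetch it from."""
    plan = {}
    for key, holders in who_has.items():
        if key not in local_data:
            # sorted() gives rng.choice a sequence to index into.
            plan[key] = rng.choice(sorted(holders))
    return plan


who_has = {"x": {("worker-a", 8000)},
           "y": {("worker-a", 8000), ("worker-b", 8000)}}
local_data = {"x": 1}  # this worker already holds x
plan = sketch_plan_gather(who_has, local_data, rng=random.Random(0))
```

Only ``y`` appears in ``plan``, since ``x`` is already local and needs no transfer.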
After this exchange, the worker has both the value for ``x`` and the value for
``y``. So it launches the computation ``add(x, y)`` in a local
``ThreadPoolExecutor`` and waits on the result.
*In the meantime the worker repeats this process concurrently for other tasks.
Nothing blocks.*
Eventually the computation completes. The worker stores this result in its
local memory::

    data['z'] = ...

It then transmits back a success message along with the size of the result in
bytes::

    Worker: Hey Scheduler, 'z' worked great.
            I'm holding onto it.
            It takes up 64 bytes.

The worker does not transmit back the actual value for ``z``.
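A minimal sketch of this execute-store-report cycle follows. Several things here are assumptions: the function name ``sketch_execute``, the ``'task-finished'`` op name, and the use of ``sys.getsizeof`` as a size estimate. The sketch also blocks on the result for simplicity, whereas the real worker keeps handling other tasks while it waits.

```python
import sys
from concurrent.futures import ThreadPoolExecutor
from operator import add


def sketch_execute(executor, data, key, func, arg_keys):
    """Run func on locally held arguments, store the result, report its size."""
    args = [data[k] for k in arg_keys]
    future = executor.submit(func, *args)
    data[key] = future.result()  # blocks here; the real worker would not
    # The report carries the key and a size estimate, never the value itself.
    return {"op": "task-finished", "key": key,
            "nbytes": sys.getsizeof(data[key])}


data = {"x": 1, "y": 2}
with ThreadPoolExecutor(max_workers=1) as executor:
    report = sketch_execute(executor, data, "z", add, ["x", "y"])
```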
Step 6: Scheduler Aftermath
----------------------------
The scheduler receives this message and does a few things:

1.  It notes that the worker has a free core, and sends it another task if one
    is available.
2.  If ``x`` or ``y`` are no longer needed then it sends a message out to
    relevant workers to delete them from local memory.
3.  It sends a message to all of the clients that ``z`` is ready, so all
    client ``Future`` objects that are currently waiting should wake up. In
    particular, this wakes up the ``z.result()`` command executed by the user
    originally.
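Items 2 and 3 of the list above can be sketched as follows (item 1, handing the free core another task, is left out). The state layout, the ``needed_by`` and ``clients_by_key`` names, and the ``'key-in-memory'`` op are all assumptions for illustration. In this made-up scenario no client holds a future for ``x``, and another task ``w`` still needs ``y``.

```python
def sketch_task_finished(state, key, worker, nbytes):
    """Record a finished task, find releasable inputs, and notify clients."""
    state["who_has"][key] = {worker}
    state["nbytes"][key] = nbytes

    # Item 2: dependencies that nothing else needs can be deleted from workers.
    releasable = []
    for dep in sorted(state["dependencies"][key]):
        state["needed_by"][dep].discard(key)
        if not state["needed_by"][dep] and not state["clients_by_key"].get(dep):
            releasable.append(dep)

    # Item 3: every client holding a future for this key is told it is ready.
    notices = [(client, {"op": "key-in-memory", "key": key})
               for client in sorted(state["clients_by_key"].get(key, ()))]
    return releasable, notices


state = {
    "dependencies": {"z": {"x", "y"}},
    "needed_by": {"x": {"z"}, "y": {"z", "w"}},
    "clients_by_key": {"z": {"client-1"}},
    "who_has": {"x": {"worker-a"}, "y": {"worker-b"}},
    "nbytes": {"x": 28, "y": 28},
}
releasable, notices = sketch_task_finished(state, "z", "worker-b", 64)
```

Only ``x`` becomes releasable, because ``y`` is still needed by ``w``.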
Step 7: Gather
---------------
When the user calls ``z.result()`` they wait both on the completion of the
computation and for the computation to be sent back over the wire to the local
process. Usually this isn't necessary; you typically don't want to move data
back to the local process but instead want to keep it on the cluster.
But perhaps the user really wanted to actually know this value, so they called
``z.result()``.
The scheduler checks who has ``z`` and sends them a message asking for the result.
This message doesn't wait in a queue or for other jobs to complete; it starts
instantly. The value gets serialized, sent over TCP, and then deserialized and
returned to the user (passing through a queue or two on the way).
Step 8: Garbage Collection
---------------------------
The user leaves this part of their code and the local variable ``z`` goes out
of scope. The Python garbage collector cleans it up. This decrements a
reference count on the client (we didn't mention this, but when we
created the ``Future`` we also started a reference count). If this is the only
instance of a ``Future`` pointing to ``z`` then we send a message up to the
scheduler that it is OK to release ``z``. The user no longer requires it to
persist.
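One way to picture this client-side reference counting is the sketch below, which uses ``weakref.finalize`` to run a callback when a future object is collected. The class names, the ``outbox`` list, and the ``'client-releases-keys'`` op are hypothetical. The immediate cleanup after ``del`` relies on CPython's reference counting.

```python
import weakref
from collections import defaultdict


class SketchClient:
    """Hypothetical client that counts live futures per key."""

    def __init__(self):
        self.refcount = defaultdict(int)
        self.outbox = []  # stands in for messages sent to the scheduler

    def _dec(self, key):
        self.refcount[key] -= 1
        if self.refcount[key] == 0:
            del self.refcount[key]
            # Last future for this key is gone: tell the scheduler.
            self.outbox.append({"op": "client-releases-keys", "keys": [key]})


class CountedFuture:
    """Hypothetical future that registers itself with the client."""

    def __init__(self, key, client):
        self.key = key
        client.refcount[key] += 1
        # Run client._dec(key) when this object is garbage collected.
        weakref.finalize(self, client._dec, key)


client = SketchClient()
a = CountedFuture("z", client)
b = CountedFuture("z", client)
del a
after_first = list(client.outbox)  # still empty: b keeps z alive
del b
```

No release message is sent until the second future for ``z`` is gone.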
The scheduler receives this message and, if there are no computations that
might depend on ``z`` in the immediate future, it removes elements of this key
from local scheduler state and adds the key to a list of keys to be deleted
periodically. Every 500 ms a message goes out to relevant workers telling them
which keys they can delete from their local memory. The graph/recipe to create
the result of ``z`` persists in the scheduler for all time.
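The batching of deletions can be sketched as a function that a periodic callback would invoke. The 500 ms timer itself is not modeled here. The name ``sketch_flush_deletions`` and the ``'delete-data'`` op are assumptions for illustration.

```python
from collections import defaultdict


def sketch_flush_deletions(pending, who_has):
    """Group keys awaiting deletion into one message per worker."""
    per_worker = defaultdict(list)
    for key in sorted(pending):
        # Forget where the key lives and queue it for each holder.
        for worker in sorted(who_has.pop(key, ())):
            per_worker[worker].append(key)
    pending.clear()
    return {worker: {"op": "delete-data", "keys": keys}
            for worker, keys in per_worker.items()}


pending = {"z"}
who_has = {"z": {"worker-b"}, "x": {"worker-a"}}
messages = sketch_flush_deletions(pending, who_has)
```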
Overhead
--------
From the user's perspective, this entire journey takes about 10 milliseconds,
depending on network latency.