Worker State Machine
====================
.. currentmodule:: distributed.worker_state_machine
Task states
-----------
When the Scheduler asks a Worker to compute a task, it is tracked by the Worker through
a :class:`distributed.worker_state_machine.TaskState` object - not to be confused with
the matching scheduler-side class :class:`distributed.scheduler.TaskState`.
The class has a key attribute, :attr:`TaskState.state`, which can assume the following
values:
released
Known but not actively computing or in memory. A task can stay in this state when
the scheduler asked to forget it, but it has dependent tasks on the same worker.
waiting
The scheduler has added the task to the worker queue. All of its dependencies are
in memory somewhere on the cluster, but not all of them are in memory on the current
worker, so they need to be fetched.
fetch
This task is in memory on one or more peer workers, but not on this worker. Its data
is queued to be transferred over the network, either because it's a dependency of a
task in ``waiting`` state, or because the :doc:`active_memory_manager` requested it
to be replicated here.
The task can be found in the :attr:`WorkerState.data_needed` heap.
missing
Like ``fetch``, but all peer workers that were listed by the scheduler are either
unreachable or have responded they don't actually have the task data. The worker
will periodically ask the scheduler if it knows of additional replicas; when it
does, the task will transition again to ``fetch``.
The task can be found in the :attr:`WorkerState.missing_dep_flight` set.
flight
The task data is currently being transferred over the network from another worker.
The task can be found in the :attr:`WorkerState.in_flight_tasks` and
:attr:`WorkerState.in_flight_workers` collections.
ready
The task is ready to be computed; all of its dependencies are in memory on the
current worker and it's waiting for an available thread.
The task can be found in the :attr:`WorkerState.ready` heap.
constrained
Like ``ready``, but the user specified :doc:`resource constraints <resources>` for
this task.
The task can be found in the :attr:`WorkerState.constrained` queue.
executing
The task is currently being computed on a thread.
It can be found in the :attr:`WorkerState.executing` set and in the
:attr:`distributed.worker.Worker.active_threads` dict.
long-running
Like ``executing``, but the user code called :func:`distributed.secede` so the task
no longer counts towards the maximum number of concurrent tasks.
It can be found in the :attr:`WorkerState.long_running` set and in the
:attr:`distributed.worker.Worker.active_threads` dict.
rescheduled
The task just raised the :class:`~distributed.Reschedule` exception. This is a
transitory state, which is not stored permanently.
cancelled
The scheduler asked to forget about this task, but it's technically impossible at
the moment. See :ref:`cancelled-tasks`. The task can be found in whatever
collections it was in its :attr:`~TaskState.previous` state.
resumed
The task was recovered from ``cancelled`` state. See :ref:`cancelled-tasks`.
The task can be found in whatever collections it was in its
:attr:`~TaskState.previous` state.
memory
Task execution completed, or the task was successfully transferred from another
worker, and is now held in either :attr:`WorkerState.data` or
:attr:`WorkerState.actors`.
error
Task execution failed. Alternatively, task execution completed successfully, or the
task data transferred successfully over the network, but it failed to serialize or
deserialize. The full exception and traceback are stored in the task itself, so that
they can be re-raised on the client.
forgotten
The scheduler asked this worker to forget about the task, and there are neither
dependents nor dependencies on the same worker. As soon as a task reaches this
state, it is immediately dereferenced from the :class:`WorkerState` and will be soon
garbage-collected. This is the only case where two instances of a :class:`TaskState`
object with the same :attr:`~TaskState.key` can (transitorily) exist in the same
interpreter at the same time.
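You can observe these states at runtime by peeking at the worker-side task
collection. The following is a minimal sketch, assuming a running cluster (the
scheduler address is hypothetical):

.. code-block:: python

    from distributed import Client

    client = Client("tcp://127.0.0.1:8786")  # hypothetical address

    def dump_task_states(dask_worker):
        # dask_worker.state is the WorkerState instance; its ``tasks`` dict
        # maps keys to worker_state_machine.TaskState objects
        return {key: ts.state for key, ts in dask_worker.state.tasks.items()}

    # Runs on every worker and returns {worker address: {key: state}}
    print(client.run(dump_task_states))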
Fetching dependencies
---------------------
.. image:: images/worker-dep-state.svg
:alt: Worker states for dependencies
As tasks that need to be computed arrive on the Worker, any dependencies that are not
already in memory on the same worker are wrapped by a :class:`TaskState` object, which
contains a listing of workers (:attr:`TaskState.who_has`) to collect their result from.
These :class:`TaskState` objects have their state set to ``fetch``, are put in the
:attr:`~WorkerState.data_needed` heap, and are progressively transferred over the
network. For each dependency we select a worker at random that has that data and collect
the dependency from that worker. To improve bandwidth, we opportunistically gather other
dependencies of other tasks that are known to be on that worker, up to a maximum of 50MB
of data (:attr:`~WorkerState.transfer_message_bytes_limit`, which is acquired from the
configuration key ``distributed.worker.transfer.message-bytes-limit``) - too little data
and bandwidth suffers, too much data and responsiveness suffers. We use a fixed number of
50 connections (:attr:`~WorkerState.transfer_incoming_count_limit`, which is in turn
acquired from the configuration key ``distributed.worker.connections.outgoing``) so as
to avoid overly-fragmenting our network bandwidth.
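Both limits are ordinary Dask configuration values. As a minimal sketch (assuming a
recent distributed version where these keys exist), they can be inspected or overridden
through :mod:`dask.config`:

.. code-block:: python

    import dask

    # Defaults at the time of writing: 50MB and 50, respectively
    print(dask.config.get("distributed.worker.transfer.message-bytes-limit"))
    print(dask.config.get("distributed.worker.connections.outgoing"))

    # Overrides must be in place before the worker starts
    with dask.config.set({"distributed.worker.transfer.message-bytes-limit": "100MB"}):
        ...  # start a worker here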
In the event that the network comms between two workers are saturated, a dependency task
may cycle between ``fetch`` and ``flight`` until it is successfully collected. It may
also happen that a peer worker responds that it doesn't have a replica of the requested
data anymore; finally, the peer worker may be unreachable or unresponsive. When that
happens, the peer is removed from :attr:`~TaskState.who_has` and the task is
transitioned back to ``fetch``, so that the Worker will try gathering the same key from
a different peer. If :attr:`~TaskState.who_has` becomes empty due to this process, the
task transitions to ``missing`` and the Worker starts periodically asking the Scheduler
if additional peers are available.
The same system used for fetching dependencies is also used by
:doc:`active_memory_manager` replication.
.. note::
There is at most one :meth:`~BaseWorker.gather_dep` asyncio task running at any
given time for any given peer worker. If all workers holding a replica of a task
in ``fetch`` state are already in flight, the task will remain in ``fetch`` state
until a worker becomes available again.
Computing tasks
---------------
A :class:`TaskState` that needs to be computed proceeds on the Worker through the
following pipeline. It has its :attr:`~TaskState.run_spec` defined, which instructs the
worker how to execute it.
.. image:: images/worker-execute-state.svg
:alt: Worker states for computing tasks
After all dependencies for a task are in memory, the task transitions from ``waiting``
to ``ready`` or ``constrained`` and is added to the :attr:`~WorkerState.ready` heap.
As soon as a thread is available, we pop a task from the top of the heap and put the
task into a thread from a local thread pool to execute.
Optionally, while it's running, this task may identify itself as a long-running task
(see :doc:`Tasks launching tasks <task-launch>`), at which point it secedes from the
thread pool and changes state to ``long-running``. ``executing`` and ``long-running`` are
almost identical states, the only difference being that tasks in the latter state don't
count towards the maximum number of tasks running in parallel at the same time.
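A minimal sketch of a task that secedes, in the style of :doc:`task-launch` (the
function itself is illustrative):

.. code-block:: python

    from distributed import get_client, secede

    def fib(n):
        if n < 2:
            return n
        client = get_client()
        a = client.submit(fib, n - 1)
        b = client.submit(fib, n - 2)
        secede()  # transitions this task from ``executing`` to ``long-running``
        return sum(client.gather([a, b]))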
A task can terminate in three ways:
- Complete successfully; its return value is stored in either :attr:`~WorkerState.data`
or :attr:`~WorkerState.actors`
- Raise an exception; the exception and traceback are stored on the :class:`TaskState`
object
- Raise :class:`~distributed.Reschedule`; it is immediately forgotten.
In all cases, the outcome is sent back to the scheduler.
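As a minimal sketch of the third outcome (the triggering condition here is
hypothetical):

.. code-block:: python

    import random

    from distributed import Reschedule

    def flaky(x):
        # Hypothetical condition: ask the scheduler to place this task on a
        # different worker instead of computing it here
        if random.random() < 0.1:
            raise Reschedule()
        return x + 1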
Scattered data
--------------
:meth:`Scattered data <distributed.Client.scatter>` follows an even simpler path,
landing directly in ``memory``:
.. image:: images/worker-scatter-state.svg
:alt: Worker states for scattered data
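A minimal sketch, assuming a running client (the address is hypothetical):

.. code-block:: python

    from distributed import Client

    client = Client("tcp://127.0.0.1:8786")  # hypothetical address

    # The local list is sent straight to a worker, where the matching
    # TaskState is created directly in ``memory``
    future = client.scatter([1, 2, 3])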
Forgetting tasks
----------------
Once a task is in ``memory`` or ``error``, the Worker will hold onto it indefinitely,
until the Scheduler explicitly asks the Worker to forget it.
This happens when there are no more Clients holding a reference to the key and there are
no more waiter tasks (that is, dependents that have not been computed). Additionally,
the :doc:`active_memory_manager` may ask to drop excess replicas of a task.
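For example, dropping the last :class:`~distributed.Future` reference on the client
eventually causes the worker to forget the key (a minimal sketch, assuming a running
client; the address is hypothetical):

.. code-block:: python

    from distributed import Client

    client = Client("tcp://127.0.0.1:8786")  # hypothetical address

    x = client.submit(sum, [1, 2, 3])
    x.result()  # the task is now in ``memory`` on some worker

    # Drop the last Future reference: the scheduler will ask the worker to
    # release the key, and then to forget it
    del x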
In the case of ``rescheduled``, the task will instead immediately transition to
``released`` and then ``forgotten`` without waiting for the scheduler.
.. image:: images/worker-forget-state.svg
:alt: Worker states for forgetting tasks
Irregular flow
--------------
There are a few important exceptions to the flow diagrams above:
- A task is :doc:`stolen <work-stealing>`, in which case it transitions from ``waiting``,
``ready``, or ``constrained`` directly to ``released``. Note that steal requests for
tasks that are currently executing are rejected.
- Scheduler intercession, in which the scheduler reassigns a task that was
previously assigned to a separate worker to a new worker. This most commonly
occurs when a :doc:`worker dies <killed>` during computation.
- Client intercession, where a client either explicitly releases a Future or descopes
it; alternatively the whole client may shut down or become unresponsive. When there
are no more clients holding references to a key or one of its dependents, the
Scheduler will release it.
In short:
.. important::
A task can transition to ``released`` from *any* state, not just those in the
diagrams above.
If there are no dependents, the task immediately transitions to ``forgotten`` and is
descoped. However, there is an important exception, :ref:`cancelled-tasks`.
.. _cancelled-tasks:
Task cancellation
-----------------
The Worker may receive a request to release a key while it is currently in ``flight``,
``executing``, or ``long-running``. Due to technical limitations around cancelling
Python threads, and the way data fetching from peer workers is currently implemented,
such an event cannot cause the related asyncio task (and, in the case of ``executing`` /
``long-running``, the thread running the user code) to be immediately aborted. Instead,
tasks in these three states are transitioned to another state, ``cancelled``, which
means that the asyncio task will proceed to completion (outcome is irrelevant) and
*then* the Dask task will be released.
The ``cancelled`` state has a substate, :attr:`~TaskState.previous`, which is set to one
of the above three states. The common notation for this is ``<state>(<previous>)``,
e.g. ``cancelled(flight)``.
While a task is cancelled, one of three things will happen:
- Nothing happens before the asyncio task completes; e.g. the Scheduler does not change
its mind and still wants the Worker to forget the task by the time the asyncio task ends.
When that happens, the task transitions from ``cancelled`` to ``released`` and,
typically, ``forgotten``.
- The scheduler switches back to its original request:
- The scheduler asks the Worker to fetch a task that is currently
``cancelled(flight)``, at which point the task will immediately revert to
``flight``, forget that cancellation ever happened, and continue waiting on the data
fetch that's already running;
- The scheduler asks the Worker to compute a task that is currently
``cancelled(executing)`` or ``cancelled(long-running)``. The Worker will completely
disregard the new :attr:`~TaskState.run_spec` (if it changed), switch back to the
:attr:`~TaskState.previous` state, and wait for the already executing thread to
finish.
- The scheduler flips to the opposite request, from fetch to computation or the other
way around.
To serve this last use case there is another special state, ``resumed``. A task can
enter ``resumed`` state exclusively from ``cancelled``. ``resumed`` retains the
:attr:`~TaskState.previous` attribute from the ``cancelled`` state and adds another
attribute, :attr:`~TaskState.next`, which is always:
- ``fetch``, if :attr:`~TaskState.previous` is ``executing`` or ``long-running``
- ``waiting``, if :attr:`~TaskState.previous` is ``flight``
To recap, these are all possible permutations of states and substates to handle
cancelled tasks:
========= ============ =======
state previous next
========= ============ =======
cancelled flight None
cancelled executing None
cancelled long-running None
resumed flight waiting
resumed executing fetch
resumed long-running fetch
========= ============ =======
If a ``resumed`` task completes successfully, it will transition to ``memory`` (as
opposed to a ``cancelled`` task, whose output is disregarded) and the Scheduler will be
informed with a spoofed termination message: the expected end message for ``flight`` if
the task is ``resumed(executing->fetch)`` or ``resumed(long-running->fetch)``, and the
expected end message for ``execute`` if the task is ``resumed(flight->waiting)``.
If the task fails or raises :class:`~distributed.Reschedule`, the Worker will instead
silently ignore the exception and switch to its intended course, so
``resumed(executing->fetch)`` or ``resumed(long-running->fetch)`` will transition to
``fetch`` and ``resumed(flight->waiting)`` will transition to ``waiting``.
Finally, the scheduler can change its mind multiple times over the lifetime of the task,
so a ``resumed(executing->fetch)`` or ``resumed(long-running->fetch)`` task may be
requested to transition to ``waiting`` again, at which point it will just revert to its
:attr:`~TaskState.previous` state and forget the whole incident; likewise a
``resumed(flight->waiting)`` task could be requested to transition to ``fetch`` again,
so it will just transition to ``flight`` instead.
.. image:: images/worker-cancel-state1.svg
:alt: Worker states for cancel/resume
.. image:: images/worker-cancel-state2.svg
:alt: Worker states for cancel/resume
**A common real-life use case**
1. There are at least two workers on the cluster, A and B.
2. Task x is computed successfully on worker A.
3. When task x transitions to memory on worker A, the scheduler asks worker B to compute
task y, which depends on task x.
4. B starts acquiring the key x from A, which sends the task into ``flight`` state on
worker B.
5. Worker A crashes, and for whatever reason the scheduler notices before worker B does.
6. The scheduler will release task y (because it's waiting on dependencies that are
nowhere to be found in memory anymore) and reschedule task x somewhere else on the
cluster. Task x will transition to ``cancelled(flight)`` on worker B.
7. If the scheduler randomly chooses worker B to compute task x, the task will
transition to ``resumed(flight->waiting)``.
8. When, *and only when*, the TCP connection between B and A collapses (e.g. due to a
timeout), the task will transition to ``waiting`` and will eventually be recomputed
on B.
.. important::
You always have *at most* one :meth:`~BaseWorker.execute` or
:meth:`~BaseWorker.gather_dep` asyncio task running for any one given key; you
never have both.
Task state mapping between Scheduler and Worker
-----------------------------------------------
The task states on the scheduler and the worker are different, and their mapping is
somewhat nuanced:
+------------------+-----------------------+-------------------------+
| Scheduler states | Typical worker states | Edge case worker states |
+==================+=======================+=========================+
| - released | - (unknown) | - released |
| - waiting | | - cancelled |
| - no-worker | | |
+------------------+-----------------------+-------------------------+
| - processing | - waiting | - resumed(waiting) |
| | - ready | |
| | - constrained | |
| | - executing | |
| | - long-running | |
+------------------+-----------------------+-------------------------+
| - memory | - memory | - error |
| | - fetch | - missing |
| | - flight | - resumed(fetch) |
+------------------+-----------------------+-------------------------+
| - erred | - error | |
+------------------+-----------------------+-------------------------+
In addition to the above states, a worker may not know about a specific task at all.
The opposite, where the worker knows about a task but it is nowhere to be found on the
scheduler, happens exclusively in the case of :ref:`cancelled-tasks`.
There are also *race conditions* to be considered, where a worker (or some workers) know
something before the scheduler does, or the other way around. For example:
- A task will always transition from ``executing`` to ``memory`` on the worker before
it can transition from ``processing`` to ``memory`` on the scheduler
- A task will always transition to ``released`` or ``forgotten`` on the scheduler first;
only once the message reaches the worker is it released there too.
Flow control
------------
.. image:: images/worker-state-machine.svg
:alt: Worker state machine control flow
There are several classes involved in the worker state machine:
:class:`TaskState` includes all the information related to a single task; it also
includes references to dependent and dependency tasks. This is just a data holder, with
no mutating methods. Note that this is a distinct class from
:class:`distributed.scheduler.TaskState`.
:class:`WorkerState` encapsulates the state of the worker as a whole. It holds
references to :class:`TaskState` in its :attr:`~WorkerState.tasks` dictionary and in
several other secondary collections. Crucially, this class has no knowledge or
visibility whatsoever on asyncio, networking, disk I/O, threads, etc.
Note that this is a distinct class from :class:`distributed.scheduler.WorkerState`.
:class:`WorkerState` offers a single method to mutate the state:
:meth:`~WorkerState.handle_stimulus`. The state must not be altered in any other way.
The method accepts a :class:`StateMachineEvent`, a.k.a. *stimulus*: a data class
describing something that happened which may cause the worker state to mutate. A
stimulus can arrive from either the scheduler (e.g. a request to compute a task) or
from the worker itself (e.g. a task has finished computing).
:meth:`WorkerState.handle_stimulus` alters the internal state (e.g., it could transition
a task from ``executing`` to ``memory``) and returns a list of :class:`Instruction`
objects, which are actions that the worker needs to take but are external to the state
itself:
- send a message to the scheduler
- compute a task
- gather a task from a peer worker
:meth:`WorkerState.handle_stimulus` is wrapped by :meth:`BaseWorker.handle_stimulus`,
which consumes the :class:`Instruction` objects. :class:`BaseWorker` deals with asyncio
task creation, tracking, and cleanup, but does not implement the actual task
execution or gathering; instead it exposes the abstract async methods
:meth:`~BaseWorker.execute` and :meth:`~BaseWorker.gather_dep`, which are then
overridden by its subclass :class:`~distributed.Worker`, which actually runs tasks and
performs network I/O. When the implemented methods finish, they must return a
:class:`StateMachineEvent`, which is fed back into :meth:`BaseWorker.handle_stimulus`.
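In illustrative pseudocode, the consuming loop looks roughly as follows. This is a
simplified sketch of what :meth:`BaseWorker.handle_stimulus` does, with error handling
and asyncio task tracking omitted; the instruction class names match
:mod:`distributed.worker_state_machine` as of recent versions:

.. code-block:: python

    from distributed.worker_state_machine import (
        Execute,
        GatherDep,
        SendMessageToScheduler,
    )

    def handle(state, event):
        # Feed the stimulus into the pure state machine...
        instructions = state.handle_stimulus(event)
        # ...then act on the returned instructions (simplified dispatch)
        for instruction in instructions:
            if isinstance(instruction, SendMessageToScheduler):
                ...  # forward over the batched comm to the scheduler
            elif isinstance(instruction, Execute):
                # spawn an asyncio task; the StateMachineEvent returned by
                # BaseWorker.execute() is fed back into handle_stimulus
                ...
            elif isinstance(instruction, GatherDep):
                ...  # same pattern, via BaseWorker.gather_dep()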
.. note::
This can create a (potentially very long) chain of events internal to the worker;
e.g. if there are more tasks in the :attr:`~WorkerState.ready` queue than there are
threads, then the termination :class:`StateMachineEvent` of one task will trigger the
:class:`Instruction` to execute the next one.
To summarize:
- :class:`WorkerState` is agnostic to asyncio, networking, threading, and disk I/O; it
includes collections of :class:`TaskState` objects.
- :class:`BaseWorker` encapsulates :class:`WorkerState` and adds awareness of asyncio
- :class:`~distributed.Worker` subclasses :class:`BaseWorker` and adds awareness of
networking, threading, and disk I/O.
Internal state permutation
--------------------------
Internally, :meth:`WorkerState.handle_stimulus` works very similarly to
:ref:`the same process on the scheduler side <scheduling_state_implementation>`:
#. :meth:`WorkerState.handle_stimulus` calls ``WorkerState._handle_<stimulus name>()``,
#. which returns a tuple of
- *recommendations* to transition tasks: {:class:`TaskState`: <new state>}
- list of :class:`Instruction` objects
#. :meth:`WorkerState.handle_stimulus` then passes the recommendations to
:meth:`WorkerState._transitions`
#. For each recommendation, :meth:`WorkerState._transitions` calls
:meth:`WorkerState._transition`,
#. which in turn calls ``WorkerState._transition_<start state>_<end state>()``,
#. which in turn returns an additional tuple of (recommendations, instructions)
#. the new recommendations are consumed by :meth:`WorkerState._transitions`, until no
more recommendations are returned.
#. :meth:`WorkerState.handle_stimulus` finally returns the list of instructions, which
has been progressively extended by the transitions.
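A hypothetical sketch of the shape of a single transition method under this scheme
(illustrative only, not copied from the implementation):

.. code-block:: python

    def _transition_released_waiting(self, ts, *, stimulus_id):
        # Each transition method returns (recommendations, instructions);
        # _transitions() keeps applying recommendations until none remain
        recommendations = {}  # {TaskState: next state}
        instructions = []     # Instruction objects for handle_stimulus
        for dep in ts.dependencies:
            if dep.state != "memory":
                recommendations[dep] = "fetch"  # make dependencies arrive
        if all(dep.state == "memory" for dep in ts.dependencies):
            recommendations[ts] = "ready"
        return recommendations, instructions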
API Documentation
-----------------
.. autoclass:: TaskState
:members:
.. autoclass:: WorkerState
:members:
.. autoclass:: BaseWorker
:members:
.. autoclass:: StateMachineEvent
:members:
.. autoclass:: Instruction
:members:
.. note::
:class:`StateMachineEvent` and :class:`Instruction` are abstract classes, with many
subclasses which are not listed here for the sake of brevity.
Refer to the implementation module :mod:`distributed.worker_state_machine` for the
full list.