:orphan:
Scheduler Overview
==================
After we create a dask graph, we use a scheduler to run it. Dask currently
implements a few different schedulers:
- ``dask.threaded.get``: a scheduler backed by a thread pool
- ``dask.multiprocessing.get``: a scheduler backed by a process pool
- ``dask.get``: a synchronous scheduler, good for debugging
- ``distributed.Client.get``: a distributed scheduler for executing graphs
on multiple machines. This lives in the external distributed_ project.
.. _distributed: https://distributed.dask.org/en/latest/
The ``get`` function
--------------------
The entry point for all schedulers is a ``get`` function. This takes a dask
graph and a key or list of keys to compute:
.. code-block:: python
>>> from operator import add
>>> from dask.threaded import get  # any scheduler's ``get`` works the same

>>> dsk = {'a': 1,
...        'b': 2,
...        'c': (add, 'a', 'b'),
...        'd': (sum, ['a', 'b', 'c'])}
>>> get(dsk, 'c')
3
>>> get(dsk, 'd')
6
>>> get(dsk, ['a', 'b', 'c'])
[1, 2, 3]
Using ``compute`` methods
-------------------------
When working with dask collections, you will rarely need to
interact with scheduler ``get`` functions directly. Each collection has a
default scheduler, and a built-in ``compute`` method that calculates the output
of the collection:
.. code-block:: python
>>> import dask.array as da
>>> x = da.arange(100, chunks=10)
>>> x.sum().compute()
4950
The ``compute`` method takes a number of keywords:
- ``scheduler``: the name of the desired scheduler as a string (``"threads"``, ``"processes"``, ``"single-threaded"``, etc.), a ``get`` function, or a ``dask.distributed.Client`` object. Overrides the default for the collection.
- ``**kwargs``: extra keywords to pass on to the scheduler ``get`` function.
See also: :ref:`configuring-schedulers`.
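For example, the same reduction can be computed under several forms of the
``scheduler`` keyword. A sketch; the distributed lines are commented out
because they assume a running cluster, and the address is a placeholder:

.. code-block:: python

>>> import dask

>>> x.sum().compute(scheduler='processes')  # by name
4950
>>> x.sum().compute(scheduler=dask.get)     # by ``get`` function
4950

# Assuming a distributed cluster is available:
# >>> from dask.distributed import Client
# >>> client = Client('scheduler-address:8786')
# >>> x.sum().compute(scheduler=client.get)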
The ``compute`` function
------------------------
You may wish to compute results from multiple dask collections at once.
Similar to the ``compute`` method on each collection, there is a general
``compute`` function that takes multiple collections and returns multiple
results. This merges the graphs from each collection, so intermediate results
are shared:
.. code-block:: python
>>> y = (x + 1).sum()
>>> z = (x + 1).mean()
>>> da.compute(y, z) # Compute y and z, sharing intermediate results
(5050, 50.5)
Here the ``x + 1`` intermediate is computed only once; calling ``y.compute()``
and ``z.compute()`` separately would compute it twice. For large graphs that
share many intermediates, this can be a significant performance gain.
The ``compute`` function works with any dask collection and is found in
``dask.base``. For convenience, it has also been imported into the top-level
namespace of each collection.
.. code-block:: python
>>> from dask.base import compute
>>> compute is da.compute
True
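The ``compute`` function also accepts a mix of collection types in a single
call. A minimal sketch, pairing a bag with the array from above:

.. code-block:: python

>>> import dask
>>> import dask.bag as db

>>> b = db.from_sequence(range(10))
>>> dask.compute(x.sum(), b.sum())  # one call, two collection types
(4950, 45)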
.. _configuring-schedulers:
Configuring the schedulers
--------------------------
The dask collections each have a default scheduler:
- ``dask.array`` and ``dask.dataframe`` use the threaded scheduler by default
- ``dask.bag`` uses the multiprocessing scheduler by default
In most cases the defaults are good choices. However, sometimes you may want
to use a different scheduler. There are two ways to do this:
1. Using the ``scheduler`` keyword in the ``compute`` method:
.. code-block:: python
>>> x.sum().compute(scheduler='processes')
2. Using ``dask.config.set``. This can be used either as a context manager, or to
set the scheduler globally:
.. code-block:: python
# As a context manager
>>> import dask
>>> with dask.config.set(scheduler='processes'):
...     x.sum().compute()

# Set globally
>>> dask.config.set(scheduler='processes')
>>> x.sum().compute()
Additionally, each scheduler may take a few extra keywords specific to that
scheduler. For example, the multiprocessing and threaded schedulers each take
a ``num_workers`` keyword, which sets the number of processes or threads to
use (defaulting to the number of cores). This can be set by passing the
keyword when calling ``compute``:
.. code-block:: python
# Compute with 4 threads
>>> x.compute(num_workers=4)
Alternatively, the multiprocessing and threaded schedulers will check for a
global pool set with ``dask.config.set``:
.. code-block:: python
>>> from concurrent.futures import ThreadPoolExecutor
>>> with dask.config.set(pool=ThreadPoolExecutor(4)):
...     x.compute()
The multiprocessing scheduler also supports `different contexts`_ ("spawn",
"forkserver", "fork"), which you can set with ``dask.config.set``. The default
context is "spawn", but you can select a different one:
.. code-block:: python
>>> with dask.config.set({"multiprocessing.context": "forkserver"}):
...     x.compute()
.. _different contexts: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
For more information on the individual options for each scheduler, see the
docstrings for each scheduler ``get`` function.
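For example, to read the threaded scheduler's options interactively:

.. code-block:: python

>>> import dask.threaded
>>> help(dask.threaded.get)  # prints the full keyword documentation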
Debugging the schedulers
------------------------
Debugging parallel code can be difficult, as conventional tools such as ``pdb``
don't work well with multiple threads or processes. To get around this when
debugging, we recommend using the synchronous scheduler found at
``dask.get``. This runs everything serially, allowing it to work
well with ``pdb``:
.. code-block:: python
>>> dask.config.set(scheduler='single-threaded')
>>> x.sum().compute() # This computation runs serially instead of in parallel
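Because the synchronous scheduler raises task exceptions directly in the
calling process, standard post-mortem debugging also works. A sketch, using a
made-up ``divide`` task to trigger a failure:

.. code-block:: python

>>> import pdb
>>> import dask

>>> @dask.delayed
... def divide(a, b):
...     return a / b  # raises ZeroDivisionError when b == 0

>>> dask.config.set(scheduler='single-threaded')
>>> try:
...     divide(1, 0).compute()
... except ZeroDivisionError:
...     pdb.post_mortem()  # drop into the frame where the task failed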
The shared memory schedulers also provide a set of callbacks that can be used
for diagnosing and profiling. You can learn more about scheduler callbacks and
diagnostics :doc:`here <diagnostics-local>`.
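For example, the built-in ``ProgressBar`` callback from ``dask.diagnostics``
prints progress while a computation runs:

.. code-block:: python

>>> from dask.diagnostics import ProgressBar

>>> with ProgressBar():
...     x.sum().compute()
4950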
More Information
----------------
- See :doc:`shared` for information on the design of the shared memory
(threaded or multiprocessing) schedulers
- See distributed_ for information on the distributed memory scheduler