File: scheduler-overview.rst

:orphan:

Scheduler Overview
==================

After we create a dask graph, we use a scheduler to run it. Dask currently
implements a few different schedulers:

-  ``dask.threaded.get``: a scheduler backed by a thread pool
-  ``dask.multiprocessing.get``: a scheduler backed by a process pool
-  ``dask.get``: a synchronous scheduler, good for debugging
-  ``distributed.Client.get``: a distributed scheduler for executing graphs
   on multiple machines.  This lives in the external distributed_ project.

.. _distributed: https://distributed.dask.org/en/latest/
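
All of these share the same calling convention, so you can import whichever
one you need directly (a minimal sketch; the distributed client comes from the
separate ``distributed`` package and is not shown here):

.. code-block:: python

   >>> import dask                    # dask.get: the synchronous scheduler
   >>> import dask.threaded           # dask.threaded.get: thread pool scheduler
   >>> import dask.multiprocessing    # dask.multiprocessing.get: process pool scheduler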


The ``get`` function
--------------------

The entry point for all schedulers is a ``get`` function. This takes a dask
graph, and a key or list of keys to compute:

.. code-block:: python

   >>> from operator import add
   >>> from dask.threaded import get   # any scheduler's get function works here

   >>> dsk = {'a': 1,
   ...        'b': 2,
   ...        'c': (add, 'a', 'b'),
   ...        'd': (sum, ['a', 'b', 'c'])}

   >>> get(dsk, 'c')
   3

   >>> get(dsk, 'd')
   6

   >>> get(dsk, ['a', 'b', 'c'])
   [1, 2, 3]
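
Because every scheduler implements this same ``get`` signature, the graph above
runs unchanged under a different scheduler. A minimal sketch using the
synchronous scheduler at ``dask.get``:

.. code-block:: python

   >>> import dask
   >>> dask.get(dsk, 'd')   # same graph, executed by the synchronous scheduler
   6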


Using ``compute`` methods
-------------------------

When working with dask collections, you will rarely need to
interact with scheduler ``get`` functions directly. Each collection has a
default scheduler, and a built-in ``compute`` method that calculates the output
of the collection:

.. code-block:: python

    >>> import dask.array as da
    >>> x = da.arange(100, chunks=10)
    >>> x.sum().compute()
    4950

The ``compute`` method takes a number of keywords:

- ``scheduler``: the name of the desired scheduler as a string (``"threads"``, ``"processes"``, ``"single-threaded"``, etc.), a ``get`` function, or a ``dask.distributed.Client`` object.  Overrides the default for the collection.
- ``**kwargs``: extra keywords to pass on to the scheduler ``get`` function.
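
For example, the scheduler can be named with a string or passed as a ``get``
function; both calls below should return the same result (a minimal sketch):

.. code-block:: python

    >>> import dask
    >>> x.sum().compute(scheduler='processes')   # select a scheduler by name
    4950
    >>> x.sum().compute(scheduler=dask.get)      # or pass a get function directly
    4950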

See also: :ref:`configuring-schedulers`.


The ``compute`` function
------------------------

You may wish to compute results from multiple dask collections at once.
Similar to the ``compute`` method on each collection, there is a general
``compute`` function that takes multiple collections and returns multiple
results. This merges the graphs from each collection, so intermediate results
are shared:

.. code-block:: python

    >>> y = (x + 1).sum()
    >>> z = (x + 1).mean()
    >>> da.compute(y, z)    # Compute y and z, sharing intermediate results
    (5050, 50.5)

Here the ``x + 1`` intermediate is computed only once, whereas calling
``y.compute()`` and ``z.compute()`` separately would compute it twice. For large
graphs that share many intermediates, this can be a significant performance gain.

The ``compute`` function works with any dask collection, and is found in
``dask.base``. For convenience it has also been imported into the top-level
namespace of each collection's module.

.. code-block:: python

    >>> from dask.base import compute
    >>> compute is da.compute
    True


.. _configuring-schedulers:

Configuring the schedulers
--------------------------

The dask collections each have a default scheduler:

- ``dask.array`` and ``dask.dataframe`` use the threaded scheduler by default
- ``dask.bag`` uses the multiprocessing scheduler by default

For most cases, the default settings are good choices. However, sometimes you
may want to use a different scheduler. There are two ways to do this.

1. Using the ``scheduler`` keyword in the ``compute`` method:

    .. code-block:: python

        >>> x.sum().compute(scheduler='processes')

2. Using ``dask.config.set``. This can be used either as a context manager, or to
   set the scheduler globally:

    .. code-block:: python

        # As a context manager
        >>> with dask.config.set(scheduler='processes'):
        ...     x.sum().compute()

        # Set globally
        >>> dask.config.set(scheduler='processes')
        >>> x.sum().compute()


Additionally, each scheduler may take a few extra keywords specific to that
scheduler. For example, the multiprocessing and threaded schedulers each take a
``num_workers`` keyword, which sets the number of processes or threads to use
(defaulting to the number of cores). This can be set by passing the keyword when
calling ``compute``:

.. code-block:: python

    # Compute with 4 threads
    >>> x.compute(num_workers=4)

Alternatively, the multiprocessing and threaded schedulers will check for a
global pool set with ``dask.config.set``:

.. code-block:: python

    >>> from concurrent.futures import ThreadPoolExecutor
    >>> with dask.config.set(pool=ThreadPoolExecutor(4)):
    ...     x.compute()

The multiprocessing scheduler also supports `different contexts`_ ("spawn",
"forkserver", "fork"), which you can set with ``dask.config.set``. The default
context is "spawn", but you can select a different one:

.. code-block:: python

   >>> with dask.config.set({"multiprocessing.context": "forkserver"}):
   ...     x.compute()

.. _different contexts: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

For more information on the individual options for each scheduler, see the
docstring of each scheduler's ``get`` function.
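
For example, Python's built-in ``help`` will show those options (a minimal
sketch; exactly which keywords appear depends on the scheduler):

.. code-block:: python

    >>> import dask.threaded
    >>> help(dask.threaded.get)   # prints the docstring, including scheduler-specific keywords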


Debugging the schedulers
------------------------

Debugging parallel code can be difficult, as conventional tools such as ``pdb``
don't work well with multiple threads or processes. To get around this when
debugging, we recommend using the synchronous scheduler found at
``dask.get``. This runs everything serially, so it works well with ``pdb``:

.. code-block:: python

    >>> dask.config.set(scheduler='single-threaded')
    >>> x.sum().compute()    # This computation runs serially instead of in parallel
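
With the synchronous scheduler active, a failing task raises its exception in
the calling thread, so standard post-mortem debugging applies (a minimal
sketch; ``buggy_function`` is a hypothetical stand-in for code in your graph):

.. code-block:: python

    >>> import pdb
    >>> try:
    ...     x.map_blocks(buggy_function).compute()   # buggy_function is hypothetical
    ... except Exception:
    ...     pdb.post_mortem()                        # drop into the failing frame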


The shared memory schedulers also provide a set of callbacks that can be used
for diagnosing and profiling. You can learn more about scheduler callbacks and
diagnostics :doc:`here <diagnostics-local>`.
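
For example, the ``ProgressBar`` callback from ``dask.diagnostics`` can wrap any
computation run with a local scheduler (a minimal sketch):

.. code-block:: python

    >>> from dask.diagnostics import ProgressBar
    >>> with ProgressBar():
    ...     x.sum().compute()
    4950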


More Information
----------------

- See :doc:`shared` for information on the design of the shared memory
  (threaded or multiprocessing) schedulers
- See distributed_ for information on the distributed memory scheduler