Delayed
=======

.. toctree::
   :maxdepth: 1
   :hidden:

   delayed-api.rst
   delayed-collections.rst
   delayed-best-practices.rst

Sometimes problems don't fit into one of the collections like ``dask.array`` or
``dask.dataframe``. In these cases, users can parallelize custom algorithms
using the simpler ``dask.delayed`` interface. This allows one to create graphs
directly with a light annotation of normal Python code:

.. code-block:: python

   >>> x = dask.delayed(inc)(1)
   >>> y = dask.delayed(inc)(2)
   >>> z = dask.delayed(add)(x, y)
   >>> z.compute()
   5
   >>> z.visualize()

.. image:: images/inc-add.svg
   :alt: simple task graph created with dask.delayed

Example
-------

Sometimes we face problems that are parallelizable, but don't fit into high-level
abstractions like Dask Array or Dask DataFrame.  Consider the following example:

.. code-block:: python

    def inc(x):
        return x + 1

    def double(x):
        return x + 2

    def add(x, y):
        return x + y

    data = [1, 2, 3, 4, 5]

    output = []
    for x in data:
        a = inc(x)
        b = double(x)
        c = add(a, b)
        output.append(c)

    total = sum(output)

There is clearly parallelism in this problem (many of the ``inc``,
``double``, and ``add`` calls can be evaluated independently), but it's not
clear how to convert this to a big array or big DataFrame computation.

As written, this code runs sequentially in a single thread, even though the
loop iterations do not depend on one another and could be executed in parallel.

The Dask ``delayed`` function decorates your functions so that they operate
*lazily*.  Rather than executing your function immediately, it will defer
execution, placing the function and its arguments into a task graph.
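
The deferral can be observed directly. The following is a minimal sketch rather
than part of the original example: the ``calls`` list is a hypothetical helper
that records when ``inc`` really runs, and the synchronous scheduler is chosen
here only to keep the run deterministic.

```python
import dask

calls = []  # hypothetical helper: records every real invocation of inc


def inc(x):
    calls.append(x)
    return x + 1


# Wrapping and calling only builds a task; inc itself has not run yet.
lazy = dask.delayed(inc)(10)
calls_before = list(calls)  # snapshot taken before any computation

# compute() executes the graph, and only now is inc actually called.
result = lazy.compute(scheduler="synchronous")
```

Nothing is appended to ``calls`` until ``compute`` is called, at which point
``inc`` runs once and its result is returned.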

.. currentmodule:: dask.delayed

.. autosummary::
    delayed

We slightly modify our code by wrapping functions in ``delayed``.
This delays the execution of the function and generates a Dask graph instead:

.. code-block:: python

    import dask

    output = []
    for x in data:
        a = dask.delayed(inc)(x)
        b = dask.delayed(double)(x)
        c = dask.delayed(add)(a, b)
        output.append(c)

    total = dask.delayed(sum)(output)

We used the ``dask.delayed`` function to wrap the function calls that we want
to turn into tasks.  None of the ``inc``, ``double``, ``add``, or ``sum`` calls
have happened yet. Instead, the object ``total`` is a ``Delayed`` result that
contains a task graph of the entire computation.  Looking at the graph we see
clear opportunities for parallel execution.  The Dask schedulers will exploit
this parallelism, generally improving performance (although not in this
example, because these functions are already very small and fast).
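
The graph can also be inspected without rendering it, by treating the ``dask``
attribute of the result as a mapping of task keys. The sketch below assumes
that each key begins with the wrapped function's name followed by a hyphen,
which appears to be how ``dask.delayed`` names tasks by default; the
``Counter`` tally is only an illustration and not a Dask feature.

```python
from collections import Counter

import dask


def inc(x):
    return x + 1


def double(x):
    return x + 2  # defined exactly as in the example above


def add(x, y):
    return x + y


data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

# Tally the tasks in the graph by the function-name prefix of each key.
# Assumption: keys look like "<function name>-<unique token>".
tasks_per_function = Counter(str(key).split("-")[0] for key in total.dask)
```

Under that assumption, ``tasks_per_function`` should report one ``inc``, one
``double``, and one ``add`` task for each of the five inputs, none of which has
run at this point.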

.. code-block:: python

   total.visualize()  # see image to the right

.. image:: images/delayed-inc-double-add.svg
   :align: right
   :alt: simple task graph created with dask.delayed

We can now compute this lazy result to execute the graph in parallel:

.. code-block:: python

   >>> total.compute()
   45
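
When the intermediate values are of interest as well, several ``Delayed``
objects can be evaluated together with ``dask.compute``, which returns a tuple
of results. The sketch below rebuilds the same graph so that it is
self-contained; the ``partials`` name is introduced here only for illustration.

```python
import dask


def inc(x):
    return x + 1


def double(x):
    return x + 2  # defined exactly as in the example above


def add(x, y):
    return x + y


data = [1, 2, 3, 4, 5]

# One lazy add(inc(x), double(x)) result per input value.
output = [
    dask.delayed(add)(dask.delayed(inc)(x), dask.delayed(double)(x))
    for x in data
]

# Evaluate all per-element results in a single call; returns a tuple.
partials = dask.compute(*output)

# The lazy sum over the same objects gives the total shown above.
total = dask.delayed(sum)(output).compute()
```

Here ``partials`` holds one value per element of ``data``, and their sum matches
the ``total`` computed above.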

Decorator
---------

It is also common to see the ``delayed`` function used as a decorator.  Here is
our original problem rewritten as parallel code:

.. code-block:: python

    import dask

    @dask.delayed
    def inc(x):
        return x + 1

    @dask.delayed
    def double(x):
        return x + 2

    @dask.delayed
    def add(x, y):
        return x + y

    data = [1, 2, 3, 4, 5]

    output = []
    for x in data:
        a = inc(x)
        b = double(x)
        c = add(a, b)
        output.append(c)

    total = dask.delayed(sum)(output)
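
With the decorator form, every call to a decorated function returns a lazy
object, so ``compute`` is still needed to obtain a value. The following small
sketch shows this with ``inc`` and ``add`` only; the ``lazy``, ``is_lazy``, and
``value`` names are illustrative.

```python
import dask
from dask.delayed import Delayed


@dask.delayed
def inc(x):
    return x + 1


@dask.delayed
def add(x, y):
    return x + y


# Calling decorated functions builds a graph instead of running them.
lazy = add(inc(1), inc(2))
is_lazy = isinstance(lazy, Delayed)

# The graph is executed only when compute() is called.
value = lazy.compute()
```

One consequence of this design is that the undecorated functions are no longer
directly reachable under their own names, so the explicit
``dask.delayed(func)(...)`` form may be preferable when the eager versions are
also needed.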


Real time
---------

Sometimes you want to create and destroy work during execution, launch tasks
from other tasks, etc.  For this, see the :doc:`Futures <futures>` interface.


Best Practices
--------------

For a list of common problems and recommendations see :doc:`Delayed Best
Practices <delayed-best-practices>`.