File: graphs.rst

package info (click to toggle)
dask 2024.12.1%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 20,024 kB
  • sloc: python: 105,182; javascript: 1,917; makefile: 159; sh: 88
file content (160 lines) | stat: -rw-r--r-- 5,433 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
.. _graphs:

Task Graphs
===========

Internally, Dask encodes algorithms in a simple format involving Python dicts,
tuples, and functions. This graph format can be used in isolation from the
dask collections. Working directly with dask graphs is rare, though, unless you intend
to develop new modules with Dask.  Even then, :doc:`dask.delayed <delayed>` is
often a better choice. If you are a *core developer*, then you should start here.

.. toctree::
   :maxdepth: 1

   spec.rst
   custom-graphs.rst
   optimize.rst
   graph_manipulation.rst
   custom-collections.rst
   high-level-graphs.rst


Motivation
----------

Normally, humans write programs and then compilers/interpreters interpret them
(for example, ``python``, ``javac``, ``clang``).  Sometimes humans disagree with how
these compilers/interpreters choose to interpret and execute their programs.
In these cases, humans often bring the analysis, optimization, and execution of
code into the code itself.

Commonly a desire for parallel execution causes this shift of responsibility
from compiler to human developer.  In these cases, we often represent the
structure of our program explicitly as data within the program itself.

A common approach to parallel execution in user-space is *task scheduling*.  In
task scheduling we break our program into many medium-sized tasks or units of
computation, often a function call on a non-trivial amount of data.  We
represent these tasks as nodes in a graph with edges between nodes if one task
depends on data produced by another.  We call upon a *task scheduler* to
execute this graph in a way that respects these data dependencies and leverages
parallelism where possible, so multiple independent tasks can be run
simultaneously.

|

.. figure:: images/map-reduce-task-scheduling.svg
   :scale: 40%

   There are a number of methods for task scheduling, including embarrassingly parallel, MapReduce, and full task scheduling.

|

Many solutions exist.  This is a common approach in parallel execution
frameworks.  Often task scheduling logic hides within other larger frameworks
(e.g. Luigi, Storm, Spark, IPython Parallel, etc.) and so is often reinvented.
Dask is a specification that encodes full task scheduling with minimal incidental
complexity using terms common to all Python projects, namely, dicts, tuples,
and callables.  Ideally this minimum solution is easy to adopt and understand
by a broad community.

Example
-------

Consider the following simple program:

.. code-block:: python

   def inc(i):
       return i + 1

   def add(a, b):
       return a + b

   x = 1
   y = inc(x)
   z = add(y, 10)

We encode this as a dictionary in the following way:

.. code-block:: python

   d = {'x': 1,
        'y': (inc, 'x'),
        'z': (add, 'y', 10)}

Which is represented by the following Dask graph:

.. image:: _static/dask-simple.png
   :height: 400px
   :alt: A simple dask dictionary

|

While less pleasant than our original code, this representation can be analyzed
and executed by other Python code, not just the CPython interpreter.  We don't
recommend that users write code in this way, but rather that it is an
appropriate target for automated systems.  Also, in non-toy examples, the
execution times are likely much larger than for ``inc`` and ``add``, warranting
the extra complexity.


Schedulers
----------

The Dask library currently contains a few schedulers to execute these
graphs.  Each scheduler works differently, providing different performance
guarantees and operating in different contexts.  These implementations are not
special and others can write different schedulers better suited to other
applications or architectures easily.  Systems that emit dask graphs (like
Dask Array, Dask Bag, and so on) may leverage the appropriate scheduler for
the application and hardware.


Task Expectations
-----------------

When a task is submitted to Dask for execution, there are a number of assumptions
that are made about that task.

Don't Modify Data In-Place
~~~~~~~~~~~~~~~~~~~~~~~~~~

In general, tasks with side-effects that alter the state of a future in-place
are not recommended. Modifying data that is stored in Dask in-place can have
unintended consequences. For example, consider a workflow involving a Numpy
array:

.. code-block:: python

   from dask.distributed import Client
   import numpy as np

   client = Client()
   x = client.submit(np.arange, 10)  # [0, 1, 2, 3, ...]

   def f(arr):
       arr[arr > 5] = 0  # modifies input directly without making a copy
       arr += 1          # modifies input directly without making a copy
       return arr

   y = client.submit(f, x)

In the example above Dask will update the values of the Numpy array
``x`` in-place.  While efficient, this behavior can have unintended consequences,
particularly if other tasks need to use ``x``, or if Dask needs to rerun this
computation multiple times because of worker failure.


Avoid Holding the GIL
~~~~~~~~~~~~~~~~~~~~~

Some Python functions that wrap external C/C++ code can hold onto the GIL,
which stops other Python code from running in the background.  This is
troublesome because while Dask workers run your function, they also need to
communicate to each other in the background.

If you wrap external code then please try to release the GIL.  This is usually
easy to do if you are using any of the common solutions to code-wrapping like
Cython, Numba, ctypes or others.