File: high-level-graphs.rst

.. _high-level-graphs:

High Level Graphs
=================

Dask graphs produced by collections like Arrays, Bags, and DataFrames have
high-level structure that can be useful for visualization and high-level
optimization.  The task graphs produced by these collections encode this
structure explicitly as ``HighLevelGraph`` objects.  This document describes
how to work with these in more detail.


Motivation and Example
----------------------

In full generality, Dask schedulers expect arbitrary task graphs where each
node is a single Python function call and each edge is a dependency between
two function calls.  These are usually stored in flat dictionaries.  Here is
some simple Dask DataFrame code and the task graph that it might generate:

.. code-block:: python

    import dask.dataframe as dd

    df = dd.read_csv('myfile.*.csv')
    df = df + 100
    df = df[df.name == 'Alice']

.. code-block:: python

   {
    ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
    ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
    ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
    ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),
    ('add', 0): (operator.add, ('read-csv', 0), 100),
    ('add', 1): (operator.add, ('read-csv', 1), 100),
    ('add', 2): (operator.add, ('read-csv', 2), 100),
    ('add', 3): (operator.add, ('read-csv', 3), 100),
    ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
    ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
    ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
    ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
   }
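
To make the execution model concrete, here is a toy sketch of how a scheduler can walk such a flat dictionary: each value is a tuple whose first element is a callable, and any argument that names another key is computed first. (The graph and ``get`` function below are illustrative, not Dask's real scheduler, which also handles tuple keys, caching, and parallel execution.)

```python
import operator

# A tiny flat task graph with string keys, in the same spirit as above
dsk = {
    'x': (operator.add, 1, 2),      # a leaf task, like 'read-csv'
    'y': (operator.mul, 'x', 10),   # depends on 'x'
    'z': (operator.sub, 'y', 'x'),  # depends on 'y' and 'x'
}

def get(dsk, key):
    """Naively execute one key of a flat task graph, recursing into dependencies."""
    func, *args = dsk[key]
    # An argument that names another key is computed first
    vals = [get(dsk, a) if (isinstance(a, str) and a in dsk) else a for a in args]
    return func(*vals)

get(dsk, 'z')   # (1 + 2) * 10 - (1 + 2) == 27
```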

The task graph is a dictionary that stores every pandas-level function call
necessary to compute the final result.  We can see some structure in this
dictionary if we separate out the tasks associated with each high-level Dask
DataFrame operation:

.. code-block:: python

   {
    # From the dask.dataframe.read_csv call
    ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
    ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
    ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
    ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),

    # From the df + 100 call
    ('add', 0): (operator.add, ('read-csv', 0), 100),
    ('add', 1): (operator.add, ('read-csv', 1), 100),
    ('add', 2): (operator.add, ('read-csv', 2), 100),
    ('add', 3): (operator.add, ('read-csv', 3), 100),

    # From the df[df.name == 'Alice'] call
    ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
    ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
    ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
    ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
   }

Understanding this high-level structure makes task graphs easier to reason
about (especially for larger datasets, where each layer may contain thousands
of tasks) and enables high-level optimizations.  For example, in the case above
we may want to automatically rewrite our code to filter our dataset before
adding 100:

.. code-block:: python

    # Before
    df = dd.read_csv('myfile.*.csv')
    df = df + 100
    df = df[df.name == 'Alice']

    # After
    df = dd.read_csv('myfile.*.csv')
    df = df[df.name == 'Alice']
    df = df + 100
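
This rewrite pays off because the filter then runs before the elementwise addition, so the addition touches fewer rows. A toy illustration using plain Python dictionaries in place of DataFrame rows (the data and helper names are made up for this example); note the reordering is only safe here because the filter predicate does not read the column being modified:

```python
calls = {"n": 0}

def add100(row):
    # Stand-in for the elementwise `+ 100`; counts how often it runs
    calls["n"] += 1
    return {**row, "amount": row["amount"] + 100}

rows = [{"name": "Alice", "amount": 1}, {"name": "Bob", "amount": 2},
        {"name": "Carol", "amount": 3}, {"name": "Alice", "amount": 4}]

# Before: add to every row, then filter
before = [r for r in map(add100, rows) if r["name"] == "Alice"]
n_before = calls["n"]          # one call per row

calls["n"] = 0
# After: filter first, then add only to the surviving rows
after = [add100(r) for r in rows if r["name"] == "Alice"]
n_after = calls["n"]           # only Alice's rows

assert before == after         # same result, less work
```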

Dask's high-level graphs encode this structure explicitly by storing the task
graph in layers, with dependencies between layers:

.. code-block:: python

   >>> import dask.dataframe as dd

   >>> df = dd.read_csv('myfile.*.csv')
   >>> df = df + 100
   >>> df = df[df.name == 'Alice']

   >>> graph = df.__dask_graph__()
   >>> graph.layers
   {
    'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
                 ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
                 ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
                 ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},

    'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
            ('add', 1): (operator.add, ('read-csv', 1), 100),
            ('add', 2): (operator.add, ('read-csv', 2), 100),
            ('add', 3): (operator.add, ('read-csv', 3), 100)},

    'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
               ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
               ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
               ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
   }

   >>> graph.dependencies
   {
    'read-csv': set(),
    'add': {'read-csv'},
    'filter': {'add'}
   }
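
The ``dependencies`` mapping can in principle be derived from the layers themselves: layer A depends on layer B whenever a task in A references a key produced by B. A hypothetical sketch of that derivation, using simple string keys instead of Dask's tuple keys (``infer_dependencies`` is not a Dask function, and it assumes all task arguments are hashable):

```python
def infer_dependencies(layers):
    """For each layer, find the other layers whose keys its tasks reference."""
    # Map every task key back to the layer that produces it
    key_to_layer = {key: name for name, tasks in layers.items() for key in tasks}
    deps = {}
    for name, tasks in layers.items():
        needed = set()
        for task in tasks.values():
            for arg in task[1:]:               # skip the callable itself
                owner = key_to_layer.get(arg)
                if owner is not None and owner != name:
                    needed.add(owner)
        deps[name] = needed
    return deps

layers = {
    'read': {'r0': (open, 'f0.csv'), 'r1': (open, 'f1.csv')},
    'add':  {'a0': (sum, 'r0'), 'a1': (sum, 'r1')},
}
infer_dependencies(layers)   # {'read': set(), 'add': {'read'}}
```

Dask's real ``Layer`` objects track this information without materializing every task, which is part of what makes high-level graphs cheap to build and ship.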

Meanwhile, the DataFrame itself points to the output layer (or layers) on which
it directly depends:

.. code-block:: python

   >>> df.__dask_layers__()
   {'filter'}


HighLevelGraphs
---------------

The :obj:`HighLevelGraph` object is a ``Mapping`` object composed of other
sub-``Mappings``, along with a high-level dependency mapping between them:

.. code-block:: python

   class HighLevelGraph(Mapping):
       layers: Dict[str, Mapping]
       dependencies: Dict[str, Set[str]]

You can construct a ``HighLevelGraph`` explicitly by providing both the layers
and their dependencies to the constructor:

.. code-block:: python

   layers = {
      'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
                   ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
                   ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
                   ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},

      'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
              ('add', 1): (operator.add, ('read-csv', 1), 100),
              ('add', 2): (operator.add, ('read-csv', 2), 100),
              ('add', 3): (operator.add, ('read-csv', 3), 100)},

      'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
                 ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
                 ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
                 ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
   }

   dependencies = {'read-csv': set(),
                   'add': {'read-csv'},
                   'filter': {'add'}}

   graph = HighLevelGraph(layers, dependencies)
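
The same constructor works with any functions.  Here is a runnable variant that swaps the pandas calls for made-up stand-ins (``load`` is illustrative, not a Dask name) so the graph can be built without any data files:

```python
import operator
from dask.highlevelgraph import HighLevelGraph

def load(i):
    # Stand-in for pandas.read_csv -- "reads" partition i
    return i

layers = {
    'load': {('load', i): (load, i) for i in range(4)},
    'add':  {('add', i): (operator.add, ('load', i), 100) for i in range(4)},
}
dependencies = {'load': set(), 'add': {'load'}}

graph = HighLevelGraph(layers, dependencies)
len(graph)       # 8 tasks across both layers
```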

This object satisfies the ``Mapping`` interface, and so behaves like a normal
Python dictionary containing the merged contents of all the underlying layers:

.. code-block:: python

   >>> len(graph)
   12
   >>> graph[('read-csv', 0)]
   (pandas.read_csv, 'myfile.0.csv')


API
---

.. currentmodule:: dask.highlevelgraph

.. autoclass:: HighLevelGraph
   :members:
   :inherited-members:
   :exclude-members: visualize

.. TODO: Fix graphviz dependency in docs build and remove ``visualize`` from
   exclude-members in the above directive