.. _high-level-graphs:

High Level Graphs
=================

Dask graphs produced by collections like Arrays, Bags, and DataFrames have
high-level structure that can be useful for visualization and high-level
optimization. The task graphs produced by these collections encode this
structure explicitly as ``HighLevelGraph`` objects. This document describes
how to work with these in more detail.

Motivation and Example
----------------------

In full generality, Dask schedulers expect arbitrary task graphs where each
node is a single Python function call and each edge is a dependency between
two function calls. These are usually stored in flat dictionaries. Here is
some simple Dask DataFrame code and the task graph that it might generate:

.. code-block:: python

   import dask.dataframe as dd

   df = dd.read_csv('myfile.*.csv')
   df = df + 100
   df = df[df.name == 'Alice']


.. code-block:: python

   {
       ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
       ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
       ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
       ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),
       ('add', 0): (operator.add, ('read-csv', 0), 100),
       ('add', 1): (operator.add, ('read-csv', 1), 100),
       ('add', 2): (operator.add, ('read-csv', 2), 100),
       ('add', 3): (operator.add, ('read-csv', 3), 100),
       ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
       ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
       ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
       ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
   }

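
To make the flat-dictionary format concrete, the following is a minimal sketch
of how a scheduler might evaluate one key: look up its task, compute any
arguments that are themselves keys, then call the function. It is an
illustration only, not Dask's scheduler, and it ignores nested tasks and lists
of keys. So that it runs without input files, it assumes hypothetical
stand-ins (``load_partition``, ``add_scalar``, ``keep_even``) for
``pandas.read_csv``, ``operator.add`` and the filter lambda, working on lists
of integers instead of DataFrames.

.. code-block:: python

   def is_task(obj):
       """A task is a tuple whose first element is callable."""
       return isinstance(obj, tuple) and len(obj) > 0 and callable(obj[0])


   def is_key(graph, obj):
       """True if ``obj`` is one of the graph's keys (unhashable values never are)."""
       try:
           return obj in graph
       except TypeError:
           return False


   def execute(graph, key, cache=None):
       """Recursively evaluate ``key`` in a flat task graph, memoizing results."""
       if cache is None:
           cache = {}
       if key not in cache:
           task = graph[key]
           if is_task(task):
               func, *args = task
               # Arguments that are keys are computed first; literals pass through.
               values = [execute(graph, a, cache) if is_key(graph, a) else a
                         for a in args]
               cache[key] = func(*values)
           else:
               cache[key] = task
       return cache[key]


   def load_partition(i):
       """Hypothetical stand-in for pandas.read_csv: partition i holds 3 ints."""
       return [3 * i, 3 * i + 1, 3 * i + 2]


   def add_scalar(part, n):
       """Hypothetical stand-in for operator.add on a partition."""
       return [x + n for x in part]


   def keep_even(part):
       """Hypothetical stand-in for the DataFrame filter."""
       return [x for x in part if x % 2 == 0]


   # Same shape as the graph above: 4 partitions x 3 operations = 12 tasks.
   graph = {}
   for i in range(4):
       graph[('load', i)] = (load_partition, i)
       graph[('add', i)] = (add_scalar, ('load', i), 100)
       graph[('filter', i)] = (keep_even, ('add', i))

   print(execute(graph, ('filter', 1)))  # [104]
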
The task graph is a dictionary that stores every pandas-level function call
necessary to compute the final result. We can see that there is some structure
to this dictionary if we separate out the tasks that were associated with each
high-level Dask DataFrame operation:


.. code-block:: python

   {
       # From the dask.dataframe.read_csv call
       ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
       ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
       ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
       ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),

       # From the df + 100 call
       ('add', 0): (operator.add, ('read-csv', 0), 100),
       ('add', 1): (operator.add, ('read-csv', 1), 100),
       ('add', 2): (operator.add, ('read-csv', 2), 100),
       ('add', 3): (operator.add, ('read-csv', 3), 100),

       # From the df[df.name == 'Alice'] call
       ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
       ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
       ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
       ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
   }

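
Since every key here is a ``(name, index)`` tuple whose name identifies the
operation that produced it, this grouping can be recovered mechanically from
the flat dictionary. A minimal sketch, assuming that key convention holds for
every key; placeholder strings stand in for the task tuples because only the
keys matter for grouping:

.. code-block:: python

   from collections import defaultdict


   def group_into_layers(graph):
       """Group a flat task graph into layers keyed by each task key's name.

       Assumes every key is a tuple of the form (name, partition_index).
       """
       layers = defaultdict(dict)
       for key, task in graph.items():
           name = key[0]
           layers[name][key] = task
       return dict(layers)


   # Placeholder tasks: only the keys matter for grouping.
   flat = {(name, i): f"task {name}-{i}"
           for name in ("read-csv", "add", "filter")
           for i in range(4)}

   layers = group_into_layers(flat)
   print(sorted(layers))      # ['add', 'filter', 'read-csv']
   print(len(layers["add"]))  # 4
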
By understanding this high-level structure we can read our task graphs more
easily (this matters more for larger datasets, where there may be thousands of
tasks per layer) and see how to perform high-level optimizations. For example,
in the case above we may want to automatically rewrite our code to filter our
datasets before adding 100:


.. code-block:: python

   # Before
   df = dd.read_csv('myfile.*.csv')
   df = df + 100
   df = df[df.name == 'Alice']

   # After
   df = dd.read_csv('myfile.*.csv')
   df = df[df.name == 'Alice']
   df = df + 100

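
The rewrite is safe when the filter predicate only looks at a column that the
addition does not change, so the two steps commute; filtering first simply
means fewer rows are processed. A minimal sketch of that argument on one
hypothetical partition, using plain Python records rather than pandas and
assuming the addition applies only to a numeric ``value`` field:

.. code-block:: python

   # A hypothetical partition: plain records instead of a pandas DataFrame,
   # with the addition applied only to a numeric "value" field.
   partition = [
       {"name": "Alice", "value": 1},
       {"name": "Bob", "value": 2},
       {"name": "Alice", "value": 3},
       {"name": "Carol", "value": 4},
   ]


   def add_100(records):
       return [{**r, "value": r["value"] + 100} for r in records]


   def only_alice(records):
       return [r for r in records if r["name"] == "Alice"]


   before = only_alice(add_100(partition))  # add first, then filter
   after = add_100(only_alice(partition))   # filter first, then add

   print(before == after)  # True
   # Filtering first means add_100 only sees the rows that survive the filter.
   print(len(partition), len(only_alice(partition)))  # 4 2
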
Dask's high level graphs help us explicitly encode this structure by storing
our task graphs in layers with dependencies between layers:

.. code-block:: python

   >>> import dask.dataframe as dd
   >>> df = dd.read_csv('myfile.*.csv')
   >>> df = df + 100
   >>> df = df[df.name == 'Alice']

   >>> graph = df.__dask_graph__()
   >>> graph.layers
   {
    'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
                 ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
                 ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
                 ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},
    'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
            ('add', 1): (operator.add, ('read-csv', 1), 100),
            ('add', 2): (operator.add, ('read-csv', 2), 100),
            ('add', 3): (operator.add, ('read-csv', 3), 100)},
    'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
               ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
               ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
               ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
   }

   >>> graph.dependencies
   {
    'read-csv': set(),
    'add': {'read-csv'},
    'filter': {'add'}
   }

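
The ``dependencies`` mapping alone is enough to put the layers in an order
where each one comes after everything it needs, without inspecting any
individual task. A minimal sketch using the standard library's
``graphlib.TopologicalSorter`` (Python 3.9+), which accepts exactly this shape
of mapping from each node to its predecessors; it illustrates the idea and is
not how Dask itself schedules work:

.. code-block:: python

   from graphlib import TopologicalSorter

   dependencies = {
       'read-csv': set(),
       'add': {'read-csv'},
       'filter': {'add'},
   }

   # TopologicalSorter takes {node: predecessors} and yields every layer
   # after all of the layers it depends on.
   order = list(TopologicalSorter(dependencies).static_order())
   print(order)  # ['read-csv', 'add', 'filter']
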
The DataFrame itself points to the output layers on which it directly depends:

.. code-block:: python

   >>> df.__dask_layers__()
   ('filter',)

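
Storing only the output layers is enough because every other required layer
can be recovered by walking ``dependencies`` backwards from them. A minimal
sketch of that walk; the helper ``required_layers`` and the extra ``'unused'``
layer are hypothetical, added to show that a layer the output does not depend
on is left out:

.. code-block:: python

   def required_layers(output_layers, dependencies):
       """Return every layer reachable from ``output_layers`` via ``dependencies``."""
       seen = set()
       stack = list(output_layers)
       while stack:
           name = stack.pop()
           if name not in seen:
               seen.add(name)
               stack.extend(dependencies[name])
       return seen


   dependencies = {
       'read-csv': set(),
       'add': {'read-csv'},
       'filter': {'add'},
       'unused': {'read-csv'},  # hypothetical layer that 'filter' does not need
   }

   print(sorted(required_layers(('filter',), dependencies)))
   # ['add', 'filter', 'read-csv']
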

HighLevelGraphs
---------------

The :obj:`HighLevelGraph` object is a ``Mapping`` object composed of other
sub-``Mappings``, along with a high-level dependency mapping between them:

.. code-block:: python

   from collections.abc import Mapping
   from typing import Dict, Set

   class HighLevelGraph(Mapping):
       layers: Dict[str, Mapping]
       dependencies: Dict[str, Set[str]]

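
To make the ``Mapping`` relationship concrete, here is a minimal, hypothetical
class with the same two attributes. ``ToyHighLevelGraph`` is not part of Dask
and the real class does considerably more; this sketch only shows the core
idea that lookups search the layers, while iteration and length cover the keys
of every layer (assuming no key appears in more than one layer):

.. code-block:: python

   from collections.abc import Mapping
   from typing import Dict, Set


   class ToyHighLevelGraph(Mapping):
       """Hypothetical stand-in: a Mapping built from layers of sub-Mappings."""

       def __init__(self, layers: Dict[str, Mapping],
                    dependencies: Dict[str, Set[str]]):
           self.layers = layers
           self.dependencies = dependencies

       def __getitem__(self, key):
           # Search each layer in turn; layers are assumed to have disjoint keys.
           for layer in self.layers.values():
               if key in layer:
                   return layer[key]
           raise KeyError(key)

       def __iter__(self):
           for layer in self.layers.values():
               yield from layer

       def __len__(self):
           return sum(len(layer) for layer in self.layers.values())


   g = ToyHighLevelGraph(
       {'a': {('a', 0): 1, ('a', 1): 2}, 'b': {('b', 0): 3}},
       {'a': set(), 'b': {'a'}},
   )
   print(len(g), g[('b', 0)], dict(g))
   # 3 3 {('a', 0): 1, ('a', 1): 2, ('b', 0): 3}
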
You can construct a ``HighLevelGraph`` explicitly by providing both to the
constructor:

.. code-block:: python

   import operator

   import pandas
   from dask.highlevelgraph import HighLevelGraph

   layers = {
       'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
                    ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
                    ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
                    ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},
       'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
               ('add', 1): (operator.add, ('read-csv', 1), 100),
               ('add', 2): (operator.add, ('read-csv', 2), 100),
               ('add', 3): (operator.add, ('read-csv', 3), 100)},
       'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
                  ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
                  ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
                  ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
   }

   dependencies = {'read-csv': set(),
                   'add': {'read-csv'},
                   'filter': {'add'}}

   graph = HighLevelGraph(layers, dependencies)

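
The two arguments have to agree: any key a task refers to should live either
in the task's own layer or in a layer listed as one of its dependencies. A
minimal, hypothetical checker for the simple tuple-task format used on this
page (it inspects only top-level task arguments). Because the tasks are never
run, a hypothetical ``placeholder`` function stands in for
``pandas.read_csv``, ``operator.add`` and the filter lambda:

.. code-block:: python

   def check_dependencies(layers, dependencies):
       """Return (layer, used_layer) pairs that a task uses but does not declare.

       Only top-level task arguments are inspected; nested tasks are ignored.
       """
       key_to_layer = {key: name for name, layer in layers.items() for key in layer}
       problems = set()
       for name, layer in layers.items():
           for _func, *args in layer.values():
               for arg in args:
                   try:
                       used = key_to_layer.get(arg)
                   except TypeError:  # unhashable arguments cannot be keys
                       continue
                   if used is not None and used != name and used not in dependencies[name]:
                       problems.add((name, used))
       return problems


   def placeholder(*args):
       """Hypothetical stand-in for the real task functions (never called here)."""
       return args


   layers = {
       'read-csv': {('read-csv', i): (placeholder, f'myfile.{i}.csv') for i in range(4)},
       'add': {('add', i): (placeholder, ('read-csv', i), 100) for i in range(4)},
       'filter': {('filter', i): (placeholder, ('add', i)) for i in range(4)},
   }

   good = {'read-csv': set(), 'add': {'read-csv'}, 'filter': {'add'}}
   bad = {'read-csv': set(), 'add': set(), 'filter': {'add'}}

   print(check_dependencies(layers, good))  # set()
   print(check_dependencies(layers, bad))   # {('add', 'read-csv')}
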
This object satisfies the ``Mapping`` interface, and so behaves like a normal
Python dictionary that is the semantic merger of the underlying layers:

.. code-block:: python

   >>> len(graph)
   12
   >>> graph[('read-csv', 0)]
   (pandas.read_csv, 'myfile.0.csv')

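
The standard library's ``collections.ChainMap`` offers a comparable
single-``Mapping`` view over several dictionaries, which may help build
intuition for what this merger means. This is an analogy using placeholder
string tasks, not how ``HighLevelGraph`` is implemented:

.. code-block:: python

   from collections import ChainMap

   # Placeholder strings stand in for real task tuples.
   layers = {
       name: {(name, i): f"{name} task {i}" for i in range(4)}
       for name in ('read-csv', 'add', 'filter')
   }

   # ChainMap searches each layer in turn, like one merged dictionary.
   merged = ChainMap(*layers.values())
   print(len(merged))              # 12
   print(merged[('read-csv', 0)])  # read-csv task 0
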

API
---

.. currentmodule:: dask.highlevelgraph

.. autoclass:: HighLevelGraph
   :members:
   :inherited-members:
   :exclude-members: visualize

.. TODO: Fix graphviz dependency in docs build and remove ``visualize`` from
   exclude-members in the above directive