File: user-interfaces.rst

package info (click to toggle)
dask 1.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 6,856 kB
  • sloc: python: 51,266; sh: 178; makefile: 142
file content (342 lines) | stat: -rw-r--r-- 12,182 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
User Interfaces
===============

Dask supports several user interfaces:

-  High-Level
    -  :doc:`Arrays <array>`: parallel Numpy
    -  :doc:`Bags <bag>`: parallel lists
    -  :doc:`DataFrames <dataframe>`: parallel Pandas
    -  `Machine Learning <https://ml.dask.org>`_ : parallel Scikit-Learn
    -  Others from external projects, like `XArray <https://xarray.pydata.org>`_
-  Low-Level
    -  :doc:`Delayed <delayed>`: parallel function evaluation
    -  :doc:`Futures <futures>`: real-time parallel function evaluation

Each of these user interfaces employs the same underlying parallel computing
machinery, and so has the same scaling, diagnostics, resilience, and so on, but
each provides a different set of parallel algorithms and programming style.

This document helps you to decide which user interface best suits your needs,
and gives some general information that applies to all interfaces.
The pages linked above give more information about each interface in greater
depth.

High-Level Collections
----------------------

Many people who start using Dask are explicitly looking for a scalable version of
Numpy, Pandas, or Scikit-Learn.  For these situations, the starting point within
Dask is usually fairly clear.  If you want scalable Numpy arrays, then start with Dask
array; if you want scalable Pandas DataFrames, then start with Dask DataFrame, and so on.

These high-level interfaces copy the standard interface with slight variations.
These interfaces automatically parallelize over larger datasets for you for a
large subset of the API from the original project.

.. code-block:: python

   # Arrays
   import dask.array as da
   x = da.random.uniform(low=0, high=10, size=(10000, 10000),  # normal Numpy code
                         chunks=(1000, 1000))  # break into chunks of size 1000x1000

   y = x + x.T - x.mean(axis=0)  # Use normal syntax for high level algorithms

   # DataFrames
   import dask.dataframe as dd
   df = dd.read_csv('2018-*-*.csv', parse_dates='timestamp',  # normal Pandas code
                    blocksize=64000000)  # break text into 64MB chunks

   s = df.groupby('name').balance.mean()  # Use normal syntax for high level algorithms

   # Bags / lists
   import dask.bag as db
   b = db.read_text('*.json').map(json.loads)
   total = (b.filter(lambda d: d['name'] == 'Alice')
             .map(lambda d: d['balance'])
             .sum())

It is important to remember that, while APIs may be similar, some differences do
exist.  Additionally, the performance of some algorithms may differ from their
in-memory counterparts due to the advantages and disadvantages of parallel
programming.  Some thought and attention is still required when using Dask.


Low-Level Interfaces
--------------------

Often when parallelizing existing code bases or building custom algorithms, you
run into code that is parallelizable, but isn't just a big DataFrame or array.
Consider the for-loopy code below:

.. code-block:: python

   results = []
   for a in A:
       for b in B:
           if a < b:
               c = f(a, b)
           else:
               c = g(a, b)
           results.append(c)

There is potential parallelism in this code (the many calls to ``f`` and ``g``
can be done in parallel), but it's not clear how to rewrite it into a big
array or DataFrame so that it can use a higher-level API.  Even if you could
rewrite it into one of these paradigms, it's not clear that this would be a
good idea.  Much of the meaning would likely be lost in translation, and this
process would become much more difficult for more complex systems.

Instead, Dask's lower-level APIs let you write parallel code one function call
at a time within the context of your existing for loops.  A common solution
here is to use :doc:`Dask delayed <delayed>` to wrap individual function calls
into a lazily constructed task graph:

.. code-block:: python

   import dask

   lazy_results = []
   for a in A:
       for b in B:
           if a < b:
               c = dask.delayed(f)(a, b)  # add lazy task
           else:
               c = dask.delayed(g)(a, b)  # add lazy task
           lazy_results.append(c)

   results = dask.compute(*lazy_results)  # compute all in parallel


Combining High- and Low-Level Interfaces
----------------------------------------

It is common to combine high- and low-level interfaces.
For example, you might use Dask array/bag/dataframe to load in data and do
initial pre-processing, then switch to Dask delayed for a custom algorithm that
is specific to your domain, then switch back to Dask array/dataframe to clean
up and store results.  Understanding both sets of user interfaces, and how
to switch between them, can be a productive combination.

.. code-block:: python

   # Convert to a list of delayed Pandas dataframes
   delayed_values = df.to_delayed()

   # Manipulate delayed values arbitrarily as you like

   # Convert many delayed Pandas DataFrames back to a single Dask DataFrame
   df = dd.from_delayed(delayed_values)


Laziness and Computing
----------------------

Most Dask user interfaces are *lazy*, meaning that they do not evaluate until
you explicitly ask for a result using the ``compute`` method:

.. code-block:: python

   # This array syntax doesn't cause computation
   y = x + x.T - x.mean(axis=0)

   # Trigger computation by explicitly calling the compute method
   y = y.compute()

If you have multiple results that you want to compute at the same time, use the
``dask.compute`` function.  This can share intermediate results and so be more
efficient:

.. code-block:: python

   # compute multiple results at the same time with the compute function
   min, max = dask.compute(y.min(), y.max())

Note that the ``compute()`` function returns in-memory results.  It converts
Dask DataFrames to Pandas DataFrames, Dask arrays to Numpy arrays, and Dask
bags to lists.  *You should only call compute on results that will fit
comfortably in memory*.  If your result does not fit in memory, then you might
consider writing it to disk instead.

.. code-block:: python

   # Write larger results out to disk rather than store them in memory
   my_dask_dataframe.to_parquet('myfile.parquet')
   my_dask_array.to_hdf5('myfile.hdf5')
   my_dask_bag.to_textfiles('myfile.*.txt')


Persist into Distributed Memory
-------------------------------

Alternatively, if you are on a cluster, then you may want to trigger a
computation and store the results in distributed memory.  In this case you do
not want to call ``compute``, which would create a single Pandas, Numpy, or
list result. Instead, you want to call ``persist``, which returns a new Dask
object that points to actively computing, or already computed results spread
around your cluster's memory.

.. code-block:: python

   # Compute returns an in-memory non-Dask object
   y = y.compute()

   # Persist returns an in-memory Dask object that uses distributed storage if available
   y = y.persist()

This is common to see after data loading an preprocessing steps, but before
rapid iteration, exploration, or complex algorithms.  For example, we might read
in a lot of data, filter down to a more manageable subset, and then persist
data into memory so that we can iterate quickly.

.. code-block:: python

   import dask.dataframe as dd
   df = dd.read_parquet('...')
   df = df[df.name == 'Alice']  # select important subset of data
   df = df.persist()  # trigger computation in the background

   # These are all relatively fast now that the relevant data is in memory
   df.groupby(df.id).balance.sum().compute()   # explore data quickly
   df.groupby(df.id).balance.mean().compute()  # explore data quickly
   df.id.nunique()                             # explore data quickly


Lazy vs Immediate
-----------------

As mentioned above, most Dask workloads are lazy, that is, they don't start any
work until you explicitly trigger them with a call to ``compute()``.
However, sometimes you *do* want to submit work as quickly as possible, track it
over time, submit new work or cancel work depending on partial results, and so
on.  This can be useful when tracking or responding to real-time events,
handling streaming data, or when building complex and adaptive algorithms.

For these situations, people typically turn to the :doc:`futures interface
<futures>` which is a low-level interface like Dask delayed, but operates
immediately rather than lazily.

Here is the same example with Dask delayed and Dask futures to illustrate the
difference.

Delayed: Lazy
~~~~~~~~~~~~~

.. code-block:: python

   @dask.delayed
   def inc(x):
       return x + 1

   @dask.delayed
   def add(x, y):
       return x + y

   a = inc(1)       # no work has happened yet
   b = inc(2)       # no work has happened yet
   c = add(a, b)    # no work has happened yet

   c = c.compute()  # This triggers all of the above computations


Futures: Immediate
~~~~~~~~~~~~~~~~~~

.. code-block:: python

   from dask.distributed import Client
   client = Client()

   def inc(x):
       return x + 1

   def add(x, y):
       return x + y

   a = client.submit(inc, 1)     # work starts immediately
   b = client.submit(inc, 2)     # work starts immediately
   c = client.submit(add, a, b)  # work starts immediately

   c = c.result()                # block until work finishes, then gather result

You can also trigger work with the high-level collections using the
``persist`` function.  This will cause work to happen in the background when
using the distributed scheduler.


Combining Interfaces
--------------------

There are established ways to combine the interfaces above:

1.  The high-level interfaces (array, bag, dataframe) have a ``to_delayed``
    method that can convert to a sequence (or grid) of Dask delayed objects

    .. code-block:: python

       delayeds = df.to_delayed()

2.  The high-level interfaces (array, bag, dataframe) have a ``from_delayed``
    method that can convert from either Delayed *or* Future objects

    .. code-block:: python

       df = dd.from_delayed(delayeds)
       df = dd.from_delayed(futures)

3.  The ``Client.compute`` method converts Delayed objects into Futures

    .. code-block:: python

       futures = client.compute(delayeds)

4.  The ``dask.distributed.futures_of`` function gathers futures from
    persisted collections

    .. code-block:: python

       from dask.distributed import futures_of

       df = df.persist()  # start computation in the background
       futures = futures_of(df)

5.  The Dask.delayed object converts Futures into delayed objects

    .. code-block:: python

       delayed_value = dask.delayed(future)

The approaches above should suffice to convert any interface into any other.
We often see some anti-patterns that do not work as well:

1.  Calling low-level APIs (delayed or futures) on high-level objects (like
    Dask arrays or DataFrames). This downgrades those objects to their Numpy or
    Pandas equivalents, which may not be desired.
    Often people are looking for APIs like ``dask.array.map_blocks`` or
    ``dask.dataframe.map_partitions`` instead
2.  Calling ``compute()`` on Future objects.
    Often people want the ``.result()`` method instead
3.  Calling Numpy/Pandas functions on high-level Dask objects or
    high-level Dask functions on Numpy/Pandas objects

Conclusion
----------

Most people who use Dask start with only one of the interfaces above but
eventually learn how to use a few interfaces together.  This helps them
leverage the sophisticated algorithms in the high-level interfaces while also
working around tricky problems with the low-level interfaces.

For more information, see the documentation for the particular user interfaces
below:

-  High Level
    -  :doc:`Arrays <array>`: parallel Numpy
    -  :doc:`Bags <bag>`: parallel lists
    -  :doc:`DataFrames <dataframe>`: parallel Pandas
    -  `Machine Learning <https://ml.dask.org>`_ : parallel Scikit-Learn
    -  Others from external projects, like `XArray <https://xarray.pydata.org>`_
-  Low Level
    -  :doc:`Delayed <delayed>`: parallel function evaluation
    -  :doc:`Futures <futures>`: real-time parallel function evaluation