File: array-creation.rst

package info (click to toggle)
dask 1.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 6,856 kB
  • sloc: python: 51,266; sh: 178; makefile: 142
file content (351 lines) | stat: -rw-r--r-- 9,514 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
Create Dask Arrays
==================

You can load or store Dask arrays from a variety of common sources like HDF5,
NetCDF, `Zarr <http://zarr.readthedocs.io/en/stable/>`_, or any format that
supports NumPy-style slicing.

.. currentmodule:: dask.array

.. autosummary::
   from_array
   from_delayed
   from_npy_stack
   from_zarr
   stack
   concatenate

NumPy Slicing
-------------

.. autosummary::
   from_array

Many storage formats have Python projects that expose storage using NumPy
slicing syntax.  These include HDF5, NetCDF, BColz, Zarr, GRIB, etc.  For
example, we can load a Dask array from an HDF5 file using `h5py <http://www.h5py.org/>`_:

.. code-block:: Python

   >>> import h5py
   >>> f = h5py.File('myfile.hdf5') # HDF5 file
   >>> d = f['/data/path']          # Pointer on on-disk array
   >>> d.shape                      # d can be very large
   (1000000, 1000000)

   >>> x = d[:5, :5]                # We slice to get numpy arrays

Given an object like ``d`` above that has ``dtype`` and ``shape`` properties
and that supports NumPy style slicing, we can construct a lazy Dask array:

.. code-block:: Python

   >>> import dask.array as da
   >>> x = da.from_array(d, chunks=(1000, 1000))

This process is entirely lazy.  Neither creating the h5py object nor wrapping
it with ``da.from_array`` have loaded any data.


Concatenation and Stacking
--------------------------

.. autosummary::
   stack
   concatenate

Often we store data in several different locations and want to stitch them together:

.. code-block:: Python

    dask_arrays = []
    for fn in filenames:
        f = h5py.File(fn)
        d = f['/data']
        array = da.from_array(d, chunks=(1000, 1000))
        dask_arrays.append(array)

    x = da.concatenate(dask_arrays, axis=0)  # concatenate arrays along first axis

For more information, see :doc:`concatenation and stacking <array-stack>` docs.


Using ``dask.delayed``
----------------------

.. autosummary::
   from_delayed
   stack
   concatenate

Sometimes NumPy-style data resides in formats that do not support NumPy-style
slicing.  We can still construct Dask arrays around this data if we have a
Python function that can generate pieces of the full array if we use
:doc:`dask.delayed <delayed>`.  Dask delayed lets us delay a single function
call that would create a NumPy array.  We can then wrap this delayed object
with ``da.from_delayed``, providing a dtype and shape to produce a
single-chunked Dask array.  Furthermore, we can use ``stack`` or ``concatenate`` from
before to construct a larger lazy array.

As an example, consider loading a stack of images using ``skimage.io.imread``:

.. code-block:: python

    import skimage.io
    import dask.array as da
    import dask

    imread = dask.delayed(skimage.io.imread, pure=True)  # Lazy version of imread

    filenames = sorted(glob.glob('*.jpg'))

    lazy_images = [imread(path) for path in filenames]   # Lazily evaluate imread on each path
    sample = lazy_images[0].compute()  # load the first image (assume rest are same shape/dtype)

    arrays = [da.from_delayed(lazy_image,           # Construct a small Dask array
                              dtype=sample.dtype,   # for every lazy value
                              shape=sample.shape)
              for lazy_image in lazy_images]

    stack = da.stack(arrays, axis=0)                # Stack all small Dask arrays into one

See :doc:`documentation on using dask.delayed with collections<delayed-collections>`.


From Dask DataFrame
-------------------

You can create Dask arrays from Dask DataFrames using the ``.values`` attribute
or the ``.to_records()`` method:

.. code-block:: python

   >>> x = df.values
   >>> x = df.to_records()

However, these arrays do not have known chunk sizes (dask.dataframe does not
track the number of rows in each partition), and so some operations like slicing
will not operate correctly.

If you have a function that converts a Pandas DataFrame into a NumPy array,
then calling ``map_partitions`` with that function on a Dask DataFrame will
produce a Dask array:

.. code-block:: python

   >>> x = df.map_partitions(np.asarray)


Interactions with NumPy arrays
------------------------------

Dask array operations will automatically convert NumPy arrays into single-chunk
dask arrays:

.. code-block:: python

   >>> x = da.sum(np.ones(5))
   >>> x.compute()
   5

When NumPy and Dask arrays interact, the result will be a Dask array.  Automatic
rechunking rules will generally slice the NumPy array into the appropriate Dask
chunk shape:

.. code-block:: python

   >>> x = da.ones(10, chunks=(5,))
   >>> y = np.ones(10)
   >>> z = x + y
   >>> z
   dask.array<add, shape=(10,), dtype=float64, chunksize=(5,)>

These interactions work not just for NumPy arrays but for any object that has
shape and dtype attributes and implements NumPy slicing syntax.


Chunks
------

See :doc:`documentation on Array Chunks <array-chunks>` for more information.


Store Dask Arrays
=================

.. autosummary::
   store
   to_hdf5
   to_npy_stack
   to_zarr
   compute

In Memory
---------

.. autosummary::
   compute

If you have a small amount of data, you can call ``np.array`` or ``.compute()``
on your Dask array to turn in to a normal NumPy array:

.. code-block:: Python

   >>> x = da.arange(6, chunks=3)
   >>> y = x**2
   >>> np.array(y)
   array([0, 1, 4, 9, 16, 25])

   >>> y.compute()
   array([0, 1, 4, 9, 16, 25])


NumPy style slicing
-------------------

.. autosummary::
   store

You can store Dask arrays in any object that supports NumPy-style slice
assignment like ``h5py.Dataset``:

.. code-block:: Python

   >>> import h5py
   >>> f = h5py.File('myfile.hdf5')
   >>> d = f.require_dataset('/data', shape=x.shape, dtype=x.dtype)
   >>> da.store(x, d)

Also, you can store several arrays in one computation by passing lists of sources and
destinations:

.. code-block:: Python

   >>> da.store([array1, array2], [output1, output2])  # doctest: +SKIP

HDF5
----

.. autosummary::
   to_hdf5

HDF5 is sufficiently common that there is a special function ``to_hdf5`` to
store data into HDF5 files using ``h5py``:

.. code-block:: Python

   >>> da.to_hdf5('myfile.hdf5', '/y', y)  # doctest: +SKIP

You can store several arrays in one computation with the function
``da.to_hdf5`` by passing in a dictionary:

.. code-block:: Python

   >>> da.to_hdf5('myfile.hdf5', {'/x': x, '/y': y})  # doctest: +SKIP


Zarr
----

The `Zarr <https://zarr.readthedocs.io>`_ format is a chunk-wise binary array
storage file format with a good selection of encoding and compression options.
Due to each chunk being stored in a separate file, it is ideal for parallel
access in both reading and writing (for the latter, if the Dask array
chunks are alligned with the target). Furthermore, storage in
:doc:`remote data services <remote-data-services>` such as S3 and GCS is
supported.

For example, to save data to a local zarr dataset you would do:

.. code-block:: Python

   >>> arr.to_zarr('output.zarr')

or to save to a particular bucket on S3:

.. code-block:: Python

   >>> arr.to_zarr('s3://mybucket/output.zarr', storage_option={'key': 'mykey',
                   'secret': 'mysecret'})

or your own custom zarr Array:

.. code-block:: Python

   >>> z = zarr.create((10,), dtype=float, store=zarr.ZipStore("output.zarr"))
   >>> arr.to_zarr(z)

To retrieve those data, you would do ``da.read_zarr`` with exactly the same arguments. The
chunking of the resultant Dask array is defined by how the files were saved, unless
otherwise specified.


Plugins
=======

We can run arbitrary user-defined functions on Dask arrays whenever they are
constructed.  This allows us to build a variety of custom behaviors that improve
debugging, user warning, etc.  You can register a list of functions to run on
all Dask arrays to the global ``array_plugins=`` value:

.. code-block:: python

   >>> def f(x):
   ...     print(x.nbytes)

   >>> with dask.config.set(array_plugins=[f]):
   ...     x = da.ones((10, 1), chunks=(5, 1))
   ...     y = x.dot(x.T)
   80
   80
   800
   800

If the plugin function returns None, then the input Dask array will be returned
without change.  If the plugin function returns something else, then that value
will be the result of the constructor.

Examples
--------

Automatically compute
~~~~~~~~~~~~~~~~~~~~~

We may wish to turn some Dask array code into normal NumPy code.  This is
useful, for example, to track down errors immediately that would otherwise be
hidden by Dask's lazy semantics:

.. code-block:: python

   >>> with dask.config.set(array_plugins=[lambda x: x.compute()]):
   ...     x = da.arange(5, chunks=2)

   >>> x  # this was automatically converted into a numpy array
   array([0, 1, 2, 3, 4])

Warn on large chunks
~~~~~~~~~~~~~~~~~~~~

We may wish to warn users if they are creating chunks that are too large:

.. code-block:: python

   def warn_on_large_chunks(x):
       shapes = list(itertools.product(*x.chunks))
       nbytes = [x.dtype.itemsize * np.prod(shape) for shape in shapes]
       if any(nb > 1e9 for nb in nbytes):
           warnings.warn("Array contains very large chunks")

   with dask.config.set(array_plugins=[warn_on_large_chunks]):
       ...

Combine
~~~~~~~

You can also combine these plugins into a list.  They will run one after the
other, chaining results through them:

.. code-block:: python

   with dask.config.set(array_plugins=[warn_on_large_chunks, lambda x: x.compute()]):
       ...