File: client.rst

Client
========

The Client is the primary entry point for users of ``dask.distributed``.

After we `set up a cluster <https://docs.dask.org/en/latest/setup.html>`_, we initialize a ``Client`` by pointing
it to the address of a ``Scheduler``:

.. code-block:: python

   >>> from distributed import Client
   >>> client = Client('127.0.0.1:8786')

There are a few different ways to interact with the cluster through the client:

1.  The Client satisfies most of the standard concurrent.futures_ (PEP-3148_)
    interface with ``.submit`` and ``.map`` methods and ``Future`` objects,
    allowing the immediate and direct submission of tasks.
2.  The Client registers itself as the default Dask_ scheduler, and so runs
    all Dask collections like dask.array_, dask.bag_, dask.dataframe_, and
    dask.delayed_.
3.  The Client has additional methods for manipulating data remotely.  See the
    full :doc:`API <api>` for a thorough list.


Concurrent.futures
------------------

We can submit individual function calls with the ``client.submit`` method or
many function calls with the ``client.map`` method:

.. code-block:: python

   >>> def inc(x):
   ...     return x + 1

   >>> x = client.submit(inc, 10)
   >>> x
   <Future - key: inc-e4853cffcc2f51909cdb69d16dacd1a5>

   >>> L = client.map(inc, range(1000))
   >>> L
   [<Future - key: inc-e4853cffcc2f51909cdb69d16dacd1a5>,
    <Future - key: inc-...>,
    <Future - key: inc-...>,
    <Future - key: inc-...>, ...]

These results live on distributed workers.

We can also submit tasks on futures.  The function will go to the machine
where the futures are stored and run on the results once they have completed.

.. code-block:: python

   >>> y = client.submit(inc, x)      # Submit on x, a Future
   >>> total = client.submit(sum, L)  # Submit on L, a list of Futures

We gather back the results using either the ``Future.result`` method for a
single future or the ``client.gather`` method for many futures at once.

.. code-block:: python

   >>> x.result()
   11

   >>> client.gather(L)
   [1, 2, 3, 4, 5, ...]

But, as always, we want to minimize communicating results back to the local
process.  It's often best to leave data on the cluster and operate on it
remotely with functions like ``submit``, ``map``, ``get`` and ``compute``.
See :doc:`efficiency <efficiency>` for more information on efficient use of
distributed.
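
For example, the ``total`` future computed above keeps its thousand inputs on
the cluster; retrieving it transfers a single integer rather than the full
list of intermediate values:

.. code-block:: python

   >>> total.result()  # only one small integer travels back
   500500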


Dask
----

The parent library Dask_ contains objects like dask.array_, dask.dataframe_,
dask.bag_, and dask.delayed_, which automatically produce parallel algorithms
on larger datasets.  All dask collections work smoothly with the distributed
scheduler.

When we create a ``Client`` object, it registers itself as the default Dask
scheduler.  All ``.compute()`` methods will automatically start using the
distributed system.

.. code-block:: python

   client = Client('scheduler:8786')

   my_dataframe.sum().compute()  # Now uses the distributed system by default

We can stop this behavior by using the ``set_as_default=False`` keyword
argument when starting the Client.
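
In that case ``.compute()`` calls fall back to whatever scheduler was
previously the default, and we can still route work through the cluster
explicitly with ``Client.compute`` (a minimal sketch, reusing the scheduler
address from above):

.. code-block:: python

   client = Client('scheduler:8786', set_as_default=False)

   my_dataframe.sum().compute()                 # still uses the previous default
   future = client.compute(my_dataframe.sum())  # explicitly run on the cluster
   result = future.result()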

Dask's normal ``.compute()`` methods are *synchronous*, meaning that they block
the interpreter until they complete.  Dask.distributed adds the ability to
compute *asynchronously*: we can trigger computations to occur in the
background and persist in memory while we continue doing other work.  This is
typically handled with the ``Client.persist`` and ``Client.compute`` methods,
which are used for larger and smaller result sets respectively.

.. code-block:: python

   >>> df = client.persist(df)  # trigger all computations, keep df in memory
   >>> type(df)
   dask.dataframe.DataFrame
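
``Client.compute`` is the counterpart for smaller results: it starts the
computation in the background and returns a single ``Future`` for the final
value (a brief sketch, reusing ``df`` from above):

.. code-block:: python

   total = client.compute(df.sum())  # starts in the background, returns a Future
   result = total.result()           # block and retrieve the small final value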

For more information see the page on :doc:`Managing Computation <manage-computation>`.

.. _pure functions:

Pure Functions by Default
-------------------------

By default, ``distributed`` assumes that all functions are pure_. Pure functions:

* always return the same output for a given set of inputs
* do not have side effects, like modifying global state or creating files

If this is not the case, you should use the ``pure=False`` keyword argument in methods like ``Client.map()`` and ``Client.submit()``.

The client associates a key with every computation.  This key is accessible
on the ``Future`` object.

.. code-block:: python

   >>> from operator import add
   >>> x = client.submit(add, 1, 2)
   >>> x.key
   'add-ebf39f96ad7174656f97097d658f3fa2'

This key should be the same across all computations with the same inputs and
across all machines.  If we run the computation above on any computer with the
same environment, then we should get the exact same key.
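
The same deterministic hashing that underlies these keys is exposed as
``dask.base.tokenize`` (a small illustration):

.. code-block:: python

   >>> from dask.base import tokenize
   >>> from operator import add
   >>> tokenize(add, 1, 2) == tokenize(add, 1, 2)  # stable across calls
   True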

The scheduler avoids redundant computations.  If the result is already in
memory from a previous call, then that old result will be used rather than
recomputed.  Calls to ``submit`` or ``map`` are idempotent in the common case.
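
For example, resubmitting an identical call returns a future with the same
key, backed by the result that is already in memory:

.. code-block:: python

   >>> a = client.submit(inc, 10)
   >>> b = client.submit(inc, 10)  # no recomputation; reuses the stored result
   >>> a.key == b.key
   True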

While convenient, this feature may be undesired for impure functions, like
``random``.  In these cases two calls to the same function with the same
inputs are expected to produce different results.  We accomplish this with
the ``pure=False`` keyword argument, in which case keys are randomly
generated (by ``uuid4``).

.. code-block:: python

   >>> import numpy as np
   >>> client.submit(np.random.random, 1000, pure=False).key
   'random_sample-fc814a39-ee00-42f3-8b6f-cac65bcb5556'
   >>> client.submit(np.random.random, 1000, pure=False).key
   'random_sample-a24e7220-a113-47f2-a030-72209439f093'

.. _pure: https://toolz.readthedocs.io/en/latest/purity.html


Async/await Operation
---------------------

If we are operating in an asynchronous environment, then the blocking functions
listed above become asynchronous equivalents.  You must start your client
with the ``asynchronous=True`` keyword and ``yield`` or ``await`` blocking
functions.

.. code-block:: python

   async def f():
       client = await Client(asynchronous=True)
       future = client.submit(func, *args)
       result = await future
       return result

If you want to reuse the same client in asynchronous and synchronous
environments, you can apply the ``asynchronous=True`` keyword at each method
call.

.. code-block:: python

   client = Client()  # normal blocking client

   async def f():
       futures = client.map(func, L)
       results = await client.gather(futures, asynchronous=True)
       return results

See the :doc:`Asynchronous <asynchronous>` documentation for more information.


Additional Links
----------------

For more information on how to use dask.distributed you may want to look at the
following pages:

*  :doc:`Managing Memory <memory>`
*  :doc:`Managing Computation <manage-computation>`
*  :doc:`Data Locality <locality>`
*  :doc:`API <api>`

.. _concurrent.futures:  https://docs.python.org/3/library/concurrent.futures.html
.. _PEP-3148: https://www.python.org/dev/peps/pep-3148/
.. _dask.array: https://docs.dask.org/en/latest/array.html
.. _dask.bag: https://docs.dask.org/en/latest/bag.html
.. _dask.dataframe: https://docs.dask.org/en/latest/dataframe.html
.. _dask.delayed: https://docs.dask.org/en/latest/delayed.html
.. _Dask: https://dask.org