File: asynchronous.rst

package info (click to toggle)
dask.distributed 2022.12.1%2Bds.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 10,164 kB
  • sloc: python: 81,938; javascript: 1,549; makefile: 228; sh: 100
file content (124 lines) | stat: -rw-r--r-- 3,519 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
Asynchronous Operation
======================

Dask can run fully asynchronously and so interoperate with other highly
concurrent applications.  Internally Dask is built on top of Tornado coroutines
but also has a compatibility layer for asyncio (see below).

Basic Operation
---------------

When starting a client provide the ``asynchronous=True`` keyword to tell Dask
that you intend to use this client within an asynchronous context, such as a
function defined with ``async/await`` syntax.

.. code-block:: python

   async def f():
       client = await Client(asynchronous=True)

Operations that used to block now provide Tornado coroutines on which you can
``await``.

Fast functions that only submit work remain fast and don't need to be awaited.
This includes all functions that submit work to the cluster, like ``submit``,
``map``, ``compute``, and ``persist``.

.. code-block:: python

   future = client.submit(lambda x: x + 1, 10)

You can await futures directly

.. code-block:: python

   result = await future

   >>> print(result)
   11

Or you can use the normal client methods.  Any operation that waited until it
received information from the scheduler should now be ``await``'ed.

.. code-block:: python

   result = await client.gather(future)


If you want to use an asynchronous function with a synchronous ``Client``
(one made without the ``asynchronous=True`` keyword) then you can apply the
``asynchronous=True`` keyword at each method call and use the ``Client.sync``
function to run the asynchronous function:

.. code-block:: python

   from dask.distributed import Client

   client = Client()  # normal blocking client

   async def f():
       future = client.submit(lambda x: x + 1, 10)
       result = await client.gather(future, asynchronous=True)
       return result

   client.sync(f)


.. note: Blocking operations like the .compute() method aren’t ok to use in
         asynchronous mode. Instead you’ll have to use the Client.compute
         method


.. code-block:: python

    async with Client(asynchronous=True) as client:
        arr = da.random.random((1000, 1000), chunks=(1000, 100))
        await client.compute(arr.mean())


Example
-------

This self-contained example starts an asynchronous client, submits a trivial
job, waits on the result, and then shuts down the client. You can see
implementations for Asyncio and Tornado.

Python 3 with Tornado or Asyncio
++++++++++++++++++++++++++++++++

.. code-block:: python

   from dask.distributed import Client

   async def f():
       client = await Client(asynchronous=True)
       future = client.submit(lambda x: x + 1, 10)
       result = await future
       await client.close()
       return result

   # Either use Tornado
   from tornado.ioloop import IOLoop
   IOLoop().run_sync(f)

   # Or use asyncio
   import asyncio
   asyncio.get_event_loop().run_until_complete(f())


Use Cases
---------

Historically this has been used in a few kinds of applications:

1.  To integrate Dask into other asynchronous services (such as web backends),
    supplying a computational engine similar to Celery, but while still
    maintaining a high degree of concurrency and not blocking needlessly.

2.  For computations that change or update state very rapidly, such as is
    common in some advanced machine learning workloads.

3.  To develop the internals of Dask's distributed infrastructure, which is
    written entirely in this style.

4.  For complex control and data structures in advanced applications.