File: deploying-python-advanced.rst

Python API (advanced)
=====================

.. currentmodule:: distributed

In some rare cases, experts may want to create ``Scheduler``, ``Worker``, and
``Nanny`` objects explicitly in Python.  This is often necessary when building
tools that deploy Dask automatically in custom settings.

It is more common to create a :doc:`Local cluster with Client() on a single
machine <deploying-python>` or use the :doc:`Command Line Interface (CLI) <deploying-cli>`.
New readers are recommended to start there.

If you do want to start Scheduler and Worker objects yourself, you should be
somewhat familiar with ``async``/``await``-style Python syntax.  These objects
are awaitable and are commonly used within ``async with`` context managers.
The examples below show several ways to start and shut things down.

Full Example
------------

.. autosummary::
   Scheduler
   Worker
   Client

We first start with a comprehensive example of setting up a Scheduler, two Workers,
and one Client in the same event loop, running a simple computation, and then
cleaning everything up.

.. code-block:: python

   import asyncio
   from dask.distributed import Scheduler, Worker, Client

   async def f():
       async with Scheduler() as s:
           async with Worker(s.address) as w1, Worker(s.address) as w2:
               async with Client(s.address, asynchronous=True) as client:
                   future = client.submit(lambda x: x + 1, 10)
                   result = await future
                   print(result)

   asyncio.run(f())

Now we look at simpler examples that build up to this case.

Scheduler
---------

.. autosummary::
   Scheduler

We create a scheduler by instantiating a ``Scheduler()`` object and then
``await``-ing it to wait for it to start up.  We can then await the
``.finished`` method to wait until it closes.  In the meantime the scheduler
will be active, managing the cluster.

.. code-block:: python

   import asyncio
   from dask.distributed import Scheduler, Worker

   async def f():
       s = Scheduler()        # scheduler created, but not yet running
       s = await s            # the scheduler is running
       await s.finished()     # wait until the scheduler closes

   asyncio.run(f())

This program will run forever, or until some external process connects to the
scheduler and tells it to stop.  If you want to close things yourself, you can
close any ``Scheduler``, ``Worker``, ``Nanny``, or ``Client`` object by
awaiting its ``.close`` method:

.. code-block:: python

   await s.close()

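As a hedged sketch (the one-second delay is arbitrary and only stands in for
real work), here is a scheduler that we close ourselves after a short wait
instead of waiting on ``.finished()``:

.. code-block:: python

   import asyncio
   from dask.distributed import Scheduler

   async def f():
       s = await Scheduler()    # start the scheduler
       await asyncio.sleep(1)   # it runs while we do other work
       await s.close()          # shut it down explicitly

   asyncio.run(f())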

Worker
------

.. autosummary::
   Worker

The worker follows the same API.
The only difference is that the worker needs to know the address of the
scheduler.

.. code-block:: python

   import asyncio
   from dask.distributed import Scheduler, Worker

   async def f(scheduler_address):
       w = await Worker(scheduler_address)
       await w.finished()

   asyncio.run(f("tcp://127.0.0.1:8786"))


Start many in one event loop
----------------------------

.. autosummary::
   Scheduler
   Worker

We can run as many of these objects as we like in the same event loop.

.. code-block:: python

   import asyncio
   from dask.distributed import Scheduler, Worker

   async def f():
       s = await Scheduler()
       w = await Worker(s.address)
       await w.finished()
       await s.finished()

   asyncio.run(f())
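
Because these objects are just awaitables, several workers can also be started
concurrently, for example with ``asyncio.gather``.  This is a sketch of that
pattern, not a dedicated Dask API:

.. code-block:: python

   import asyncio
   from dask.distributed import Scheduler, Worker, Client

   async def f(n):
       async with Scheduler() as s:
           # Start n workers concurrently rather than one after another
           workers = await asyncio.gather(*(Worker(s.address) for _ in range(n)))
           async with Client(s.address, asynchronous=True) as client:
               result = await client.submit(sum, range(10))
           # Close the workers concurrently as well
           await asyncio.gather(*(w.close() for w in workers))
           return result

   print(asyncio.run(f(3)))  # prints 45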


Use Context Managers
--------------------

We can also use ``async with`` context managers to make sure that we clean up
properly.  Here is the same example as from above:

.. code-block:: python

   import asyncio
   from dask.distributed import Scheduler, Worker

   async def f():
       async with Scheduler() as s:
           async with Worker(s.address) as w:
               await w.finished()
               await s.finished()

   asyncio.run(f())

Alternatively, in the example below we also include a ``Client``, run a small
computation, and then let everything clean up once that computation finishes.

.. code-block:: python

   import asyncio
   from dask.distributed import Scheduler, Worker, Client

   async def f():
       async with Scheduler() as s:
           async with Worker(s.address) as w1, Worker(s.address) as w2:
               async with Client(s.address, asynchronous=True) as client:
                   future = client.submit(lambda x: x + 1, 10)
                   result = await future
                   print(result)

   asyncio.run(f())

This is equivalent to creating and ``await``-ing each server and then calling
``.close`` on each as we leave the context.
In this example we don't wait on ``s.finished()``, so the cluster terminates
relatively quickly after the computation completes.  If you wanted it to run
forever, you could call ``await s.finished()`` instead.

.. _nanny:

Nanny
-----

.. autosummary::
   Nanny

Alternatively, we can replace ``Worker`` with ``Nanny`` if we want our workers
to be managed in a separate process.  The ``Nanny`` constructor follows the
same API.  This allows workers to restart themselves in case of failure,
provides some additional monitoring, and is useful when coordinating many
workers that should live in separate processes in order to avoid the GIL_.

.. code-block:: python

   # w = await Worker(s.address)
   w = await Nanny(s.address)

.. _GIL: https://docs.python.org/3/glossary.html#term-gil
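
For completeness, here is the earlier context-manager example with the
``Worker`` swapped for a ``Nanny``; everything else stays the same:

.. code-block:: python

   import asyncio
   from dask.distributed import Scheduler, Nanny, Client

   async def f():
       async with Scheduler() as s:
           async with Nanny(s.address) as n:
               async with Client(s.address, asynchronous=True) as client:
                   future = client.submit(lambda x: x + 1, 10)
                   return await future

   print(asyncio.run(f()))  # prints 11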


API
---

These classes have a variety of keyword arguments that you can use to control
their behavior.  See the API documentation below for more information.

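For example, here is a short sketch using a few such keyword arguments; the
specific values are illustrative, not defaults:

.. code-block:: python

   import asyncio
   from dask.distributed import Scheduler, Worker

   async def f():
       # port=0 asks the scheduler to pick a random open port;
       # nthreads and name configure this particular worker.
       async with Scheduler(port=0) as s:
           async with Worker(s.address, nthreads=2, name="worker-1") as w:
               return w.name

   print(asyncio.run(f()))  # prints worker-1
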
Scheduler
~~~~~~~~~
.. autoclass:: Scheduler
   :members:

Worker
~~~~~~

.. autoclass:: Worker
   :members:

Nanny
~~~~~

.. autoclass:: Nanny
   :members: