Diagnostics (local)
====================

Profiling parallel code can be challenging, but ``dask.diagnostics`` provides
functionality to aid in profiling and inspecting execution with the
:doc:`local task scheduler <scheduling>`.

This page describes the following built-in options:

1.  ProgressBar
2.  Profiler
3.  ResourceProfiler
4.  CacheProfiler


This page then provides instructions on how to build your own custom diagnostic.

.. currentmodule:: dask.diagnostics


Progress Bar
------------

.. autosummary::
   ProgressBar

The ``ProgressBar`` class builds on the scheduler callbacks described below to
display a progress bar in the terminal or notebook during computation. This
gives nice feedback during long-running graph execution. It can be used as a
context manager around calls to ``get`` or ``compute`` to profile the
computation:

.. code-block:: python

    >>> from dask.diagnostics import ProgressBar
    >>> a = da.random.normal(size=(10000, 10000), chunks=(1000, 1000))
    >>> res = a.dot(a.T).mean(axis=0)

    >>> with ProgressBar():
    ...     out = res.compute()
    [########################################] | 100% Completed | 17.1 s

Alternatively, it can be registered globally using the ``register`` method:

.. code-block:: python

    >>> pbar = ProgressBar()
    >>> pbar.register()
    >>> out = res.compute()
    [########################################] | 100% Completed | 17.1 s

To unregister from the global callbacks, call the ``unregister`` method:

.. code-block:: python

    >>> pbar.unregister()
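
The bar can also be tuned at construction. As a small sketch, assuming the
``minimum`` and ``dt`` keywords (which control how long a computation must run
before the bar appears and how often it redraws; check your version's
signature):

.. code-block:: python

    >>> # Assumed keywords: only display for computations longer than one
    >>> # second, and redraw the bar every half second
    >>> pbar = ProgressBar(minimum=1.0, dt=0.5)
    >>> pbar.register()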



Profiler
--------

.. autosummary::
   Profiler

Dask provides a few tools for profiling execution. As with the ``ProgressBar``,
each can be used as a context manager or registered globally.

The ``Profiler`` class is used to profile dask execution at the task level.
During execution it records the following information for each task:

1. Key
2. Task
3. Start time in seconds since the epoch
4. Finish time in seconds since the epoch
5. Worker id
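
For example, here is a minimal sketch (the array and computation are arbitrary
choices for illustration) wrapping a computation in the profiler; the recorded
``TaskData`` records are then available on the ``results`` attribute:

.. code-block:: python

    >>> import dask.array as da
    >>> from dask.diagnostics import Profiler

    >>> x = da.random.random(size=(10000, 1000), chunks=(1000, 1000))
    >>> with Profiler() as prof:
    ...     out = x.sum().compute()

    >>> prof.results[0]  # a TaskData namedtuple with the five fields above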


ResourceProfiler
----------------

.. autosummary::
   ResourceProfiler

The ``ResourceProfiler`` class is used to profile dask execution at the
resource level. During execution it records the following information
for each timestep:

1. Time in seconds since the epoch
2. Memory usage in MB
3. % CPU usage

The default timestep is 1 second, but it can be set manually using the ``dt``
keyword:

.. code-block:: python

    >>> from dask.diagnostics import ResourceProfiler
    >>> rprof = ResourceProfiler(dt=0.5)
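
Like the other profilers it works as a context manager, sampling resource
usage in the background while the computation runs. A minimal sketch (the
array is an arbitrary choice for illustration):

.. code-block:: python

    >>> import dask.array as da

    >>> x = da.random.random(size=(10000, 1000), chunks=(1000, 1000))
    >>> with rprof:
    ...     out = x.sum().compute()

    >>> rprof.results[0]  # a ResourceData(time, mem, cpu) namedtuple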


CacheProfiler
-------------

.. autosummary::
   CacheProfiler

The ``CacheProfiler`` class is used to profile dask execution at the scheduler
cache level. During execution it records the following information for each
task:

1. Key
2. Task
3. Size metric
4. Cache entry time in seconds since the epoch
5. Cache exit time in seconds since the epoch

The size metric is the output of a function called on the result of each task.
The default metric simply counts each task (the metric is 1 for all tasks),
but other functions may be used instead through the ``metric`` keyword.
For example, the ``nbytes`` function found in ``cachey`` can be used to measure
the number of bytes in the scheduler cache:

.. code-block:: python

    >>> from dask.diagnostics import CacheProfiler
    >>> from cachey import nbytes
    >>> cprof = CacheProfiler(metric=nbytes)
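
Any one-argument function of a task's result can serve as a metric. As a
hedged sketch (``num_elements`` is a hypothetical helper written here for
illustration, and the ``metric_name`` keyword for labeling plots is an
assumption; check your version's signature):

.. code-block:: python

    >>> def num_elements(result):
    ...     """Hypothetical metric: element count for array-like results, else 1."""
    ...     return getattr(result, 'size', 1)

    >>> cprof = CacheProfiler(metric=num_elements, metric_name='num-elements')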


Example
-------

To demonstrate the diagnostics, we'll profile some linear algebra done with
``dask.array``. We'll create a random array, take its QR decomposition, and
then reconstruct the initial array by multiplying the Q and R components
together. Note that since the profilers (and all diagnostics) are just context
managers, multiple profilers can be used in a single ``with`` block:

.. code-block:: python

    >>> import dask.array as da
    >>> from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
    >>> a = da.random.random(size=(10000, 1000), chunks=(1000, 1000))
    >>> q, r = da.linalg.qr(a)
    >>> a2 = q.dot(r)

    >>> with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof, \
    ...         CacheProfiler() as cprof:
    ...     out = a2.compute()


The results of each profiler are stored in their ``results`` attribute as a
list of ``namedtuple`` objects:

.. code-block:: python

    >>> prof.results[0]
    TaskData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 5, 0),
             task=(qr, (_apply_random, 'random_sample', 1060164455, (1000, 1000), (), {})),
             start_time=1454368444.493292,
             end_time=1454368444.902987,
             worker_id=4466937856)

    >>> rprof.results[0]
    ResourceData(time=1454368444.078748, mem=74.100736, cpu=0.0)

    >>> cprof.results[0]
    CacheData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 7, 0),
              task=(qr, (_apply_random, 'random_sample', 1310656009, (1000, 1000), (), {})),
              metric=1,
              cache_time=1454368444.49662,
              free_time=1454368446.769452)

These can be analyzed separately, or viewed in a bokeh plot using the provided
``visualize`` method on each profiler:

.. code-block:: python

    >>> prof.visualize()


.. raw:: html

    <p>Example plot disabled for privacy protection</p>

To view multiple profilers at the same time, the ``dask.diagnostics.visualize``
function can be used. This takes a list of profilers, and creates a vertical
stack of plots aligned along the x-axis:

.. code-block:: python

    >>> from dask.diagnostics import visualize
    >>> visualize([prof, rprof, cprof])
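
Both the per-profiler ``visualize`` methods and this function render with
Bokeh. Assuming they accept ``file_path`` and ``show`` keywords (an
assumption; check your version's signature), the plot can be saved to a
standalone HTML file rather than opened in a browser:

.. code-block:: python

    >>> # file_path/show keywords are assumptions; verify against your dask version
    >>> visualize([prof, rprof, cprof], file_path="profile.html", show=False)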


.. raw:: html

    <p>Example plot disabled for privacy protection</p>


Looking at the above figure, from top to bottom:

1. The results from the ``Profiler`` object. This shows the execution time for
   each task as a rectangle, organized along the y-axis by worker (in this case
   threads). Similar tasks are grouped by color, and by hovering over each task
   one can see the key and task that each block represents.

2. The results from the ``ResourceProfiler`` object. This shows two lines, one
   for total CPU percentage used by all the workers, and one for total memory
   usage.

3. The results from the ``CacheProfiler`` object. This shows a line for each
   task group, plotting the sum of the current ``metric`` in the cache against
   time. In this case it's the default metric (count), and the lines represent
   the number of each object in the cache over time. Note that the grouping and
   coloring are the same as for the ``Profiler`` plot, and that the task
   represented by each line can be found by hovering over the line.

From these plots we can see that the initial tasks (calls to
``numpy.random.random`` and ``numpy.linalg.qr`` for each chunk) are run
concurrently, but only use slightly more than 100% CPU. This is because the
call to ``numpy.linalg.qr`` currently doesn't release the global interpreter
lock, so those calls can't truly be done in parallel. Next, there's a reduction
step where all the blocks are combined. This requires all the results from the
first step to be held in memory, as shown by the increased number of results in
the cache and the increase in memory usage. Immediately after this step ends,
the number of elements in the cache decreases, showing that they were only
needed for this step. Finally, there's an interleaved set of calls to ``dot``
and ``sum``. Looking at the CPU plot shows that these run both concurrently and
in parallel, as the CPU percentage spikes up to around 350%.


Custom Callbacks
----------------

.. autosummary:: Callback

Schedulers based on ``dask.local.get_async`` (currently
``dask.get``, ``dask.threaded.get``, and ``dask.multiprocessing.get``)
accept five callbacks, allowing for inspection of scheduler execution.

The callbacks are:

1. ``start(dsk)``

   Run at the beginning of execution, right before the state is initialized.
   Receives the dask graph.

2. ``start_state(dsk, state)``

   Run at the beginning of execution, right after the state is initialized.
   Receives the dask graph and scheduler state.

3. ``pretask(key, dsk, state)``

   Run every time a new task is started. Receives the key of the task to be
   run, the dask graph, and the scheduler state.

4. ``posttask(key, result, dsk, state, id)``

   Run every time a task is finished. Receives the key of the task that just
   completed, the result, the dask graph, the scheduler state, and the id of
   the worker that ran the task.

5. ``finish(dsk, state, errored)``

   Run at the end of execution, right before the result is returned. Receives
   the dask graph, the scheduler state, and a boolean indicating whether or not
   the exit was due to an error.

Custom diagnostics can be created either by instantiating the ``Callback``
class with some of the above methods passed as keyword arguments, or by
subclassing the ``Callback`` class.
Here we create a class that prints the name of every key as it's computed:

.. code-block:: python

    from dask.callbacks import Callback
    class PrintKeys(Callback):
        def _pretask(self, key, dask, state):
            """Print the key of every task as it's started"""
            print("Computing: {0}!".format(repr(key)))

This can now be used as a context manager during computation:

.. code-block:: python

    >>> from operator import add, mul
    >>> from dask import get
    >>> dsk = {'a': (add, 1, 2), 'b': (add, 3, 'a'), 'c': (mul, 'a', 'b')}

    >>> with PrintKeys():
    ...     get(dsk, 'c')
    Computing: 'a'!
    Computing: 'b'!
    Computing: 'c'!

Alternatively, functions may be passed in as keyword arguments to ``Callback``:

.. code-block:: python

    >>> def printkeys(key, dask, state):
    ...    print("Computing: {0}!".format(repr(key)))

    >>> with Callback(pretask=printkeys):
    ...     get(dsk, 'c')
    Computing: 'a'!
    Computing: 'b'!
    Computing: 'c'!
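
Several callbacks can also be combined in one subclass. The following sketch
(``TimeTasks`` is a hypothetical example written here, not part of dask) uses
``_start``, ``_pretask``, and ``_posttask`` together to report how long each
task took, reusing the ``dsk`` graph and ``get`` function from above:

.. code-block:: python

    >>> from timeit import default_timer
    >>> from dask.callbacks import Callback

    >>> class TimeTasks(Callback):
    ...     def _start(self, dsk):
    ...         # record start times, keyed by task key
    ...         self._starts = {}
    ...     def _pretask(self, key, dsk, state):
    ...         self._starts[key] = default_timer()
    ...     def _posttask(self, key, result, dsk, state, id):
    ...         elapsed = default_timer() - self._starts.pop(key)
    ...         print("{0} took {1:.6f} s".format(repr(key), elapsed))

    >>> with TimeTasks():
    ...     get(dsk, 'c')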


API
---

.. autosummary::
   CacheProfiler
   Callback
   Profiler
   ProgressBar
   ResourceProfiler
   visualize

.. autoclass:: ProgressBar
.. autoclass:: Profiler
.. autoclass:: ResourceProfiler
.. autoclass:: CacheProfiler
.. autoclass:: Callback
.. autofunction:: visualize