Diagnosing Performance
======================
Understanding the performance of a distributed computation can be difficult.
This is due in part to the many components of a distributed computer that may
impact performance:
1. Compute time
2. Memory bandwidth
3. Network bandwidth
4. Disk bandwidth
5. Scheduler overhead
6. Serialization costs
This difficulty is compounded because the information about these costs is
spread across many machines, so there is no central place to collect data and
identify performance issues.
Fortunately, Dask collects a variety of diagnostic information during
execution. It does this both to provide performance feedback to users and to
inform its own internal scheduling decisions. The primary place to observe
this feedback is the diagnostic dashboard. This document describes the various
pieces of performance information available and how to access them.
Task start and stop times
-------------------------
Workers capture durations associated with tasks. For each task that passes
through a worker we record start and stop times for each of the following:
1. Serialization (gray)
2. Dependency gathering from peers (red)
3. Disk I/O to collect local data (orange)
4. Execution times (colored by task)
The main way to observe these times is with the task stream plot on the
scheduler's ``/status`` page where the colors of the bars correspond to the
colors listed above.
.. image:: ../../debian/bokeh-task-stream.gif
:alt: Dask task stream
:width: 50%
Alternatively, if you want to do your own diagnostics on every task event, you
might want to create a :doc:`Scheduler plugin <plugins>`. All of this
information will be available when a task transitions from processing to
memory or erred.
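As a rough sketch, such a plugin might subclass ``SchedulerPlugin`` and record
information on each transition. The ``startstops`` keyword below is an
assumption about what the scheduler forwards from workers on that transition
and may differ between versions:

.. code-block:: python

    from distributed.diagnostics.plugin import SchedulerPlugin


    class TaskTimer(SchedulerPlugin):
        """Collect timing information for completed tasks."""

        def __init__(self):
            self.records = []

        def transition(self, key, start, finish, *args, **kwargs):
            # Record tasks that finished, successfully or not
            if start == "processing" and finish in ("memory", "erred"):
                # "startstops" is assumed to carry the worker's start/stop times
                self.records.append((key, finish, kwargs.get("startstops")))


    # Register on a running cluster, for example via the client
    # (older versions use client.register_scheduler_plugin instead):
    # client.register_plugin(TaskTimer())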
Statistical Profiling
---------------------
For single-threaded profiling, Python users typically depend on the ``cProfile``
module in the standard library (Dask developers recommend the `snakeviz
<https://jiffyclub.github.io/snakeviz/>`_ tool for single-threaded profiling).
Unfortunately the standard ``cProfile`` module does not work with multi-threaded or
distributed computations.
To address this Dask implements its own distributed `statistical profiler
<https://en.wikipedia.org/wiki/Profiling_(computer_programming)#Statistical_profilers>`_.
Every 10ms each worker process checks what each of its worker threads is
doing. It captures the call stack and adds this stack to a counting data
structure. This counting data structure is recorded and cleared every second
in order to establish a record of performance over time.
Users typically observe this data through the ``/profile`` plot on either the
worker or scheduler diagnostic dashboards. On the scheduler page they observe
the total profile aggregated over all workers and all threads. Clicking on
any of the bars in the profile will zoom the user into just that section, as is
typical with most profiling tools. There is a timeline at the bottom of the
page to allow users to select different periods in time.
.. image:: ../../debian/daskboard-profile.gif
:alt: Dask profiler
:width: 70%
Profiles are also grouped by the task that was being run at the time. You can
select a task name from the selection menu at the top of the page. You can
also click on the rectangle corresponding to the task in the main task stream
plot on the ``/status`` page.
Users can also query this data directly using the :doc:`Client.profile <api>`
function. This will deliver the raw data structure used to produce these
plots. They can also pass a filename to save the plot as an HTML file
directly. Note that this file will have to be served from a webserver like
``python -m http.server`` to be visible.
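For example (a minimal sketch, assuming a running client):

.. code-block:: python

    from dask.distributed import Client

    client = Client()  # connect to (or start) a cluster

    data = client.profile()                       # raw aggregated profile data
    client.profile(filename="dask-profile.html")  # save an HTML plot to disk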
The 10ms and 1s parameters can be controlled by the ``profile-interval`` and
``profile-cycle-interval`` entries in the config.yaml file.
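In recent versions of ``distributed`` these settings live under
``distributed.worker.profile`` in the Dask configuration; the snippet below is
a sketch using those key names, which may differ in older releases:

.. code-block:: python

    import dask

    dask.config.set({
        # How often each worker samples its threads' call stacks
        "distributed.worker.profile.interval": "10ms",
        # How often the accumulated counts are recorded and cleared
        "distributed.worker.profile.cycle": "1000ms",
    })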
Bandwidth
---------
Dask workers track every incoming and outgoing transfer in the
``Worker.transfer_outgoing_log`` and ``Worker.transfer_incoming_log``
attributes, including:
1. Total bytes transferred
2. Compressed bytes transferred
3. Start/stop times
4. Keys moved
5. Peer
These are made available to users through the ``/status`` page of the Worker's
diagnostic dashboard. You can capture their state explicitly by running a
command on the workers:
.. code-block:: python

    client.run(lambda dask_worker: dask_worker.transfer_outgoing_log)
    client.run(lambda dask_worker: dask_worker.transfer_incoming_log)
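To summarize these logs you could, for instance, total the bytes received from
each peer. The field names used below (``"who"`` and ``"total"``) are
assumptions about the log entries and may vary between versions:

.. code-block:: python

    from collections import defaultdict


    def bytes_by_peer(dask_worker):
        # Sum bytes received per peer; field names may differ by version
        totals = defaultdict(int)
        for entry in dask_worker.transfer_incoming_log:
            totals[entry["who"]] += entry["total"]
        return dict(totals)


    # One mapping of peer address -> bytes received, per worker
    client.run(bytes_by_peer)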
Performance Reports
-------------------
Often when benchmarking and/or profiling, users may want to record a
particular computation or even a full workflow. Dask can save the Bokeh
dashboards as static HTML plots, including the task stream, worker profiles,
bandwidths, etc. This is done by wrapping a computation with the
:class:`distributed.performance_report` context manager:
.. code-block:: python

    from dask.distributed import performance_report

    with performance_report(filename="dask-report.html"):
        ## some dask computation
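For instance, a self-contained sketch (assuming ``dask.array`` is installed;
the collection and sizes here are arbitrary) might look like:

.. code-block:: python

    import dask.array as da
    from dask.distributed import Client, performance_report

    client = Client()  # start a local cluster for demonstration

    x = da.random.random((20_000, 20_000), chunks=(2_000, 2_000))

    with performance_report(filename="dask-report.html"):
        # Everything computed inside this block is captured in the report
        x.dot(x.T).mean().compute()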
The following video demonstrates the ``performance_report`` context manager in greater
detail:
.. raw:: html

    <iframe width="560"
            height="315"
            src="https://www.youtube-nocookie.com/embed/nTMGbkS761Q"
            frameborder="0"
            allow="autoplay; encrypted-media"
            allowfullscreen>
    </iframe>
A note about times
------------------
Different computers maintain different clocks which may not match perfectly.
To address this the Dask scheduler sends its current time in response to every
worker heartbeat. Workers compare their local time against this time to obtain
an estimate of the difference. All times recorded on workers take this
estimated offset into account. This helps, but measurements may still be
somewhat imprecise. All times are intended to be from the scheduler's
perspective.
Analysing memory usage over time
--------------------------------
You may want to know how the cluster-wide memory usage evolves over time as a
computation progresses, or how two different implementations of the same algorithm
compare memory-wise.
This is done by wrapping a computation with the
:class:`distributed.diagnostics.MemorySampler` context manager:
.. code-block:: python

    from distributed import Client
    from distributed.diagnostics import MemorySampler

    client = Client(...)
    ms = MemorySampler()

    with ms.sample("collection 1"):
        collection1.compute()
    with ms.sample("collection 2"):
        collection2.compute()
    ...
    ms.plot(align=True)
Sample output:
.. image:: images/memory-sampler.svg
:alt: Sample output of the MemorySampler
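The samples can also be pulled into a pandas DataFrame for custom analysis
with ``MemorySampler.to_pandas`` (a brief sketch; requires pandas):

.. code-block:: python

    # One column per label passed to ms.sample(...)
    df = ms.to_pandas(align=True)
    print(df.describe())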
.. autoclass:: distributed.diagnostics.MemorySampler
:members: