File: fine-performance-metrics.rst

package info (click to toggle)
dask.distributed 2024.12.1%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 12,588 kB
  • sloc: python: 96,954; javascript: 1,549; sh: 390; makefile: 220
file content (213 lines) | stat: -rw-r--r-- 9,303 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
Fine Performance Metrics
========================
.. note::

    This is an experimental feature and may rapidly change without a deprecation cycle.

You may want to investigate where your Dask workload spends the majority of its time;
not only on which tasks, but also *doing what* while running said tasks.
Dask automatically collects *fine performance metrics* to answer this question by
breaking down the end-to-end runtime of a computation by task and, within each task, by
a series of *activities* taken to complete it.

In order to observe these metrics, you can simply

#. run your workload end-to-end
#. open the Dask dashboard (default for a LocalCluster: `<http://localhost:8787>`_)
#. select ``More...`` -> ``Fine Performance Metrics``

Alternatively, if you're using Jupyter Lab and `dask-labextension
<https://github.com/dask/dask-labextension>`_, you can just drag the
``Fine Performance Metrics`` widget onto your Jupyter dashboard.

.. image:: images/fine-performance-metrics/seconds-full.png
   :alt: Populated Fine Performance Metrics dashboard

The central panel (**Task execution, by activity**) shows what *activities* the cluster
spent its time on, cumulatively for all currently-visible functions. The most important
ones are:

``thread-cpu``
  CPU time spent by tasks while running on workers. This is typically "good" time; in
  other words it's the same time you would have spent if you ran the workload serially
  on a single CPU - but parallelized over however how many CPUs are available on
  your cluster.
``thread-noncpu``
  Difference between wall clock time and CPU time spent by tasks while running on
  workers. This is typically I/O time, GPU time, CPU contention, or GIL contention.
  If you observe large amounts of this in your overall workload, you probably want to
  break it down by function and isolate those that are known to perform I/O or GPU
  activity.
``idle``
  Time where a worker had a free thread, but nothing to run on it. This is typically
  caused by the workload not being able to fully utilize all threads on the cluster,
  network latency between scheduler and workers, or excessive CPU load on the scheduler.
  This measure does not include time spent while the whole cluster was completely idle.
``disk-read``, ``disk-write``, ``compress``, ``decompress``
  Time spent spilling/unspilling to disk due to not having enough memory available.
  See :doc:`worker-memory`.
``executor``, ``offload``, ``other``
  This is overhead from the Dask code and should be typically negligible. However,
  it can be inflated by GIL contention and by spill/unspill activity.

The grand total of the time shown should roughly add up to the end-to-end runtime of
your workload, multiplied by the number of threads on the cluster.

The left panel (**Task execution, by function**) shows the same information as the
central one, but broken down by function.

The right panel (**Send data, by activity**) shows network transfer time. Note that most
of it should be pipelined with task execution, so it may not have an impact. You should
worry about this only if you have a very large ``idle`` time.

There is a filter that allows you to show only selected functions. In the sample
screenshots, you can observe that most of the `thread-noncpu` time is concentrated - as
expected - in functions that are known to be I/O heavy. Here they are singled out:

.. image:: images/fine-performance-metrics/seconds-IO.png
   :alt: Fine Performance Metrics dashboard, just the I/O functions

And here's all other functions that take a non-trivial amount of time:

.. image:: images/fine-performance-metrics/seconds-not-IO.png
   :alt: Fine Performance Metrics dashboard, non-I/O functions

This tells us an important piece of information: why is `astype`, which is a pure CPU
function, spending so much time occupying workers' threads but without accruing any CPU
time? The answer, almost certainly, it's that it doesn't properly release the GIL.

Fine Performance Metrics collect more than wall clock timings. We can change the
unit to bytes:

.. image:: images/fine-performance-metrics/bytes.png
   :alt: Fine Performance Metrics dashboard, unit=bytes

The above gives us insights on spill/unspill activity (see :doc:`worker-memory`).
In this workflow, in 99% of the cases there was enough RAM to accommodate all the data
and it was unnecessary to retrieve it from disk; in other words we have *cache hits* for
99% of the data, which means that we would not get any benefit if we increased RAM, but
we would likely start seeing slowdowns if we reduced it.


Are task prefixes enough?
-------------------------
Individual task prefixes may be too granular; vice versa, the same task prefix may
appear in very different sections of the workflow. Your codebase may be complex enough
that it's not straightforward to pinpoint the client code that's responsible for a
specific task prefix.

:doc:`spans` let you break down these metrics into macro-blocks (e.g. data load,
preprocessing, etc.).


API for advanced users
----------------------
For the most part, fine performance metrics just work; as a user, you don't need to
change your client code.

If you run custom tasks on the cluster (e.g. via :meth:`~distributed.Client.submit`,
:meth:`~dask.array.Array.map_blocks`, or
:meth:`~dask.dataframe.DataFrame.map_partitions`), you may want to customize the metrics
they produce. For example, you may want to separate I/O time from `thread-noncpu`:

.. code-block:: python

   from distributed.metrics import context_meter

   @context_meter.meter("I/O")
   def read_some_files():
       ...

   future = client.submit(read_some_files)

In the above example, the wall time spent by the custom function ``read_some_files``
will be logged as "I/O", which is a completely arbitrary activity label.

Alternatively you may want to just label some of the time this way:

.. code-block:: python

   def read_some_files():
       with context_meter.meter("I/O"):
           data = read_from_network(...)
       return preprocess(data)

In the above example, the function is split into an I/O intensive phase,
``read_from_network``, and a CPU-intensive one, ``preprocess``. The
:meth:`distributed.metrics.context_meter.meter` context manager will log the time spent
by ``read_from_network`` as ``I/O``, whereas the time spent by ``preprocess`` still be
logged as a mix of ``thread-cpu`` and ``thread-noncpu`` (the latter may, for example,
highlight GIL contention).

.. note::
   The :meth:`distributed.metrics.context_meter.meter` context manager wraps around code
   that runs on the worker, in a single task. It won't work if used to decorate
   client-side code that defines the Dask graph. See :doc:`spans` for that.

Finally, you may want to report a metric that is not just wall time. For example, if
you're reading data from S3 Infrequent Access storage, you may want to keep track of it
to understand your spending:

.. code-block:: python

   def read_some_files():
       data = read_from_network(...)
       context_meter.digest_metric("S3 Infrequent Access", sizeof(data), "bytes")
       return data

Again, "S3 Infrequent Access" is a completely arbitrary activity label, and "bytes" is
a completely arbitrary unit of measure.

.. autofunction:: distributed.metrics.context_meter.digest_metric
.. autofunction:: distributed.metrics.context_meter.meter


Developer specifications
------------------------
.. admonition:: Intended audience

    This section is only of interest to developers maintaining Dask or writing scheduler
    extensions, e.g. to create an alternative dashboard or to store metrics long-term.

Fine Performance metrics are collected:

- On each worker
- On the scheduler, globally
- On the :doc:`spans`

On the workers, they are collected through :meth:`distributed.core.Server.digest_metric`
and stored in the ``Worker.digests_total`` mapping.

They are stored in this format:

- ``("execute", span_id, task_prefix, activity, unit): value``
- ``("gather-dep", activity, unit): value``
- ``("get-data", activity, unit): value``
- ``("memory-monitor", activity, unit): value``

At every heartbeat, they are synchronized to the scheduler, where they populate the
``Scheduler.cumulative_worker_metrics`` mapping, in this format:

- ``("execute", task_prefix, activity, unit): value``
- ``("gather-dep", activity, unit): value``
- ``("get-data", activity, unit): value``
- ``("memory-monitor", activity, unit): value``

As ``execute`` metrics have no span_id here, multiple records from the worker may have
been added up on a single one on the scheduler.

The ``execute`` metrics can also be found, disaggregated, on the :doc:`spans` at
``Scheduler.extensions["spans"].spans[span_id].cumulative_worker_metrics``, in this
format:

- ``("execute", task_prefix, activity, unit): value``

Notes
~~~~~
- In both ``Worker.digests_total`` and ``Scheduler.cumulative_worker_metrics``, you
  will also find keys unrelated to fine performance metrics, which are not necessarily
  tuples.
- Due to custom metrics (see previous section), ``activity`` is going to be *most
  times*, but not *always*, a string.
- Even barring custom metrics, more fine-grained activities may be added in the future,
  so it's never a good idea to implement hardcoded tests for them.