File: parallelization.rst

package info (click to toggle)
mdanalysis 2.10.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,696 kB
  • sloc: python: 92,135; ansic: 8,156; makefile: 215; sh: 138
file content (363 lines) | stat: -rw-r--r-- 15,948 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
.. -*- coding: utf-8 -*-

.. _parallel-analysis:
   
=================
Parallel analysis
=================

.. versionadded:: 2.8.0
   Parallelization of analysis classes was added during Google Summer of Code
   2023 by `@marinegor <https://github.com/marinegor>`_ and MDAnalysis GSoC mentors.

This section explains the implementation and background for
parallelization with the :class:`MDAnalysis.analysis.base.AnalysisBase`, what 
users and developers need to know, when you should use parallelization (almost 
always!), and when you should abstain from doing so (rarely).


How to use parallelization
==========================

In order to use parallelization in a built-in analysis class ``SomeAnalysisClass``,
simply check which backends are available (see :ref:`backends` for backends
that are generally available), and then just enable them by providing
``backend='multiprocessing'`` and ``n_workers=...`` to ``SomeClass.run(...)``
method:

.. code-block:: python

    u = mda.Universe(...)
    my_run = SomeClass(trajectory)
    assert SomeClass.get_supported_backends() == ('serial', 'multiprocessing', 'dask')

    my_run.run(backend='multiprocessing', n_workers=12)

For some classes, such as :class:`MDAnalysis.analysis.rms.RMSF` (in its current implementation),
split-apply-combine parallelization isn't possible, and running them will be
impossible with any but the ``serial`` backend.

.. Note::

   Parallelization is getting added to existing analysis classes.  Initially,
   only :class:`MDAnalysis.analysis.rms.RMSD` supports parallel analysis, but
   we aim to increase support in future releases. Please check issues labeled
   `parallelization` on the `MDAnalysis issues tracker <https://github.com/MDAnalysis/mdanalysis/issues?q=is%3Aopen+is%3Aissue+label%3Aparallelization>`_.


How does parallelization work
=============================

The main idea behind its current implementation is that a trajectory analysis is
often trivially parallelizable, meaning you can analyze all frames
independently, and then merge them in a single object. This approach is also
known as "split-apply-combine", and isn't new to MDAnalysis users, since it was
first introduced in `PMDA`_ :footcite:p:`Fan2019`.  
Version 2.8.0 of MDAnalysis brings this approach to the main library.

.. _`PMDA`: https://github.com/mdanalysis/pmda


split-apply-combine
-------------------

The following scheme explains the current :meth:`AnalysisBase.run
<MDAnalysis.analysis.base.AnalysisBase.run>` protocol (user-implemented methods
are highlighted in orange):

.. figure:: /images/AnalysisBase_parallel.png


In short, after checking input parameters and configuring the backend,
:class:`AnalysisBase <<MDAnalysis.analysis.base.AnalysisBase>` splits all the
frames into *computation groups* (equally sized sequential groups of frames to
be processed by each worker). All groups then get **split** between workers of
a backend configured early, the main instance gets serialized and distributed
between workers, and then
:meth:`~MDAnalysis.analysis.base.AnalysisBase._compute()` method gets called
for all frames of a computation group. Within this method, a user-implemented
:meth:`~MDAnalysis.analysis.base.AnalysisBase._single_frame` method gets
**applied** to each frame in a computation group.  After that, the main
instance gets an object that will **combine** all the objects from other
workers, and all instances get *merged* with an instance of
:class:`MDAnalysis.analysis.results.ResultsGroup`. Then, a normal
user-implemented :meth:`~MDAnalysis.analysis.base.AnalysisBase._compute` method
is called.

Parallelization is fully compatible with existing code and does *not* break
any existing code pre-2.8.0. The parallelization protocol mimics the
single-process workflow where possible. Thus, user-implemented methods such as
:meth:`~MDAnalysis.analysis.base.AnalysisBase._prepare`,
:meth:`~MDAnalysis.analysis.base.AnalysisBase._single_frame` and
:meth:`~MDAnalysis.analysis.base.AnalysisBase._conclude` won't need to know
they are operating on an instance within the main python process, or on a
remote instance, since the executed code is the same in both cases.


Methods in ``AnalysisBase`` for parallelization
-----------------------------------------------

For developers of new analysis tools
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you want to write your own *parallel* analysis class, you have to implement
:meth:`~MDAnalysis.analysis.base.AnalysisBase._prepare`,
:meth:`~MDAnalysis.analysis.base.AnalysisBase._single_frame` and
:meth:`~MDAnalysis.analysis.base.AnalysisBase._conclude`. You also have to
denote if your analysis can run in parallel by following the steps under
:ref:`adding-parallelization`.

.. Note::

   Attributes that are the same for the whole trajectory, should be defined
   in `__init__` method because they need to be consistent across all workers.

For MDAnalysis developers
~~~~~~~~~~~~~~~~~~~~~~~~~

From a developer point of view, there are a few methods that are important in
order to understand how parallelization is implemented:

#. :meth:`MDAnalysis.analysis.base.AnalysisBase._define_run_frames`
#. :meth:`MDAnalysis.analysis.base.AnalysisBase._prepare_sliced_trajectory`
#. :meth:`MDAnalysis.analysis.base.AnalysisBase._configure_backend`
#. :meth:`MDAnalysis.analysis.base.AnalysisBase._setup_computation_groups`
#. :meth:`MDAnalysis.analysis.base.AnalysisBase._compute`
#. :meth:`MDAnalysis.analysis.base.AnalysisBase._get_aggregator`

The first two methods share the functionality of :meth:`_setup_frames`.
:meth:`_define_run_frames` is run once during analysis, as it checks that input
parameters `start`, `stop`, `step` or `frames` are consistent with the given
trajectory and prepares the ``slicer`` object that defines the iteration
pattern through the trajectory. :meth:`_prepare_sliced_trajectory` assigns to
the :attr:`self._sliced_trajectory` attribute, computes the number of frames in
it, and fills the :attr:`self.frames` and :attr:`self.times` arrays. In case
the computation will be later split between other processes, this method will
be called again on each of the computation groups.

The method :meth:`_configure_backend` performs basic health checks for a given
analysis class -- namely, it compares a given backend (if it's a :class:`str`
instance, such as ``'multiprocessing'``) with the list of builtin backends (and
also the backends implemented for a given analysis subclass), and configures a
:class:`MDAnalysis.analysis.backends.BackendBase` instance accordingly. If the
user decides to provide a custom backend (any subclass of
:class:`MDAnalysis.analysis.backends.BackendBase`, or anything with an
:meth:`apply` method), it ensures that the number of workers wasn't specified
twice (during backend initialization and in :meth:`run` arguments).

After a backend is configured, :meth:`_setup_computation_groups` splits the
frames prepared earlier in :attr:`self._prepare_sliced_trajectory` into a
number of groups, by default equal to the number of workers.

In the :meth:`_compute` method, frames get initialized again with
:meth:`_prepare_sliced_trajectory`, and attributes necessary for a specific
analysis get initialized with the :meth:`_prepare` method. Then the function
iterates over :attr:`self._sliced_trajectory`, assigning
:attr:`self._frame_index` and :attr:`self._ts` as frame index (within a
computation group) and timestamp, and also setting respective
:attr:`self.frames` and :attr:`self.times` array values.

After :meth:`_compute` has finished, the main analysis instance calls the
:meth:`_get_aggregator` method, which merges the :attr:`self.results`
attributes from other processes into a single
:class:`MDAnalysis.analysis.results.Results` instance, making it look for the
subsequent :meth:`_conclude` method as if the run was performed in a serial
fashion, without parallelization.


Helper classes for parallelization
==================================

``ResultsGroup``
----------------

:class:`MDAnalysis.analysis.results.ResultsGroup` extends the functionality of
the :class:`MDAnalysis.analysis.results.Results` class. Since the ``Results``
class is basically a dictionary that also keeps track of assigned attributes, it
is possible to iterate over all these attributes later. ``ResultsGroup`` does
exactly that: given a list of the ``Results`` objects with the same attributes,
it applies a specific aggregation function to every attribute, and stores it as
a same attribute of the returned object:

.. code-block:: python

    from MDAnalysis.analysis.results import ResultsGroup, Results
    group = ResultsGroup(lookup={'mass': ResultsGroup.float_mean})
    obj1 = Results(mass=1)
    obj2 = Results(mass=3)
    assert group.merge([obj1, obj2]) == Results(mass=2.0)


``BackendBase``
---------------

:class:`MDAnalysis.analysis.backends.BackendBase` holds all backend attributes,
and also implements an :meth:`MDAnalysis.analysis.backends.BackendBase.apply`
method, applying a given function to a list of its parameters, but in a parallel
fashion. Although in ``AnalysisBase`` it is used to apply a ``_compute``
function, in principle it can be used to any arbitrary function and arguments,
given they're serializable.


When to use parallelization? (Known limitations)
================================================

For now, the syntax for running parallel analysis is explicit, meaning by
default the ``serial`` version will be run, and the parallelization won't be
enabled by default. Although we expect the parallelization to be useful in most
cases, there are some known caveats from the inital benchmarks.

Fast ``_single_frame`` compared to reading from disk
----------------------------------------------------

In all cases, parallelization will not be useful only when frames are being
processed faster than being read from the disk, otherwise reading is the
bottleneck here. Hence, you'll benefit from parallelization only if you have
relatively much compute per frame, or a fast drive, as illustrated below:

.. figure:: /images/parallelization_time.png

In other words, if you have *fast* analysis (say,
:class:`MDAnalysis.analysis.rms.RMSD`) **and** a slow HDD drive, you are likely
to not get any benefits from parallelization. Otherwise, you should be fine.

Serialization issues
--------------------

For built-in analysis classes, the default serialization with both
:mod:`multiprocessing` and :mod:`dask` is known to work. If you're using some custom
analysis class that e.g. stores a non-serializable object in one of its
attributes, you might get a serialization error (:exc:`PicklingError` if you're
using a ``multiprocessing`` backend). If you want to get around that, we suggest
trying ``backend='dask'`` (it uses ``dask`` serialization engine instead of
:mod:`pickle`).

Out of memory issues
--------------------

If you have large memory footprint of each worker, you can run into
out-of-memory errors (i.e. your server freezes when executing a run). In this
case we suggest decreasing the number of workers from all available CPUs (that
you can get with :func:`multiprocessing.cpu_count`) to a smaller number.

Progress bar is missing
-----------------------

It is yet not possible to get a progress bar running with any parallel backend.
If you want an ETA of your analysis, we suggest running it in ``serial`` mode
for the first 10-100 frames with ``verbose=True``, and then running it with
multiple workers. Processing time scales almost linearly, so you can get your
ETA by dividing ``serial`` ETA by the number of workers.


.. _adding-parallelization:

Adding parallelization to your own analysis class
=================================================

If you want to add parallelization to your own analysis class, first make sure
your algorithm allows you to do that, i.e. you can process each frame independently.
Then it's rather simple -- let's look at the actual code that added
parallelization to the :class:`MDAnalysis.analysis.rms.RMSD`:

.. code-block:: python

    from MDAnalysis.analysis.base import AnalysisBase
    from MDAnalysis.analysis.results import ResultsGroup

    class RMSD(BackendBase):
        @classmethod
        def get_supported_backends(cls):
            return ('serial', 'multiprocessing', 'dask',)

        _analysis_algorithm_is_parallelizable = True
        
        def _get_aggregator(self):
            return ResultsGroup(lookup={'rmsd': ResultsGroup.ndarray_vstack})


That's it! The first two methods are boilerplate --
:meth:`get_supported_backends` returns a tuple with built-in backends that will
work for your class (if there are no serialization issues, it should be all
three), and ``_is_parallelizable`` is ``True`` (which is set to ``False`` in
``AnalysisBase``, hence we have to re-define it), and :meth:`_get_aggregator`
will be used as described earlier. Note that :mod:`MDAnalysis.analysis.results`
also provides a few convenient functions (defined as class methods of
:class:`~MDAnalysis.analysis.results.ResultsGroup`) for results aggregation:

#. :meth:`~MDAnalysis.analysis.results.ResultsGroup.flatten_sequence`
#. :meth:`~MDAnalysis.analysis.results.ResultsGroup.ndarray_sum`
#. :meth:`~MDAnalysis.analysis.results.ResultsGroup.ndarray_mean`
#. :meth:`~MDAnalysis.analysis.results.ResultsGroup.float_mean`
#. :meth:`~MDAnalysis.analysis.results.ResultsGroup.ndarray_hstack`
#. :meth:`~MDAnalysis.analysis.results.ResultsGroup.ndarray_vstack`


So you'll likely find appropriate functions for basic aggregation there.

Writing custom backends
=======================

In order to write your custom backend (e.g. using :mod:`dask.distributed`), inherit
from the :class:`MDAnalysis.analysis.backends.BackendBase` and (re)-implement
:meth:`__init__` and :meth:`apply` methods. Optionally, you can implement methods for
validation of correct backend initialization -- :meth:`_get_checks` and
:meth:`_get_warnings`, that would raise an exception or give a warning, respectively,
when a new class instance is created:

#. :meth:`MDAnalysis.analysis.backends._get_checks`
#. :meth:`MDAnalysis.analysis.backends._get_warnings`

.. code-block:: python

    from MDAnalysis.analysis.backends import BackendBase
    class ThreadsBackend(BackendBase):
        def __init__(self, n_workers: int, starting_message: str = "Useless backend"):
            self.n_workers = n_workers
            self.starting_message = starting_message
            self._validate()

        def _get_warnings(self):
            return {True: 'warning: this backend is useless'}
        
        def _get_checks(self):
            return {isinstance(self.n_workers, int), 'error: self.n_workers is not an integer'}

        def apply(self, func, computations):
            from multiprocessing.dummy import Pool

            with Pool(processes=self.n_workers) as pool:
                print(self.starting_message)
                results = pool.map(func, computations)
            return results
    

In order to use a custom backend with another analysis class that does not
explicitly support it, you must *explicitly state* that you're about to use an
unsupported_backend by passing the keyword argument
``unsupported_backend=True``:

.. code-block:: python

    from MDAnalysis.analysis.rms import RMSD
    R = RMSD(...) # setup the run
    n_workers = 2
    backend = ThreadsBackend(n_workers=n_workers)
    R.run(backend=backend, unsupported_backend=True)

In this way, you will override the check for supported backends.

.. Warning::

   When you use ``unsupported_backend=True`` you should make sure that you get
   the same results as when using a supported backend for which the analysis
   class was tested.

   Before reporting a problem with an analysis class, make sure you tested it
   with a supported backend. When reporting *always mention if you used*
   ``unsupported_backend=True``.
    

.. rubric:: References
.. footbibliography::