.. -*- coding: utf-8 -*-

.. _parallel-analysis:

=================
Parallel analysis
=================
.. versionadded:: 2.8.0

Parallelization of analysis classes was added during Google Summer of Code
2023 by `@marinegor <https://github.com/marinegor>`_ and the MDAnalysis GSoC
mentors.

This section explains the implementation of and background for parallelization
with :class:`MDAnalysis.analysis.base.AnalysisBase`, what users and developers
need to know, when you should use parallelization (almost always!), and when
you should abstain from doing so (rarely).

How to use parallelization
==========================

In order to use parallelization in a built-in analysis class ``SomeClass``,
first check which backends are available (see :ref:`backends` for the backends
that are generally available), and then enable one by passing
``backend='multiprocessing'`` and ``n_workers=...`` to the ``SomeClass.run(...)``
method:

.. code-block:: python

    import MDAnalysis as mda

    u = mda.Universe(...)
    my_run = SomeClass(u.trajectory)
    assert SomeClass.get_supported_backends() == ('serial', 'multiprocessing', 'dask')

    my_run.run(backend='multiprocessing', n_workers=12)

For some classes, such as :class:`MDAnalysis.analysis.rms.RMSF` (in its current
implementation), split-apply-combine parallelization isn't possible, and they
can be run only with the ``serial`` backend.

.. Note::

    Parallelization is being added to existing analysis classes incrementally.
    Initially, only :class:`MDAnalysis.analysis.rms.RMSD` supports parallel
    analysis, but we aim to increase support in future releases. Please check
    issues labeled `parallelization` on the `MDAnalysis issue tracker
    <https://github.com/MDAnalysis/mdanalysis/issues?q=is%3Aopen+is%3Aissue+label%3Aparallelization>`_.

How does parallelization work
=============================

The main idea behind the current implementation is that a trajectory analysis
is often trivially parallelizable: you can analyze all frames independently
and then merge the per-frame results into a single object. This approach is
also known as "split-apply-combine", and it isn't new to MDAnalysis users,
since it was first introduced in `PMDA`_ :footcite:p:`Fan2019`. Version 2.8.0
of MDAnalysis brings this approach into the main library.

.. _`PMDA`: https://github.com/mdanalysis/pmda

split-apply-combine
-------------------

The following scheme explains the current :meth:`AnalysisBase.run
<MDAnalysis.analysis.base.AnalysisBase.run>` protocol (user-implemented methods
are highlighted in orange):

.. figure:: /images/AnalysisBase_parallel.png

In short, after checking the input parameters and configuring the backend,
:class:`AnalysisBase <MDAnalysis.analysis.base.AnalysisBase>` splits all the
frames into *computation groups* (equally sized sequential groups of frames to
be processed by each worker). All groups then get **split** between the workers
of the previously configured backend, the main instance gets serialized and
distributed between the workers, and the
:meth:`~MDAnalysis.analysis.base.AnalysisBase._compute` method gets called
for all frames of each computation group. Within this method, the
user-implemented :meth:`~MDAnalysis.analysis.base.AnalysisBase._single_frame`
method gets **applied** to each frame in the computation group. After that, the
main instance gets an object that will **combine** all the objects from the
other workers, and all instances get *merged* with an instance of
:class:`MDAnalysis.analysis.results.ResultsGroup`. Finally, the normal
user-implemented :meth:`~MDAnalysis.analysis.base.AnalysisBase._conclude`
method is called.
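
The split-apply-combine flow above can be sketched in plain Python (a toy
illustration, not the actual MDAnalysis implementation; ``single_frame``,
``compute`` and a thread pool stand in for the user-implemented
``_single_frame``, ``_compute`` and a real backend):

.. code-block:: python

    from multiprocessing.dummy import Pool  # thread-based pool, for illustration

    def single_frame(frame):
        # stand-in for _single_frame(): any per-frame computation
        return frame * frame

    def compute(group):
        # stand-in for _compute(): process every frame of one computation group
        return [single_frame(frame) for frame in group]

    frames = list(range(10))
    n_workers = 2

    # split: equally sized sequential computation groups, one per worker
    size = len(frames) // n_workers
    groups = [frames[i:i + size] for i in range(0, len(frames), size)]

    # apply: each worker processes its own group
    with Pool(processes=n_workers) as pool:
        per_worker = pool.map(compute, groups)

    # combine: merge the per-worker results back into one flat result
    results = [r for part in per_worker for r in part]
    assert results == [frame * frame for frame in frames]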

Parallelization is fully backward compatible and does *not* break any existing
pre-2.8.0 code. The parallelization protocol mimics the single-process workflow
where possible, so user-implemented methods such as
:meth:`~MDAnalysis.analysis.base.AnalysisBase._prepare`,
:meth:`~MDAnalysis.analysis.base.AnalysisBase._single_frame` and
:meth:`~MDAnalysis.analysis.base.AnalysisBase._conclude` don't need to know
whether they are operating on an instance within the main Python process or on
a remote instance, since the executed code is the same in both cases.

Methods in ``AnalysisBase`` for parallelization
-----------------------------------------------

For developers of new analysis tools
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you want to write your own *parallel* analysis class, you have to implement
:meth:`~MDAnalysis.analysis.base.AnalysisBase._prepare`,
:meth:`~MDAnalysis.analysis.base.AnalysisBase._single_frame` and
:meth:`~MDAnalysis.analysis.base.AnalysisBase._conclude`. You also have to
declare whether your analysis can run in parallel by following the steps under
:ref:`adding-parallelization`.

.. Note::

    Attributes that are the same for the whole trajectory should be defined
    in the ``__init__`` method because they need to be consistent across all
    workers.

For MDAnalysis developers
~~~~~~~~~~~~~~~~~~~~~~~~~

From a developer's point of view, there are a few methods that are important
for understanding how parallelization is implemented:

#. :meth:`MDAnalysis.analysis.base.AnalysisBase._define_run_frames`
#. :meth:`MDAnalysis.analysis.base.AnalysisBase._prepare_sliced_trajectory`
#. :meth:`MDAnalysis.analysis.base.AnalysisBase._configure_backend`
#. :meth:`MDAnalysis.analysis.base.AnalysisBase._setup_computation_groups`
#. :meth:`MDAnalysis.analysis.base.AnalysisBase._compute`
#. :meth:`MDAnalysis.analysis.base.AnalysisBase._get_aggregator`

The first two methods together provide the functionality of the former
:meth:`_setup_frames`. :meth:`_define_run_frames` is run once during the
analysis, as it checks that the input parameters ``start``, ``stop``, ``step``
or ``frames`` are consistent with the given trajectory and prepares the
``slicer`` object that defines the iteration pattern through the trajectory.
:meth:`_prepare_sliced_trajectory` assigns the :attr:`self._sliced_trajectory`
attribute, computes the number of frames in it, and fills the
:attr:`self.frames` and :attr:`self.times` arrays. If the computation is later
split between other processes, this method will be called again on each of the
computation groups.
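
For instance, the ``start``/``stop``/``step`` parameters boil down to a
:class:`slice`-based iteration pattern over the trajectory; a simplified
sketch with plain integers standing in for trajectory frames:

.. code-block:: python

    n_frames = 100                     # hypothetical trajectory length
    start, stop, step = 10, 50, 5      # hypothetical run() parameters

    slicer = slice(start, stop, step)  # the iteration pattern over the trajectory
    frames = list(range(n_frames))[slicer]
    assert frames == [10, 15, 20, 25, 30, 35, 40, 45]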

The method :meth:`_configure_backend` performs basic health checks for a given
analysis class: namely, it compares a given backend (if it's a :class:`str`
instance, such as ``'multiprocessing'``) with the list of built-in backends
(and also the backends implemented for the given analysis subclass), and
configures a :class:`MDAnalysis.analysis.backends.BackendBase` instance
accordingly. If the user provides a custom backend (any subclass of
:class:`MDAnalysis.analysis.backends.BackendBase`, or anything with an
:meth:`apply` method), it ensures that the number of workers wasn't specified
twice (during backend initialization and in the :meth:`run` arguments).

After a backend is configured, :meth:`_setup_computation_groups` splits the
frames prepared earlier by :meth:`_prepare_sliced_trajectory` into a number of
groups, by default equal to the number of workers.
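
The splitting amounts to cutting the prepared frame indices into near-equally
sized sequential chunks (an illustrative sketch, not the actual
implementation):

.. code-block:: python

    frames = list(range(10))  # frame indices from _prepare_sliced_trajectory
    n_parts = 3               # by default, the number of workers

    # near-equally sized sequential groups, one per worker
    groups = [
        frames[i * len(frames) // n_parts:(i + 1) * len(frames) // n_parts]
        for i in range(n_parts)
    ]
    assert groups == [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]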

In the :meth:`_compute` method, frames get initialized again with
:meth:`_prepare_sliced_trajectory`, and attributes necessary for a specific
analysis get initialized with the :meth:`_prepare` method. Then the method
iterates over :attr:`self._sliced_trajectory`, assigning
:attr:`self._frame_index` and :attr:`self._ts` as the frame index (within a
computation group) and the current timestep, and also setting the respective
:attr:`self.frames` and :attr:`self.times` array values.

After :meth:`_compute` has finished, the main analysis instance calls the
:meth:`_get_aggregator` method, which merges the :attr:`self.results`
attributes from the other processes into a single
:class:`MDAnalysis.analysis.results.Results` instance, so that to the
subsequent :meth:`_conclude` method it looks as if the run was performed in a
serial fashion, without parallelization.

Helper classes for parallelization
==================================

``ResultsGroup``
----------------

:class:`MDAnalysis.analysis.results.ResultsGroup` extends the functionality of
the :class:`MDAnalysis.analysis.results.Results` class. Since the ``Results``
class is basically a dictionary that also keeps track of assigned attributes,
it is possible to iterate over all these attributes later. ``ResultsGroup``
does exactly that: given a list of ``Results`` objects with the same
attributes, it applies a specific aggregation function to every attribute, and
stores the result under the same attribute of the returned object:

.. code-block:: python

    from MDAnalysis.analysis.results import ResultsGroup, Results

    group = ResultsGroup(lookup={'mass': ResultsGroup.float_mean})
    obj1 = Results(mass=1)
    obj2 = Results(mass=3)
    assert group.merge([obj1, obj2]) == Results(mass=2.0)

``BackendBase``
---------------

:class:`MDAnalysis.analysis.backends.BackendBase` holds all backend attributes
and also implements the :meth:`MDAnalysis.analysis.backends.BackendBase.apply`
method, which applies a given function to a list of its arguments in a
parallel fashion. Although in ``AnalysisBase`` it is used to apply the
``_compute`` function, in principle it can be applied to any arbitrary
function and arguments, provided they are serializable.
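
Conceptually, :meth:`apply` has :func:`map`-like semantics; what the ``serial``
version boils down to can be sketched as:

.. code-block:: python

    def apply_serial(func, computations):
        # a serial apply: call func on every element, in order;
        # a parallel backend distributes the same calls between workers
        return [func(task) for task in computations]

    assert apply_serial(len, ['ab', 'abcd']) == [2, 4]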

When to use parallelization? (Known limitations)
================================================

For now, the syntax for running a parallel analysis is explicit: the
``serial`` version runs by default, and parallelization is not enabled
automatically. Although we expect parallelization to be useful in most cases,
there are some known caveats that follow from the initial benchmarks.

Fast ``_single_frame`` compared to reading from disk
----------------------------------------------------

Parallelization does not help when frames are processed faster than they are
read from disk; in that case, reading is the bottleneck. Hence, you'll benefit
from parallelization only if you do relatively much compute per frame, or have
a fast drive, as illustrated below:

.. figure:: /images/parallelization_time.png

In other words, if you have a *fast* analysis (say,
:class:`MDAnalysis.analysis.rms.RMSD`) **and** a slow HDD drive, you are
unlikely to get any benefit from parallelization. Otherwise, you should be
fine.

Serialization issues
--------------------

For built-in analysis classes, the default serialization with both the
:mod:`multiprocessing` and :mod:`dask` backends is known to work. If you're
using a custom analysis class that e.g. stores a non-serializable object in
one of its attributes, you might get a serialization error (a
:exc:`PicklingError` if you're using the ``multiprocessing`` backend). To get
around that, we suggest trying ``backend='dask'``, which uses the ``dask``
serialization engine instead of :mod:`pickle`.
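
A quick way to anticipate such errors is to try pickling the objects your
class stores, since this is the same serialization the ``multiprocessing``
backend relies on (a generic check, not an MDAnalysis API):

.. code-block:: python

    import pickle

    def is_picklable(obj):
        try:
            pickle.dumps(obj)
            return True
        except (pickle.PicklingError, TypeError, AttributeError):
            return False

    # attributes a hypothetical analysis instance might carry:
    assert is_picklable({'results': [1.0, 2.0]})  # plain data pickles fine
    assert not is_picklable(lambda ts: ts)        # a stored lambda does not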

Out of memory issues
--------------------

If each worker has a large memory footprint, you can run into out-of-memory
errors (i.e. your server freezes when executing a run). In this case, we
suggest decreasing the number of workers from all available CPUs (which you
can get with :func:`multiprocessing.cpu_count`) to a smaller number.
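
For example, instead of using every CPU, you could start with half of them and
increase the count while memory permits (a heuristic, not an official
guideline):

.. code-block:: python

    import multiprocessing

    # use half of the available CPUs to roughly halve the memory footprint
    n_workers = max(1, multiprocessing.cpu_count() // 2)
    assert 1 <= n_workers <= multiprocessing.cpu_count()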

Progress bar is missing
-----------------------

It is not yet possible to get a progress bar running with any parallel
backend. If you want an ETA for your analysis, we suggest running it in
``serial`` mode for the first 10-100 frames with ``verbose=True``, and then
running it with multiple workers. Processing time scales almost linearly, so
you can estimate the total runtime by dividing the ``serial`` estimate by the
number of workers.
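
The back-of-the-envelope estimate looks like this (hypothetical numbers):

.. code-block:: python

    # say the first 100 frames took 2 s in 'serial' mode with verbose=True
    serial_seconds_per_frame = 2.0 / 100
    n_frames, n_workers = 50_000, 8

    # assuming near-linear scaling, divide the serial estimate by the workers
    eta_seconds = serial_seconds_per_frame * n_frames / n_workers
    assert round(eta_seconds) == 125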

.. _adding-parallelization:

Adding parallelization to your own analysis class
=================================================

If you want to add parallelization to your own analysis class, first make sure
your algorithm allows for it, i.e. each frame can be processed independently.
Then it's rather simple; let's look at the actual code that added
parallelization to :class:`MDAnalysis.analysis.rms.RMSD`:

.. code-block:: python

    from MDAnalysis.analysis.base import AnalysisBase
    from MDAnalysis.analysis.results import ResultsGroup

    class RMSD(AnalysisBase):
        _analysis_algorithm_is_parallelizable = True

        @classmethod
        def get_supported_backends(cls):
            return ('serial', 'multiprocessing', 'dask')

        def _get_aggregator(self):
            return ResultsGroup(lookup={'rmsd': ResultsGroup.ndarray_vstack})

That's it! The first two pieces are boilerplate: :meth:`get_supported_backends`
returns a tuple with the built-in backends that will work for your class (if
there are no serialization issues, it should be all three), and
``_analysis_algorithm_is_parallelizable`` is set to ``True`` (it is ``False``
in ``AnalysisBase``, hence we have to re-define it). :meth:`_get_aggregator` is
used as described earlier. Note that :mod:`MDAnalysis.analysis.results` also
provides a few convenient functions (defined as class methods of
:class:`~MDAnalysis.analysis.results.ResultsGroup`) for results aggregation:

#. :meth:`~MDAnalysis.analysis.results.ResultsGroup.flatten_sequence`
#. :meth:`~MDAnalysis.analysis.results.ResultsGroup.ndarray_sum`
#. :meth:`~MDAnalysis.analysis.results.ResultsGroup.ndarray_mean`
#. :meth:`~MDAnalysis.analysis.results.ResultsGroup.float_mean`
#. :meth:`~MDAnalysis.analysis.results.ResultsGroup.ndarray_hstack`
#. :meth:`~MDAnalysis.analysis.results.ResultsGroup.ndarray_vstack`

So you'll likely find an appropriate function for basic aggregation there.
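
As a rough illustration of what two of these aggregators do, here are
pure-Python stand-ins (the real class methods operate on the ``Results``
attributes collected from each worker):

.. code-block:: python

    def float_mean(values):
        # mimics ResultsGroup.float_mean: average an attribute across workers
        return sum(values) / len(values)

    def flatten_sequence(seqs):
        # mimics ResultsGroup.flatten_sequence: concatenate per-worker
        # sequences in computation-group order
        return [item for seq in seqs for item in seq]

    assert float_mean([1.0, 3.0]) == 2.0
    assert flatten_sequence([[0, 1], [2, 3]]) == [0, 1, 2, 3]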

Writing custom backends
=======================

In order to write your own custom backend (e.g. one using
:mod:`dask.distributed`), inherit from
:class:`MDAnalysis.analysis.backends.BackendBase` and (re-)implement the
:meth:`__init__` and :meth:`apply` methods. Optionally, you can implement
methods for validating correct backend initialization,
:meth:`~MDAnalysis.analysis.backends.BackendBase._get_checks` and
:meth:`~MDAnalysis.analysis.backends.BackendBase._get_warnings`, which raise
an exception or issue a warning, respectively, when a new class instance is
created:

.. code-block:: python

    from multiprocessing.dummy import Pool

    from MDAnalysis.analysis.backends import BackendBase

    class ThreadsBackend(BackendBase):
        def __init__(self, n_workers: int, starting_message: str = "Useless backend"):
            self.n_workers = n_workers
            self.starting_message = starting_message
            self._validate()

        def _get_warnings(self):
            return {True: 'warning: this backend is useless'}

        def _get_checks(self):
            # condition -> error message raised when the condition is False
            return {
                isinstance(self.n_workers, int): 'error: self.n_workers is not an integer'
            }

        def apply(self, func, computations):
            with Pool(processes=self.n_workers) as pool:
                print(self.starting_message)
                results = pool.map(func, computations)
            return results

In order to use a custom backend with an analysis class that does not
explicitly support it, you must *explicitly state* that you're about to use an
unsupported backend by passing the keyword argument
``unsupported_backend=True``:

.. code-block:: python

    from MDAnalysis.analysis.rms import RMSD

    R = RMSD(...)  # setup the run
    n_workers = 2
    backend = ThreadsBackend(n_workers=n_workers)
    R.run(backend=backend, unsupported_backend=True)

In this way, you will override the check for supported backends.

.. Warning::

    When you use ``unsupported_backend=True``, make sure that you get the same
    results as with a supported backend for which the analysis class was
    tested.

    Before reporting a problem with an analysis class, make sure you tested it
    with a supported backend. When reporting, *always mention whether you used*
    ``unsupported_backend=True``.

.. rubric:: References

.. footbibliography::