File: getting_started.rst

Getting started
===============

Suppose you have a time-consuming function that receives some input and returns a result. This could look like the
following:

.. code-block:: python

    import time

    def time_consuming_function(x):
        time.sleep(1)  # Simulate that this function takes long to complete
        return ...

    results = [time_consuming_function(x) for x in range(10)]

Running this function for all 10 inputs takes about 10 seconds to complete.

Functions like these are known as `embarrassingly parallel`_ problems: they require little to no effort to turn into
parallel tasks. Parallelizing a simple function like this can be as easy as importing ``multiprocessing`` and using the
``multiprocessing.Pool`` class:

.. _embarrassingly parallel: https://en.wikipedia.org/wiki/Embarrassingly_parallel

.. code-block:: python

    from multiprocessing import Pool

    with Pool(processes=5) as pool:
        results = pool.map(time_consuming_function, range(10))

We configured the pool to have 5 workers, so we can handle 5 tasks in parallel. As a result, this will complete in
about 2 seconds.
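
A word of caution: on platforms that default to the ``spawn`` start method (e.g., Windows and macOS), the pool must be
created under an ``if __name__ == '__main__'`` guard, otherwise each child process re-executes the pool-creating code.
This is standard ``multiprocessing`` behaviour, not specific to this example; a minimal sketch:

.. code-block:: python

    import time
    from multiprocessing import Pool

    def time_consuming_function(x):
        time.sleep(1)  # Simulate that this function takes long to complete
        return ...

    if __name__ == '__main__':
        # The guard prevents child processes from spawning pools of their own
        with Pool(processes=5) as pool:
            results = pool.map(time_consuming_function, range(10))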

MPIRE can be used almost as a drop-in replacement for ``multiprocessing``. We use the :obj:`mpire.WorkerPool` class and
call one of the available ``map`` functions:

.. code-block:: python

    from mpire import WorkerPool

    with WorkerPool(n_jobs=5) as pool:
        results = pool.map(time_consuming_function, range(10))

Similarly, this will complete in about 2 seconds. The differences in code are small: if you're used to vanilla
``multiprocessing``, there's no need to learn a completely new syntax. The additional functionality, though, is what
sets MPIRE apart.
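
One example of that additional flexibility: MPIRE's ``map`` comes in several flavors. Besides ``map`` there are
``map_unordered``, ``imap``, and ``imap_unordered``. A short sketch using ``imap``, which returns a lazy generator that
yields each result as soon as it becomes available instead of blocking until all tasks are done:

.. code-block:: python

    from mpire import WorkerPool

    with WorkerPool(n_jobs=5) as pool:
        # Results are yielded one by one, in task order
        for result in pool.imap(time_consuming_function, range(10)):
            print(result)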

Progress bar
------------

Suppose we want to know the status of the work in progress: how many tasks have been completed, and how long before
all the work is done? It's as simple as setting the ``progress_bar`` parameter to ``True``:

.. code-block:: python

    with WorkerPool(n_jobs=5) as pool:
        results = pool.map(time_consuming_function, range(10), progress_bar=True)

And it will output a nicely formatted tqdm_ progress bar.
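
The bar can be customized as well: the ``progress_bar_options`` parameter of the ``map`` functions is forwarded to the
tqdm constructor. A small sketch; ``desc`` and ``unit`` here are ordinary tqdm options:

.. code-block:: python

    with WorkerPool(n_jobs=5) as pool:
        results = pool.map(time_consuming_function, range(10), progress_bar=True,
                           progress_bar_options={'desc': 'Processing', 'unit': 'tasks'})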

MPIRE also offers a dashboard, for which you need to install additional :ref:`dependencies <dashboarddep>`. See
:ref:`Dashboard` for more information.

.. _tqdm: https://tqdm.github.io/


Shared objects
--------------

If you have one or more objects that you want to share between all workers, you can make use of the copy-on-write
``shared_objects`` option of MPIRE. MPIRE will pass on these objects only once per worker, without copying or
serialization. Only when an object is altered within the worker function will it be copied, and only for that worker.

.. note::

    Copy-on-write is not available on Windows, as it requires the start method ``fork``.

.. code-block:: python

    def time_consuming_function(some_object, x):
        time.sleep(1)  # Simulate that this function takes long to complete
        return ...

    def main():
        some_object = ...
        with WorkerPool(n_jobs=5, shared_objects=some_object, start_method='fork') as pool:
            results = pool.map(time_consuming_function, range(10), progress_bar=True)
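
Multiple objects can be shared at once by wrapping them in a container such as a tuple; the worker function then
receives the container as its first argument and unpacks it itself. A minimal sketch, where the two ``...``
placeholders stand for whatever objects you want to share:

.. code-block:: python

    def time_consuming_function(shared, x):
        some_object, some_other_object = shared  # Unpack the shared container
        time.sleep(1)
        return ...

    def main():
        shared = (..., ...)
        with WorkerPool(n_jobs=5, shared_objects=shared, start_method='fork') as pool:
            results = pool.map(time_consuming_function, range(10))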

See :ref:`shared_objects` for more details.

Worker initialization
---------------------

Need to initialize each worker before starting the work? Have a look at the ``worker_state`` and ``worker_init``
functionality:

.. code-block:: python

    def init(worker_state):
        # Load a big dataset or model and store it in a worker specific worker_state
        worker_state['dataset'] = ...
        worker_state['model'] = ...

    def task(worker_state, idx):
        # Let the model predict a specific instance of the dataset
        return worker_state['model'].predict(worker_state['dataset'][idx])

    with WorkerPool(n_jobs=5, use_worker_state=True) as pool:
        results = pool.map(task, range(10), worker_init=init)

Similarly, you can use the ``worker_exit`` parameter to let MPIRE call a function whenever a worker terminates. You can
even let this exit function return results, which can be obtained later on. See the :ref:`worker_init_exit` section for
more information.
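
A minimal sketch of such an exit function, assuming the returned values are fetched afterwards with
:meth:`mpire.WorkerPool.get_exit_results`, which yields one entry per worker:

.. code-block:: python

    def init(worker_state):
        worker_state['n_processed'] = 0

    def task(worker_state, x):
        worker_state['n_processed'] += 1
        return ...

    def exit_(worker_state):
        # Whatever is returned here can be obtained later on
        return worker_state['n_processed']

    with WorkerPool(n_jobs=5, use_worker_state=True) as pool:
        results = pool.map(task, range(10), worker_init=init, worker_exit=exit_)
        exit_results = pool.get_exit_results()  # One entry per worker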


Worker insights
---------------

When your multiprocessing setup isn't performing as you want it to and you have no clue what's causing it, there's the
worker insights functionality. This will give you some insight into your setup, but it will not profile the function
you're running (there are other libraries for that). Instead, it profiles the worker start-up time, waiting time, and
working time. When worker init and exit functions are provided, it will time those as well.

Perhaps you're sending a lot of data over the task queue, which makes the waiting time go up. Whatever the case, you
can enable and grab the insights using the ``enable_insights`` flag and :meth:`mpire.WorkerPool.get_insights` function,
respectively:

.. code-block:: python

    with WorkerPool(n_jobs=5, enable_insights=True) as pool:
        results = pool.map(time_consuming_function, range(10))
        insights = pool.get_insights()
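
Since ``get_insights`` returns an ordinary dictionary, the insights are easy to log or store; a small sketch, assuming
all insight values are JSON-serializable:

.. code-block:: python

    import json

    with WorkerPool(n_jobs=5, enable_insights=True) as pool:
        results = pool.map(time_consuming_function, range(10))
        insights = pool.get_insights()

    # Pretty-print the insights for quick inspection
    print(json.dumps(insights, indent=4))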

See :ref:`worker insights` for a more detailed example and expected output.