1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
|
.. _apply-family:
Apply family
============
.. contents:: Contents
:depth: 2
:local:
:obj:`mpire.WorkerPool` implements two ``apply`` functions, which are very similar to the ones in the
:mod:`multiprocessing` module:
:meth:`mpire.WorkerPool.apply`
Apply a function to a single task. This is a blocking call.
:meth:`mpire.WorkerPool.apply_async`
A variant of the above, but which is non-blocking. This returns an :obj:`mpire.async_result.AsyncResult` object.
``apply``
---------
The ``apply`` function is a blocking call, which means that it will not return until the task is completed. If you want
to run multiple different tasks in parallel, you should use the ``apply_async`` function instead. If you require
to run the same function for many tasks in parallel, use the ``map`` functions instead.
The ``apply`` function takes a function, positional arguments, and keyword arguments, similar to how
:mod:`multiprocessing` does it.
.. code-block:: python
def task(a, b, c, d):
return a + b + c + d
with WorkerPool(n_jobs=1) as pool:
result = pool.apply(task, args=(1, 2), kwargs={'d': 4, 'c': 3})
print(result)
``apply_async``
---------------
The ``apply_async`` function is a non-blocking call, which means that it will return immediately. It returns an
:obj:`mpire.async_result.AsyncResult` object, which can be used to get the result of the task at a later moment in time.
The ``apply_async`` function takes the same parameters as the ``apply`` function.
.. code-block:: python
def task(a, b):
return a + b
with WorkerPool(n_jobs=4) as pool:
async_results = [pool.apply_async(task, args=(i, i)) for i in range(10)]
results = [async_result.get() for async_result in async_results]
Obtaining the results should happen while the pool is still running! E.g., the following will deadlock:
.. code-block::
with WorkerPool(n_jobs=4) as pool:
async_results = [pool.apply_async(task, args=(i, i)) for i in range(10)]
# Will wait forever
results = [async_result.get() for async_result in async_results]
You can, however, make use of the :meth:`mpire.WorkerPool.stop_and_join()` function to stop the workers and join the
pool. This will make sure that all tasks are completed before the pool exits.
.. code-block::
with WorkerPool(n_jobs=4) as pool:
async_results = [pool.apply_async(task, args=(i, i)) for i in range(10)]
pool.stop_and_join()
# Will not deadlock
results = [async_result.get() for async_result in async_results]
AsyncResult
-----------
The :obj:`mpire.async_result.AsyncResult` object has the following convenient methods:
.. code-block:: python
with WorkerPool(n_jobs=1) as pool:
async_result = pool.apply_async(task, args=(1, 1))
# Check if the task is completed
is_completed = async_result.ready()
# Wait until the task is completed, or until the timeout is reached.
async_result.wait(timeout=10)
# Get the result of the task. This will block until the task is completed,
# or until the timeout is reached.
result = async_result.get(timeout=None)
# Check if the task was successful (i.e., did not raise an exception).
# This will raise an exception if the task is not completed yet.
is_successful = async_result.successful()
Callbacks
---------
Each ``apply`` function has a ``callback`` and ``error_callback`` argument. These are functions which are called when
the task is finished. The ``callback`` function is called with the result of the task when the task was completed
successfully, and the ``error_callback`` is called with the exception when the task failed.
.. code-block:: python
def task(a):
return a + 1
def callback(result):
print("Task completed successfully with result:", result)
def error_callback(exception):
print("Task failed with exception:", exception)
with WorkerPool(n_jobs=1) as pool:
pool.apply(task, 42, callback=callback, error_callback=error_callback)
Worker init and exit
--------------------
As with the ``map`` family of functions, the ``apply`` family of functions also has ``worker_init`` and ``worker_exit``
arguments. These are functions which are called when a worker is started and stopped, respectively. See
:ref:`worker_init_exit` for more information on these functions.
.. code-block:: python
def worker_init():
print("Worker started")
def worker_exit():
print("Worker stopped")
with WorkerPool(n_jobs=5) as pool:
pool.apply(task, 42, worker_init=worker_init, worker_exit=worker_exit)
There's a caveat though. When the first ``apply`` or ``apply_async`` function is executed, the entire pool of workers
is started. This means that in the above example all five workers are started, while only one was needed. This also
means that the ``worker_init`` function is set for all those workers at once. This means you cannot have a different
``worker_init`` function for each apply task. A second, different ``worker_init`` function will simply be ignored.
Similarly, the ``worker_exit`` function can only be set once as well. Additionally, exit functions are only called when
a worker exits, which in this case translates to when the pool exits. This means that if you call ``apply`` or
``apply_async`` multiple times, the ``worker_exit`` function is only called once at the end. Use
:meth:`mpire.WorkerPool.stop_and_join()` to stop the workers, which will cause the ``worker_exit`` function to be
triggered for each worker.
Timeouts
--------
The ``apply`` family of functions also has ``task_timeout``, ``worker_init_timeout`` and ``worker_exit_timeout``
arguments. These are timeouts for the task, the ``worker_init`` function and the ``worker_exit`` function, respectively.
They work similarly as those for the ``map`` functions.
When a single task times out, only that task is cancelled. The other tasks will continue to run. When a worker init or
exit times out, the entire pool is stopped.
See :ref:`timeouts` for more information.
|