..
    For doctests:

    >>> import sys
    >>> setup = getfixture('parallel_numpy_fixture')
    >>> fixture = setup(sys.modules[__name__])

By default the workers of the pool are real Python processes forked using the
``multiprocessing`` module of the Python standard library when ``n_jobs != 1``.
The arguments passed as input to the ``Parallel`` call are serialized and
reallocated in the memory of each worker process.

This can be problematic for large arguments as they will be reallocated
``n_jobs`` times by the workers.

As this problem often occurs in scientific computing with ``numpy``
based datastructures, :class:`joblib.Parallel` provides special
handling for large arrays: it automatically dumps them on the filesystem
and passes a reference to the workers, which open them as a memory map
on that file using the ``numpy.memmap`` subclass of ``numpy.ndarray``.
This makes it possible to share a segment of data between all the
worker processes.

.. note::

   The following only applies with the ``"loky"`` and
   ``"multiprocessing"`` process-backends. If your code can release the
   GIL, then using a thread-based backend by passing
   ``prefer='threads'`` is even more efficient because it makes it
   possible to avoid the communication overhead of process-based
   parallelism.

   Scientific Python libraries such as numpy, scipy, pandas and
   scikit-learn often release the GIL in performance critical code paths.
   It is therefore advised to always measure the speed of thread-based
   parallelism and use it when the scalability is not limited by the GIL.
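
To illustrate the point above, here is a minimal sketch of the thread-based
backend. The helper ``frobenius_norm`` and the toy arrays are illustrative,
not part of joblib; only ``Parallel``, ``delayed`` and the ``prefer``
parameter come from the library::

    import numpy as np
    from joblib import Parallel, delayed

    def frobenius_norm(a):
        # np.linalg.norm dispatches to compiled code that releases the
        # GIL, so several threads can make progress concurrently
        return np.linalg.norm(a)

    arrays = [np.full((100, 100), 2.0) for _ in range(4)]

    # prefer="threads" requests a thread-based backend: no worker
    # processes are spawned and the input arrays are not serialized
    results = Parallel(n_jobs=2, prefer="threads")(
        delayed(frobenius_norm)(a) for a in arrays)

Because the workers are threads, ``results`` is computed without any
inter-process communication or memmapping of the inputs.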

Automated array to memmap conversion
------------------------------------

The automated array to memmap conversion is triggered by a configurable
threshold on the size of the array::

    >>> import numpy as np
    >>> from joblib import Parallel, delayed

    >>> def is_memmap(obj):
    ...     return isinstance(obj, np.memmap)

    >>> Parallel(n_jobs=2, max_nbytes=1e6)(
    ...     delayed(is_memmap)(np.ones(int(i)))
    ...     for i in [1e2, 1e4, 1e6])
    [False, False, True]

By default the data is dumped to the ``/dev/shm`` shared-memory partition if it
exists and is writable (typically the case under Linux). Otherwise the
operating system's temporary folder is used. The location of the temporary data
files can be customized by passing a ``temp_folder`` argument to the
``Parallel`` constructor.

Passing ``max_nbytes=None`` makes it possible to disable the automated array to
memmap conversion.
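
The two options can be combined in one call. A short sketch, assuming the
joblib convention that ``max_nbytes`` also accepts human-readable strings
such as ``'1M'`` (the helper ``is_memmap`` and the array sizes are
illustrative)::

    import tempfile
    import numpy as np
    from joblib import Parallel, delayed

    def is_memmap(obj):
        return isinstance(obj, np.memmap)

    with tempfile.TemporaryDirectory() as tmp:
        # arrays above ~1 MB are dumped into `tmp` instead of the default
        # /dev/shm or system temporary folder; smaller arrays are pickled
        flags = Parallel(n_jobs=2, max_nbytes='1M', temp_folder=tmp)(
            delayed(is_memmap)(np.ones(int(n))) for n in [1e3, 1e6])

Here the 8 kB array stays a regular ``ndarray`` in the workers while the
8 MB array is received as a ``numpy.memmap`` backed by a file in ``tmp``.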

Manual management of memmapped input data
-----------------------------------------

For even finer tuning of the memory usage it is also possible to
dump the array as a memmap directly from the parent process to
free the memory before forking the worker processes. For instance
let's allocate a large array in the memory of the parent process::

    >>> large_array = np.ones(int(1e6))

Dump it to a local file for memmapping::

    >>> import tempfile
    >>> import os
    >>> from joblib import load, dump

    >>> temp_folder = tempfile.mkdtemp()
    >>> filename = os.path.join(temp_folder, 'joblib_test.mmap')
    >>> if os.path.exists(filename): os.unlink(filename)
    >>> _ = dump(large_array, filename)
    >>> large_memmap = load(filename, mmap_mode='r+')

The ``large_memmap`` variable is pointing to a ``numpy.memmap``
instance::

    >>> large_memmap.__class__.__name__, large_memmap.nbytes, large_memmap.shape
    ('memmap', 8000000, (1000000,))

    >>> np.allclose(large_array, large_memmap)
    True

The original array can be freed from the main process memory::

    >>> del large_array
    >>> import gc
    >>> _ = gc.collect()

It is possible to slice ``large_memmap`` into a smaller memmap::

    >>> small_memmap = large_memmap[2:5]

    >>> small_memmap.__class__.__name__, small_memmap.nbytes, small_memmap.shape
    ('memmap', 24, (3,))

Finally a ``np.ndarray`` view backed on that same memory mapped file can be
used::

    >>> small_array = np.asarray(small_memmap)

    >>> small_array.__class__.__name__, small_array.nbytes, small_array.shape
    ('ndarray', 24, (3,))

All three of these datastructures point to the same memory buffer, and
this same buffer will also be reused directly by the worker processes
of a ``Parallel`` call::

    >>> Parallel(n_jobs=2, max_nbytes=None)(
    ...     delayed(is_memmap)(a)
    ...     for a in [large_memmap, small_memmap, small_array])
    [True, True, True]

Note that here ``max_nbytes=None`` is used to disable the auto-dumping
feature of ``Parallel``. ``small_array`` is still in shared memory in the
worker processes because it was already backed by shared memory in the
parent process. The pickling machinery of the ``Parallel`` multiprocessing
queues is able to detect this situation and optimize it on the fly to limit
the number of memory copies.

Writing parallel computation results in shared memory
-----------------------------------------------------

If data are opened using the ``w+`` or ``r+`` mode in the main program, the
workers will get ``r+`` mode access. Thus the workers will be able to write
their results directly to the original data, alleviating the need to
serialize the results and send them back to the parent process.

Here is an example script on parallel processing with preallocated
``numpy.memmap`` datastructures:
:ref:`sphx_glr_auto_examples_parallel_memmap.py`.
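
A condensed sketch of this pattern follows; the helper ``fill_slice``, the
file name and the shapes are illustrative choices, not part of joblib. Each
task writes to its own exclusive slice of a preallocated memmap::

    import os
    import tempfile
    import numpy as np
    from joblib import Parallel, delayed

    def fill_slice(out, i):
        # each task writes to a non-overlapping slice of the shared
        # output buffer, so no locking is needed
        out[i * 10:(i + 1) * 10] = i

    folder = tempfile.mkdtemp()
    # preallocate a writable memmap in the parent process; the workers
    # reopen the same file in 'r+' mode and write their results in
    # place instead of pickling them back to the parent
    out = np.memmap(os.path.join(folder, 'out.mmap'),
                    dtype=np.float64, mode='w+', shape=(40,))
    Parallel(n_jobs=2)(delayed(fill_slice)(out, i) for i in range(4))

After the call, ``out`` in the parent process reflects the values written by
the workers, since all of them map the same file. Remember to clean up the
temporary folder once the memmap is no longer needed.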

.. warning::

   Having concurrent workers write on overlapping shared memory data segments,
   for instance by using inplace operators and assignments on a `numpy.memmap`
   instance, can lead to data corruption as numpy does not offer atomic
   operations. The previous example does not risk that issue as each task is
   updating an exclusive segment of the shared result array.

   Some C/C++ compilers offer lock-free atomic primitives such as add-and-fetch
   or compare-and-swap that could be exposed to Python via CFFI_ for instance.

   However providing numpy-aware atomic constructs is outside of the scope
   of the joblib project.

.. _CFFI: https://cffi.readthedocs.org

A final note: don't forget to clean up any temporary folder when you are done
with the computation::

    >>> import shutil
    >>> try:
    ...     shutil.rmtree(temp_folder)
    ... except OSError:
    ...     pass  # this can sometimes fail under Windows