..
    For doctests:

    >>> import sys
    >>> setup = getfixture('parallel_numpy_fixture')
    >>> fixture = setup(sys.modules[__name__])

By default, when ``n_jobs != 1``, the workers of the pool are real Python
processes started by a process-based backend (``"loky"`` by default, or the
``multiprocessing`` module of the Python standard library). The arguments
passed as input to the ``Parallel`` call are serialized and reallocated in
the memory of each worker process.

This can be problematic for large arguments as they will be reallocated
``n_jobs`` times by the workers.

As this problem often occurs in scientific computing with ``numpy``
based datastructures, :class:`joblib.Parallel` provides special
handling for large arrays: they are automatically dumped to the filesystem,
and the workers are passed a reference so that they can open the same file
as a memory map using the ``numpy.memmap`` subclass of ``numpy.ndarray``.
This makes it possible to share a segment of data between all the
worker processes.

.. note::

  The following only applies to the ``"loky"`` and
  ``"multiprocessing"`` process-based backends. If your code can release the
  GIL, then using a thread-based backend by passing
  ``prefer='threads'`` is even more efficient because it avoids the
  communication overhead of process-based parallelism.

  Scientific Python libraries such as numpy, scipy, pandas and
  scikit-learn often release the GIL in performance-critical code paths.
  It is therefore advised to always measure the speed of thread-based
  parallelism and use it when scalability is not limited by the GIL.
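
  As a minimal illustration of the ``prefer='threads'`` option (``np.dot``
  dispatches to BLAS routines, which release the GIL while computing)::

    >>> import numpy as np
    >>> from joblib import Parallel, delayed
    >>> a = np.ones((1000, 1000))
    >>> results = Parallel(n_jobs=2, prefer='threads')(
    ...     delayed(np.dot)(a, a) for _ in range(3))
    >>> len(results)
    3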


Automated array to memmap conversion
------------------------------------

The automated array to memmap conversion is triggered by a configurable
threshold on the size of the array::

  >>> import numpy as np
  >>> from joblib import Parallel, delayed
  >>> def is_memmap(obj):
  ...     return isinstance(obj, np.memmap)

  >>> Parallel(n_jobs=2, max_nbytes=1e6)(
  ...     delayed(is_memmap)(np.ones(int(i)))
  ...     for i in [1e2, 1e4, 1e6])
  [False, False, True]

By default the data is dumped to the ``/dev/shm`` shared-memory partition if it
exists and is writable (typically the case under Linux). Otherwise the
operating system's temporary folder is used. The location of the temporary data
files can be customized by passing a ``temp_folder`` argument to the
``Parallel`` constructor.
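
For instance, here is a minimal sketch that redirects the dumped arrays to a
dedicated folder (the folder created below is just an example location)::

  >>> import tempfile, shutil
  >>> custom_folder = tempfile.mkdtemp()
  >>> Parallel(n_jobs=2, max_nbytes=1e6, temp_folder=custom_folder)(
  ...     delayed(is_memmap)(np.ones(int(1e6))) for _ in range(2))
  [True, True]
  >>> try:
  ...     shutil.rmtree(custom_folder)
  ... except OSError:
  ...     pass  # removal can occasionally fail or be delayed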

Passing ``max_nbytes=None`` makes it possible to disable the automated array to
memmap conversion.
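
Reusing the ``is_memmap`` helper defined above, one can check that the
workers then receive plain in-memory copies instead of memory-mapped views::

  >>> Parallel(n_jobs=2, max_nbytes=None)(
  ...     delayed(is_memmap)(np.ones(int(1e6))) for _ in range(2))
  [False, False]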


Manual management of memmapped input data
-----------------------------------------

For even finer tuning of the memory usage, it is also possible to
dump the array as a memmap directly from the parent process to
free the memory before starting the worker processes. For instance,
let's allocate a large array in the memory of the parent process::

  >>> large_array = np.ones(int(1e6))

Dump it to a local file for memmapping::

  >>> import tempfile
  >>> import os
  >>> from joblib import load, dump

  >>> temp_folder = tempfile.mkdtemp()
  >>> filename = os.path.join(temp_folder, 'joblib_test.mmap')
  >>> if os.path.exists(filename): os.unlink(filename)
  >>> _ = dump(large_array, filename)
  >>> large_memmap = load(filename, mmap_mode='r+')

The ``large_memmap`` variable now points to a ``numpy.memmap``
instance::

  >>> large_memmap.__class__.__name__, large_array.nbytes, large_array.shape
  ('memmap', 8000000, (1000000,))

  >>> np.allclose(large_array, large_memmap)
  True

The original array can be freed from the main process memory::

  >>> del large_array
  >>> import gc
  >>> _ = gc.collect()

It is possible to slice ``large_memmap`` into a smaller memmap::

  >>> small_memmap = large_memmap[2:5]
  >>> small_memmap.__class__.__name__, small_memmap.nbytes, small_memmap.shape
  ('memmap', 24, (3,))

Finally, a ``np.ndarray`` view backed by that same memory-mapped file can be
used::

  >>> small_array = np.asarray(small_memmap)
  >>> small_array.__class__.__name__, small_array.nbytes, small_array.shape
  ('ndarray', 24, (3,))

All three of these datastructures point to the same memory buffer, and this
buffer will also be reused directly by the worker processes
of a ``Parallel`` call::

  >>> Parallel(n_jobs=2, max_nbytes=None)(
  ...     delayed(is_memmap)(a)
  ...     for a in [large_memmap, small_memmap, small_array])
  [True, True, True]

Note that here ``max_nbytes=None`` is used to disable the auto-dumping
feature of ``Parallel``. ``small_array`` is still in shared memory in the
worker processes because it was already backed by shared memory in the
parent process.
The pickling machinery used by ``Parallel`` to dispatch tasks to the workers
is able to detect this situation and optimize it on the fly to limit
the number of memory copies.
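
By contrast, a plain in-memory copy of the same data is not backed by the
memory-mapped file and therefore has to be pickled and copied to each worker
(the ``in_memory_copy`` variable below is just illustrative)::

  >>> in_memory_copy = np.array(small_memmap)  # a real copy, not a view
  >>> Parallel(n_jobs=2, max_nbytes=None)(
  ...     delayed(is_memmap)(a) for a in [in_memory_copy])
  [False]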


Writing parallel computation results in shared memory
-----------------------------------------------------

If data are opened using the ``w+`` or ``r+`` mode in the main program, the
workers will get ``r+`` mode access. Thus the workers will be able to write
their results directly to the original data, alleviating the need to
serialize the results and send them back to the parent process.

See the example script on parallel processing with pre-allocated
``numpy.memmap`` datastructures:
:ref:`sphx_glr_auto_examples_parallel_memmap.py`.
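
As a minimal sketch of this pattern (the ``fill_slice`` helper and the output
file name below are purely illustrative), the parent process can pre-allocate
the result buffer as a ``numpy.memmap`` opened in ``'w+'`` mode and let each
task fill an exclusive slice of it::

  >>> def fill_slice(output, i):
  ...     output[i * 10:(i + 1) * 10] = i  # each task writes its own slice

  >>> output_filename = os.path.join(temp_folder, 'joblib_output.mmap')
  >>> output = np.memmap(output_filename, dtype=np.float64,
  ...                    shape=(30,), mode='w+')
  >>> _ = Parallel(n_jobs=2)(
  ...     delayed(fill_slice)(output, i) for i in range(3))
  >>> int(output.sum())
  30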

.. warning::

  Having concurrent workers write to overlapping shared memory data segments,
  for instance by using in-place operators and assignments on a
  ``numpy.memmap`` instance, can lead to data corruption because numpy does
  not offer atomic operations. The examples above do not risk that issue
  because each task updates an exclusive segment of the shared result array.

  Some C/C++ compilers offer lock-free atomic primitives such as add-and-fetch
  or compare-and-swap that could be exposed to Python via CFFI_, for instance.
  However, providing numpy-aware atomic constructs is outside the scope
  of the joblib project.


.. _CFFI: https://cffi.readthedocs.org


A final note: don't forget to clean up any temporary folder when you are done
with the computation::

  >>> import shutil
  >>> try:
  ...     shutil.rmtree(temp_folder)
  ... except OSError:
  ...     pass  # this can sometimes fail under Windows