File: nested_parallel_memory.py

package info (click to toggle)
joblib 1.4.2-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,360 kB
  • sloc: python: 14,772; sh: 138; makefile: 42
file content (140 lines) | stat: -rw-r--r-- 5,092 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
==================================================
Checkpoint using joblib.Memory and joblib.Parallel
==================================================

This example illustrates how to cache intermediate computing results using
:class:`joblib.Memory` within :class:`joblib.Parallel`.

"""

###############################################################################
# Embed caching within parallel processing
###############################################################################
#
# It is possible to cache a computationally expensive function executed during
# a parallel process. ``costly_compute`` emulates such a time-consuming
# function.

import time


def costly_compute(data, column, delay=2):
    """Emulate a costly function by sleeping and returning a column.

    Parameters
    ----------
    data : 2-D array
        Array supporting NumPy-style ``data[:, column]`` indexing, laid out
        as ``(n_samples, n_columns)``.
    column : int
        Index of the column to extract (along the second axis).
    delay : float, default=2
        Number of seconds to sleep in order to simulate an expensive
        computation.

    Returns
    -------
    1-D array
        All the values of the requested column.
    """
    time.sleep(delay)
    # ``data[column]`` would select a *row* of a 2-D array; callers iterate
    # over ``range(data.shape[1])``, so the second axis must be indexed.
    return data[:, column]


def data_processing_mean(data, column):
    """Return the mean of the values ``costly_compute`` yields for ``column``.

    ``costly_compute`` is called directly on every invocation, without any
    caching, and its result is reduced with its ``mean`` method.
    """
    extracted = costly_compute(data, column)
    return extracted.mean()


###############################################################################
# Create some data. The random seed is fixed to generate deterministic data
# across Python sessions. Note that this is not necessary for this specific
# example since the memory cache is cleared at the end of the session.

import numpy as np
# Seeded generator: the same 10,000 x 4 table of standard-normal samples is
# drawn on every run.
rng = np.random.RandomState(seed=42)
data = rng.standard_normal(size=(10_000, 4))

###############################################################################
# First, the processing can be run without caching or parallel processing.

# Baseline: handle the columns one after the other in the current process and
# measure the wall-clock time of the whole pass.
start = time.time()
results = [
    data_processing_mean(data, column_index)
    for column_index in range(data.shape[1])
]
stop = time.time()

print('\nSequential processing')
print(
    'Elapsed time for the entire processing: {:.2f} s'.format(stop - start)
)

###############################################################################
# ``costly_compute`` is expensive to compute and it is used as an intermediate
# step in ``data_processing_mean``. Therefore, it is interesting to store the
# intermediate results from ``costly_compute`` using :class:`joblib.Memory`.

from joblib import Memory

# Folder (relative to the current working directory) in which joblib stores
# the cached results.
location = './cachedir'
memory = Memory(location=location, verbose=0)

# Wrap the expensive function with ``memory.cache`` to obtain its cached
# counterpart.
costly_compute_cached = memory.cache(costly_compute)


###############################################################################
# Now, we define ``data_processing_mean_using_cache`` which benefits from the
# cache by calling ``costly_compute_cached``

def data_processing_mean_using_cache(data, column):
    """Return the mean of the values obtained for ``column`` via the cache.

    The intermediate values come from ``costly_compute_cached`` and are
    reduced with their ``mean`` method.
    """
    cached_values = costly_compute_cached(data, column)
    return cached_values.mean()


###############################################################################
# Then, we execute the same processing in parallel while caching the
# intermediate results.

from joblib import Parallel, delayed

# First parallel pass: one task per column dispatched to 2 workers, timed as
# a whole.
start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, column_index)
    for column_index in range(data.shape[1])
)
stop = time.time()

print('\nFirst round - caching the data')
print(
    'Elapsed time for the entire processing: {:.2f} s'.format(stop - start)
)

###############################################################################
# By using 2 workers, the parallel processing gives a 2x speed-up compared to
# the sequential case. By executing the same process again, the intermediate
# results obtained by calling ``costly_compute_cached`` will be loaded from the
# cache instead of executing the function.

# Second parallel pass: exactly the same tasks as the first round, timed
# again for comparison.
start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, column_index)
    for column_index in range(data.shape[1])
)
stop = time.time()

print('\nSecond round - reloading from the cache')
print(
    'Elapsed time for the entire processing: {:.2f} s'.format(stop - start)
)

###############################################################################
# Reuse intermediate checkpoints
###############################################################################
#
# Since the intermediate results of the ``costly_compute_cached`` function
# have been cached, they can be reused by calling the function again. We
# define a new processing step which takes the maximum of the array returned
# by ``costly_compute_cached`` instead of the mean computed previously.


def data_processing_max_using_cache(data, column):
    """Return the maximum of the values obtained for ``column`` via the cache.

    The intermediate values come from ``costly_compute_cached`` and are
    reduced with their ``max`` method.
    """
    cached_values = costly_compute_cached(data, column)
    return cached_values.max()


# Parallel pass computing the maximum of each column through
# ``costly_compute_cached``, timed as a whole.
start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_max_using_cache)(data, column_index)
    for column_index in range(data.shape[1])
)
stop = time.time()

print('\nReusing intermediate checkpoints')
print(
    'Elapsed time for the entire processing: {:.2f} s'.format(stop - start)
)

###############################################################################
# The processing time only corresponds to the execution of the ``max``
# function. The internal call to ``costly_compute_cached`` is reloading the
# results from the cache.

###############################################################################
# Clean up the cache folder
###############################################################################

# Erase what ``memory`` has stored so the example does not leave cached
# results behind. NOTE(review): ``warn=False`` presumably silences joblib's
# "flushing the cache" warning -- confirm against the joblib documentation.
memory.clear(warn=False)