# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helpers related to multiprocessing.
Based on: //tools/binary_size/libsupersize/parallel.py
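
Example usage (an illustrative sketch; _Md5 stands in for any module-level
function supplied by the caller):

  # Run one call in a forked child, or many calls across a pool of children.
  single = ForkAndCall(_Md5, (path,))
  all_digests = list(BulkForkAndCall(_Md5, [(p,) for p in paths]))
  print(single.get(), all_digests)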
"""

import atexit
import logging
import multiprocessing
import os
import sys
import threading
import traceback

DISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
if DISABLE_ASYNC:
logging.warning('Running in synchronous mode.')
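
# Debugging note: running with the environment variable set, e.g.
# "DISABLE_ASYNC=1 python3 some_script.py" (an illustrative command; any script
# importing this module works the same way), makes ForkAndCall() and
# BulkForkAndCall() execute everything inline in this process, which can make
# stack traces and pdb easier to use.
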
_all_pools = None
_is_child_process = False
_silence_exceptions = False
# Used to pass parameters to forked processes without pickling: they are stored
# in these globals just before the Pool is created so that fork()'ed children
# inherit them.
_fork_params = None
_fork_kwargs = None
# Ensure fork is used on MacOS for multiprocessing compatibility.
# Starting from Python 3.8, the "spawn" method is the default on MacOS.
# On Linux hosts this line will be a no-op.
multiprocessing.set_start_method('fork')


class _ImmediateResult:
def __init__(self, value):
self._value = value
def get(self):
return self._value
def wait(self):
pass
def ready(self):
return True
def successful(self):
return True


class _ExceptionWrapper:
"""Used to marshal exception messages back to main process."""
def __init__(self, msg, exception_type=None):
self.msg = msg
self.exception_type = exception_type
def MaybeThrow(self):
if self.exception_type:
raise getattr(__builtins__,
self.exception_type)('Originally caused by: ' + self.msg)


class _FuncWrapper:
"""Runs on the fork()'ed side to catch exceptions and spread *args."""
def __init__(self, func):
global _is_child_process
_is_child_process = True
self._func = func
def __call__(self, index, _=None):
global _fork_kwargs
try:
if _fork_kwargs is None: # Clarifies _fork_kwargs is map for pylint.
_fork_kwargs = {}
params = _fork_params[index] # pylint: disable=unsubscriptable-object
return self._func(*params, **_fork_kwargs)
except Exception as e:
      # Only keep the exception type for builtin exceptions; other exception
      # types cannot be reliably reconstructed by name in the parent process.
exception_type = None
if hasattr(__builtins__, type(e).__name__):
exception_type = type(e).__name__
# multiprocessing is supposed to catch and return exceptions automatically
# but it doesn't seem to work properly :(.
return _ExceptionWrapper(traceback.format_exc(), exception_type)
except BaseException:
return _ExceptionWrapper(traceback.format_exc())


class _WrappedResult:
"""Allows for host-side logic to be run after child process has terminated.
* Unregisters associated pool _all_pools.
* Raises exception caught by _FuncWrapper.
"""
def __init__(self, result, pool=None):
self._result = result
self._pool = pool
def get(self):
self.wait()
value = self._result.get()
_CheckForException(value)
return value
def wait(self):
self._result.wait()
if self._pool:
_all_pools.remove(self._pool)
self._pool = None
def ready(self):
return self._result.ready()
def successful(self):
return self._result.successful()


def _TerminatePools():
"""Calls .terminate() on all active process pools.
Not supposed to be necessary according to the docs, but seems to be required
when child process throws an exception or Ctrl-C is hit.
"""
global _silence_exceptions
_silence_exceptions = True
# Child processes cannot have pools, but atexit runs this function because
# it was registered before fork()ing.
if _is_child_process:
return
def close_pool(pool):
try:
pool.terminate()
except BaseException:
pass
for i, pool in enumerate(_all_pools):
# Without calling terminate() on a separate thread, the call can block
# forever.
thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
target=close_pool,
args=(pool, ))
thread.daemon = True
thread.start()


def _CheckForException(value):
if isinstance(value, _ExceptionWrapper):
global _silence_exceptions
if not _silence_exceptions:
value.MaybeThrow()
_silence_exceptions = True
logging.error('Subprocess raised an exception:\n%s', value.msg)
sys.exit(1)


def _MakeProcessPool(job_params, **job_kwargs):
global _all_pools
global _fork_params
global _fork_kwargs
assert _fork_params is None
assert _fork_kwargs is None
pool_size = min(len(job_params), multiprocessing.cpu_count())
_fork_params = job_params
_fork_kwargs = job_kwargs
ret = multiprocessing.Pool(pool_size)
_fork_params = None
_fork_kwargs = None
if _all_pools is None:
_all_pools = []
atexit.register(_TerminatePools)
_all_pools.append(ret)
return ret


def ForkAndCall(func, args):
"""Runs |func| in a fork'ed process.
Returns:
A Result object (call .get() to get the return value)
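
  Example (an illustrative sketch; _Compress is a hypothetical module-level
  function defined by the caller):
    result = ForkAndCall(_Compress, (src_path, dst_path))
    # ...do unrelated work in the parent while the child runs...
    compressed_size = result.get()  # Blocks until the child finishes.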
"""
if DISABLE_ASYNC:
pool = None
result = _ImmediateResult(func(*args))
else:
pool = _MakeProcessPool([args]) # Omit |kwargs|.
result = pool.apply_async(_FuncWrapper(func), (0, ))
pool.close()
return _WrappedResult(result, pool=pool)


def BulkForkAndCall(func, arg_tuples, **kwargs):
"""Calls |func| in a fork'ed process for each set of args within |arg_tuples|.
Args:
kwargs: Common keyword arguments to be passed to |func|.
Yields the return values in order.
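
  Example (an illustrative sketch; _Hash is a hypothetical module-level
  function with signature _Hash(path, algo)):
    digests = list(BulkForkAndCall(_Hash, [(p,) for p in paths], algo='sha1'))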
"""
arg_tuples = list(arg_tuples)
if not arg_tuples:
return
if DISABLE_ASYNC:
for args in arg_tuples:
yield func(*args, **kwargs)
return
pool = _MakeProcessPool(arg_tuples, **kwargs)
wrapped_func = _FuncWrapper(func)
try:
for result in pool.imap(wrapped_func, range(len(arg_tuples))):
_CheckForException(result)
yield result
finally:
pool.close()
pool.join()
_all_pools.remove(pool)