File: __init__.py

package info (click to toggle)
onetbb 2022.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 19,444 kB
  • sloc: cpp: 129,228; ansic: 9,745; python: 808; xml: 183; objc: 176; makefile: 66; sh: 66; awk: 41; javascript: 37
file content (331 lines) | stat: -rw-r--r-- 13,120 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# Copyright (c) 2016-2023 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing.pool
import ctypes
import atexit
import sys
import os

import platform

if "Windows" in platform.system():
    import site
    path_to_env = site.getsitepackages()[0]
    path_to_libs = os.path.join(path_to_env, "Library", "bin")
    if sys.version_info.minor >= 8:
        os.add_dll_directory(path_to_libs)
    os.environ['PATH'] += os.pathsep + path_to_libs


from .api import  *
from .api import __all__ as api__all
from .pool import *
from .pool import __all__ as pool__all

__all__ = ["Monkey", "is_active"] + api__all + pool__all

__doc__ = """
Python API for Intel(R) oneAPI Threading Building Blocks (oneTBB)
extended with standard Python's pools implementation and monkey-patching.

Command-line interface example:
$  python3 -m tbb $your_script.py
Runs your_script.py in context of tbb.Monkey
"""

is_active = False
""" Indicates whether oneTBB context is activated """

ipc_enabled = False
""" Indicates whether IPC mode is enabled """

libirml = "libirml.so.1"


def _test(arg=None):
    """Some tests"""
    import platform
    if platform.system() == "Linux":
        ctypes.CDLL(libirml)
        assert 256 == os.system("ldd "+_api.__file__+"| grep -E 'libimf|libsvml|libintlc'") # nosec
    from .test import test
    test(arg)
    print("done")


def tbb_process_pool_worker27(inqueue, outqueue, initializer=None, initargs=(),
                            maxtasks=None):
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except:
            print("Warning: Can not load ", libirml, file=sys.stderr)


class TBBProcessPool27(multiprocessing.pool.Pool):
    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=tbb_process_pool_worker27,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                            )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def __del__(self):
        self.close()
        for p in self._pool:
            p.join()

    def __exit__(self, *args):
        self.close()
        for p in self._pool:
            p.join()


def tbb_process_pool_worker3(inqueue, outqueue, initializer=None, initargs=(),
                            maxtasks=None, wrap_exception=False):
    from multiprocessing.pool import worker
    worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_resources()
        except:
            print("Warning: Can not load ", libirml, file=sys.stderr)


class TBBProcessPool3(multiprocessing.pool.Pool):
    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        from multiprocessing.util import debug

        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=tbb_process_pool_worker3,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild,
                                   self._wrap_exception)
                            )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def __del__(self):
        self.close()
        for p in self._pool:
            p.join()

    def __exit__(self, *args):
        self.close()
        for p in self._pool:
            p.join()


class Monkey:
    """
    Context manager which replaces standard multiprocessing.pool
    implementations with tbb.pool using monkey-patching. It also enables oneTBB
    threading for Intel(R) oneAPI Math Kernel Library (oneMKL). For example:

        with tbb.Monkey():
            run_my_numpy_code()

    It allows multiple parallel tasks to be executed on the same thread pool
    and coordinate number of threads across multiple processes thus avoiding
    overheads from oversubscription.
    """
    _items   = {}
    _modules = {}

    def __init__(self, max_num_threads=None, benchmark=False):
        """
        Create context manager for running under TBB scheduler.
        :param max_num_threads: if specified, limits maximal number of threads
        :param benchmark: if specified, blocks in initialization until requested number of threads are ready
        """
        if max_num_threads:
            self.ctl = global_control(global_control.max_allowed_parallelism, int(max_num_threads))
        if benchmark:
            if not max_num_threads:
               max_num_threads = default_num_threads()
            from .api import _concurrency_barrier
            _concurrency_barrier(int(max_num_threads))

    def _patch(self, class_name, module_name, obj):
        m = self._modules[class_name] = __import__(module_name, globals(),
                                                   locals(), [class_name])
        if m == None:
            return
        oldattr = getattr(m, class_name, None)
        if oldattr == None:
            self._modules[class_name] = None
            return
        self._items[class_name] = oldattr
        setattr(m, class_name, obj)

    def __enter__(self):
        global is_active
        assert is_active == False, "tbb.Monkey does not support nesting yet"
        is_active = True
        self.env_mkl = os.getenv('MKL_THREADING_LAYER')
        os.environ['MKL_THREADING_LAYER'] = 'TBB'
        self.env_numba = os.getenv('NUMBA_THREADING_LAYER')
        os.environ['NUMBA_THREADING_LAYER'] = 'TBB'

        if ipc_enabled:
            if sys.version_info.major == 2 and sys.version_info.minor >= 7:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool27)
            elif sys.version_info.major == 3 and sys.version_info.minor >= 5:
                self._patch("Pool", "multiprocessing.pool", TBBProcessPool3)
        self._patch("ThreadPool", "multiprocessing.pool", Pool)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        global is_active
        assert is_active == True, "modified?"
        is_active = False
        if self.env_mkl is None:
            del os.environ['MKL_THREADING_LAYER']
        else:
            os.environ['MKL_THREADING_LAYER'] = self.env_mkl
        if self.env_numba is None:
            del os.environ['NUMBA_THREADING_LAYER']
        else:
            os.environ['NUMBA_THREADING_LAYER'] = self.env_numba
        for name in self._items.keys():
            setattr(self._modules[name], name, self._items[name])


def init_sem_name():
    try:
        librml = ctypes.CDLL(libirml)
        librml.set_active_sem_name()
        librml.set_stop_sem_name()
    except Exception as e:
        print("Warning: Can not initialize name of shared semaphores:", e,
              file=sys.stderr)


def tbb_atexit():
    if ipc_enabled:
        try:
            librml = ctypes.CDLL(libirml)
            librml.release_semaphores()
        except:
            print("Warning: Can not release shared semaphores",
                  file=sys.stderr)


def _main():
    # Run the module specified as the next command line argument
    # python3 -m TBB user_app.py
    global ipc_enabled

    import platform
    import argparse
    parser = argparse.ArgumentParser(prog="python3 -m tbb", description="""
                Run your Python script in context of tbb.Monkey, which
                replaces standard Python pools and threading layer of
                Intel(R) oneAPI Math Kernel Library (oneMKL) by implementation based on
                Intel(R) oneAPI Threading Building Blocks (oneTBB). It enables multiple parallel
                tasks to be executed on the same thread pool and coordinate
                number of threads across multiple processes thus avoiding
                overheads from oversubscription.
             """, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    if platform.system() == "Linux":
        parser.add_argument('--ipc', action='store_true',
                        help="Enable inter-process (IPC) coordination between oneTBB schedulers")
        parser.add_argument('-a', '--allocator', action='store_true',
                        help="Enable oneTBB scalable allocator as a replacement for standard memory allocator")
        parser.add_argument('--allocator-huge-pages', action='store_true',
                        help="Enable huge pages for oneTBB allocator (implies: -a)")
    parser.add_argument('-p', '--max-num-threads', default=default_num_threads(), type=int,
                        help="Initialize oneTBB with P max number of threads per process", metavar='P')
    parser.add_argument('-b', '--benchmark', action='store_true',
                        help="Block oneTBB initialization until all the threads are created before continue the script. "
                        "This is necessary for performance benchmarks that want to exclude lazy scheduler initialization effects from the measurements")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Request verbose and version information")
    parser.add_argument('-m', action='store_true', dest='module',
                        help="Executes following as a module")
    parser.add_argument('name', help="Script or module name")
    parser.add_argument('args', nargs=argparse.REMAINDER,
                        help="Command line arguments")
    args = parser.parse_args()

    if args.verbose:
        os.environ["TBB_VERSION"] = "1"
    if platform.system() == "Linux":
        if args.allocator_huge_pages:
            args.allocator = True
        if args.allocator and not os.environ.get("_TBB_MALLOC_PRELOAD"):
            libtbbmalloc_lib = 'libtbbmalloc_proxy.so.2'
            ld_preload = 'LD_PRELOAD'
            os.environ["_TBB_MALLOC_PRELOAD"] = "1"
            preload_list = filter(None, os.environ.get(ld_preload, "").split(':'))
            if libtbbmalloc_lib in preload_list:
                print('Info:', ld_preload, "contains", libtbbmalloc_lib, "already\n")
            else:
                os.environ[ld_preload] = ':'.join([libtbbmalloc_lib] + list(preload_list))

            if args.allocator_huge_pages:
                assert platform.system() == "Linux"
                try:
                    with open('/proc/sys/vm/nr_hugepages', 'r') as f:
                        pages = int(f.read())
                    if pages == 0:
                        print("oneTBB: Pre-allocated huge pages are not currently reserved in the system. To reserve, run e.g.:\n"
                              "\tsudo sh -c 'echo 2000 > /proc/sys/vm/nr_hugepages'")
                    os.environ["TBB_MALLOC_USE_HUGE_PAGES"] = "1"
                except:
                    print("oneTBB: Failed to read number of pages from /proc/sys/vm/nr_hugepages\n"
                          "\tIs the Linux kernel configured with the huge pages feature?")
                    sys.exit(1)

            os.execl(sys.executable, sys.executable, '-m', 'tbb', *sys.argv[1:])
            assert False, "Re-execution failed"

    sys.argv = [args.name] + args.args
    ipc_enabled = platform.system() == "Linux" and args.ipc
    os.environ["IPC_ENABLE"] = "1" if ipc_enabled else "0"
    if ipc_enabled:
        atexit.register(tbb_atexit)
        init_sem_name()
    if not os.environ.get("KMP_BLOCKTIME"): # TODO move
        os.environ["KMP_BLOCKTIME"] = "0"
    if '_' + args.name in globals():
        return globals()['_' + args.name](*args.args)
    else:
        import runpy
        runf = runpy.run_module if args.module else runpy.run_path
        with Monkey(max_num_threads=args.max_num_threads, benchmark=args.benchmark):
            runf(args.name, run_name='__main__')