File: multi_proc.py

package info (click to toggle)
pybdsf 1.13.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 22,188 kB
  • sloc: fortran: 40,850; python: 14,895; ansic: 4,347; cpp: 1,586; makefile: 131; sh: 46
file content (240 lines) | stat: -rw-r--r-- 8,084 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
"""Multiprocessing module to handle parallelization.

This module can optionally update a statusbar and can divide tasks
between cores using weights (so that each core gets a set of tasks with
the same total weight).

Adapted from a module by Brian Refsdal at SAO, available at AstroPython
(http://www.astropython.org/snippet/2010/3/Parallel-map-using-multiprocessing).

"""
from __future__ import print_function
import traceback
import sys
import os
import numpy
import multiprocessing
_ncpus = 1

# Get the number of available cores. We use os.sched_getaffinity() for this if
# possible, as the number of available cores may be less than the total number
# of CPU cores in the machine, which is returned by, e.g.,
# multiprocessing.cpu_count()
#
# Note: since macOS (Darwin) does not support os.sched_getaffinity(), we use
# multiprocessing.cpu_count() instead
if sys.platform == 'darwin':
    if sys.version_info[0] == 3 and sys.version_info[1] >= 8:
        # We need to set spawn method to "fork" for macOS on Python 3.8+ where
        # the default has been changed to "spawn", causing problems (see the
        # discussion at https://github.com/ipython/ipython/issues/12396)
        multiprocessing.set_start_method('fork')
    _ncpus = multiprocessing.cpu_count()
else:
    multiprocessing.set_start_method('fork')
    _ncpus = len(os.sched_getaffinity(0))


__all__ = ('parallel_map',)


def worker(f, ii, chunk, out_q, err_q, lock, bar, bar_state):
    """
    A worker function that maps an input function over a
    slice of the input iterable.

    :param f: callable function that accepts an argument from the iterable
    :param ii: process ID; doubles as the index of this chunk's results
           in the final output assembled by run_tasks()
    :param chunk: slice of the input iterable
    :param out_q: thread-safe output queue
    :param err_q: thread-safe queue to populate on exception
    :param lock: thread-safe lock protecting the shared statusbar state
    :param bar: statusbar to update during fit (or None for no bar)
    :param bar_state: shared (manager) dict holding the statusbar state
    """
    vals = []

    # iterate over slice
    for item in chunk:
        try:
            result = f(item)
        except Exception as e:
            # The traceback cannot be transported back through the queue,
            # so print it here and hand only the exception object to the
            # error queue for the master process to re-raise.
            _, _, tbk = sys.exc_info()
            print('Thread raised exception', e)
            print('Traceback of thread is:')
            print('-------------------------')
            traceback.print_tb(tbk)
            print('-------------------------')
            err_q.put(e)
            return

        vals.append(result)

        # Update the statusbar. The shared state is read, modified and
        # written back, so hold the lock for the whole sequence to avoid
        # lost updates when several workers finish items at the same time.
        if bar is not None:
            with lock:
                if bar_state['started']:
                    bar.pos = bar_state['pos']
                    bar.spin_pos = bar_state['spin_pos']
                    bar.started = bar_state['started']
                    increment = bar.increment()
                    bar_state['started'] = bar.started
                    bar_state['pos'] += increment
                    bar_state['spin_pos'] += increment
                    if bar_state['spin_pos'] >= 4:
                        bar_state['spin_pos'] = 0

    # output the result and task ID to output queue
    out_q.put((ii, vals))


def run_tasks(procs, err_q, out_q, num):
    """
    A function that executes populated processes and processes
    the resultant array. Checks error queue for any exceptions.

    :param procs: list of Process objects
    :param err_q: thread-safe queue populated on exception by a slave
    :param out_q: thread-safe output queue
    :param num: number of result chunks expected (== len(procs))
    :returns: flat list of results, in the original task order
    :raises: the first exception found on err_q, if any slave failed

    """
    def die(running):
        # Terminate any processes that are still running.
        for proc in running:
            if proc.exitcode is None:
                proc.terminate()

    try:
        for proc in procs:
            proc.start()

        for proc in procs:
            proc.join()

    except Exception:
        # kill all slave processes on ctrl-C; bare raise preserves the
        # original traceback
        die(procs)
        raise

    if not err_q.empty():
        # kill all on any exception from any one slave
        die(procs)
        raise err_q.get()

    # Processes finish in arbitrary order. Process IDs double
    # as index in the resultant array.
    results = [None] * num
    for _ in range(num):
        idx, result = out_q.get()
        results[idx] = result

    # Remove extra dimension added by array_split by flattening the
    # per-chunk lists into a single result list.
    result_list = []
    for result in results:
        result_list += result

    return result_list


def parallel_map(function, sequence, numcores=None, bar=None, weights=None):
    """
    A parallelized version of the native Python map function that
    utilizes the Python multiprocessing module to divide and
    conquer a sequence.

    parallel_map does not yet support multiple argument sequences.

    :param function: callable function that accepts argument from iterable
    :param sequence: iterable sequence
    :param numcores: number of cores to use (if None, one less than the
        number of available cores is used)
    :param bar: statusbar to update during fit
    :param weights: per-task weights used when splitting the sequence so
        that each core gets roughly the same total weight
    :returns: list of results, in the same order as the input sequence
    :raises TypeError: if function is not callable or sequence is not
        iterable

    """
    if not callable(function):
        raise TypeError("input function '%s' is not callable" %
                        repr(function))

    if not numpy.iterable(sequence):
        raise TypeError("input '%s' is not iterable" %
                        repr(sequence))

    # Use an object array so that elements of arbitrary type survive the
    # numpy.array_split() chunking below.
    sequence = numpy.array(list(sequence), dtype=object)
    size = len(sequence)

    # A single task: run it serially in this process, no need to fork.
    if size == 1:
        results = list(map(function, sequence))
        if bar is not None:
            bar.stop()
        return results

    # Set number of cores to use. Try to leave one core free for pyplot.
    if numcores is None:
        numcores = _ncpus - 1
    if numcores > _ncpus - 1:
        numcores = _ncpus - 1
    if numcores < 1:
        numcores = 1

    # Returns a started SyncManager object which can be used for sharing
    # objects between processes. The returned manager object corresponds
    # to a spawned child process and has methods which will create shared
    # objects and return corresponding proxies.
    manager = multiprocessing.Manager()

    # Create FIFO queue and lock shared objects and return proxies to them.
    # The manager handles a server process that manages shared objects that
    # each slave process has access to. Bottom line -- thread-safe.
    out_q = manager.Queue()
    err_q = manager.Queue()
    lock = manager.Lock()
    # Mirror the statusbar state in a shared dict so that the slave
    # processes can update a single, consistent bar position between them.
    bar_state = manager.dict()
    if bar is not None:
        bar_state['pos'] = bar.pos
        bar_state['spin_pos'] = bar.spin_pos
        bar_state['started'] = bar.started

    # if sequence is less than numcores, only use len sequence number of
    # processes
    if size < numcores:
        numcores = size

    # group sequence into numcores-worth of chunks
    if weights is None or numcores == size:
        # No grouping specified (or there are as many cores as
        # processes), so divide into equal chunks
        sequence = numpy.array_split(sequence, numcores)
    else:
        # Group so that each group has roughly an equal sum of weights
        weight_per_core = numpy.sum(weights)/float(numcores)
        cut_values = []
        temp_sum = 0.0
        for indx, weight in enumerate(weights):
            temp_sum += weight
            if temp_sum > weight_per_core:
                # Cut just after the task that pushed the running sum over
                # the per-core budget; the sum is then re-seeded with that
                # task's weight for the next group.
                cut_values.append(indx+1)
                temp_sum = weight
        if len(cut_values) > numcores - 1:
            # Never produce more groups than there are cores.
            cut_values = cut_values[0:numcores-1]
        sequence = numpy.array_split(sequence, cut_values)

    # Make sure there are no empty chunks at the end of the sequence
    # (array_split can leave trailing empties when there are more split
    # points than remaining items).
    while len(sequence[-1]) == 0:
        sequence.pop()

    # One slave process per chunk; the chunk index ii is used by worker()
    # to tag its results so run_tasks() can reassemble them in order.
    procs = [multiprocessing.Process(target=worker,
             args=(function, ii, chunk, out_q, err_q, lock, bar, bar_state))
             for ii, chunk in enumerate(sequence)]

    try:
        results = run_tasks(procs, err_q, out_q, len(sequence))
        if bar is not None:
            if bar.started:
                bar.stop()
        return results

    except KeyboardInterrupt:
        # Terminate and reap any slave processes that are still alive.
        for proc in procs:
            if proc.exitcode is None:
                proc.terminate()
                proc.join()
        raise