File: multiprocess.py

# -*- coding: utf-8 -*-

from queue import Empty
import atexit
import errno
import logging
import math
import multiprocessing as mp
import multiprocessing.managers
import multiprocessing.pool
import sys
import threading


def cpu_count(limit_by_total_vmem=True):
    """Returns the number of CPUs in the system

    If psutil is installed, the number of reported CPUs is limited according to
    total RAM by assuming 1 GB for each CPU + 1 GB for the system, unless
    limit_by_total_vmem is False, to allow a reasonable amount of memory for
    each CPU.

    Return fallback value of 1 if CPU count cannot be determined.

    """
    max_cpus = sys.maxsize
    if limit_by_total_vmem:
        try:
            import psutil
        except (ImportError, RuntimeError):
            pass
        else:
            # Limit reported CPUs according to total RAM.
            # We use total instead of available because we assume the system is
            # smart enough to swap memory used by inactive processes to disk to
            # free up more physical RAM for active processes.
            try:
                max_cpus = int(psutil.virtual_memory().total / (1024**3) - 1)
            except Exception:
                pass
    try:
        return max(min(mp.cpu_count(), max_cpus), 1)
    except Exception:
        return 1
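
# Worked example for cpu_count (illustrative numbers, assuming psutil is
# installed): on a machine with 8 logical CPUs and 8 GB of total RAM,
# max_cpus = int(8 - 1) = 7, so cpu_count() returns min(8, 7) = 7. With only
# 4 GB of RAM the result would be min(8, 3) = 3, and with
# limit_by_total_vmem=False it would be 8.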


def pool_slice(
    func,
    data_in,
    args=None,
    kwds=None,
    num_workers=None,
    thread_abort=None,
    logfile=None,
    num_batches=1,
    progress=0,
):
    """Process data in slices using a pool of workers and return the results.

    The individual worker results are returned in the same order as the
    original input data, irrespective of the order in which the workers
    finished (FIFO).

    Progress percentage is written to optional logfile using a background
    thread that monitors a queue.
    Note that 'func' is supposed to periodically check thread_abort.event
    which is passed as the first argument to 'func', and put its progress
    percentage into the queue which is passed as the second argument to 'func'.

    """
    if args is None:
        args = ()

    if kwds is None:
        kwds = {}

    from DisplayCAL.config import getcfg

    if num_workers is None:
        num_workers = cpu_count()
    num_workers = max(min(int(num_workers), len(data_in)), 1)
    max_workers = getcfg("multiprocessing.max_cpus")
    if max_workers:
        num_workers = min(num_workers, max_workers)

    if num_workers == 1 or not num_batches:
        # Splitting the workload into batches only makes sense if there are
        # multiple workers
        num_batches = 1

    chunksize = float(len(data_in)) / (num_workers * num_batches)
    if chunksize < 1:
        num_batches = 1
        chunksize = float(len(data_in)) / num_workers

    if num_workers > 1:
        Pool = NonDaemonicPool
        manager = mp.Manager()
        if thread_abort is not None and not isinstance(
            thread_abort.event, mp.managers.EventProxy
        ):
            # Replace the event with a managed instance that is compatible
            # with pool
            event = thread_abort.event
            thread_abort.event = manager.Event()
            if event.is_set():
                thread_abort.event.set()
        else:
            event = None
        Queue = manager.Queue
    else:
        # Do it all in the main thread of the current process
        Pool = FakePool
        manager = None
        Queue = FakeQueue

    if thread_abort is not None:
        thread_abort_event = thread_abort.event
    else:
        thread_abort_event = None

    progress_queue = Queue()

    if logfile:

        def progress_logger(num_workers, progress=0.0):
            eof_count = 0
            prevperc = -1
            while progress < 100 * num_workers:
                try:
                    inc = progress_queue.get(True, 0.1)
                    if isinstance(inc, Exception):
                        raise inc
                    progress += inc
                except Empty:
                    continue
                except IOError:
                    break
                except EOFError:
                    eof_count += 1
                    if eof_count == num_workers:
                        break
                perc = round(progress / num_workers)
                if perc > prevperc:
                    logfile.write("\r%i%%" % perc)
                    prevperc = perc

        threading.Thread(
            target=progress_logger,
            args=(num_workers * num_batches, progress * num_workers * num_batches),
            name="ProcessProgressLogger",
            group=None,
        ).start()

    pool = Pool(num_workers)
    results = []
    start = 0
    for batch in range(num_batches):
        for i in range(batch * num_workers, (batch + 1) * num_workers):
            end = int(math.ceil(chunksize * (i + 1)))
            results.append(
                pool.apply_async(
                    WorkerFunc(func, batch == num_batches - 1),
                    (data_in[start:end], thread_abort_event, progress_queue) + args,
                    kwds,
                )
            )
            start = end

    # Get results
    exception = None
    data_out = []
    for result in results:
        result = result.get()
        if isinstance(result, Exception):
            exception = result
            continue
        data_out.append(result)

    pool.close()
    pool.join()

    if manager:
        # Need to shutdown manager so it doesn't hold files in use
        if event:
            # Restore original event
            if thread_abort.event.is_set():
                event.set()
            thread_abort.event = event
        manager.shutdown()

    if exception:
        raise exception

    return data_out
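
# Usage sketch for pool_slice (illustrative only; 'double_values' and the
# sample data are made-up and not part of this module). Each worker receives
# its slice of the data, the abort event and the progress queue, checks the
# event periodically, and reports progress increments summing to 100 for its
# slice:
#
#     def double_values(chunk, thread_abort_event, progress_queue):
#         results = []
#         for value in chunk:
#             if thread_abort_event is not None and thread_abort_event.is_set():
#                 return results  # aborted early
#             results.append(value * 2)
#             progress_queue.put(100.0 / len(chunk))
#         return results
#
#     # Returns one result list per worker slice, in input order
#     data_out = pool_slice(double_values, list(range(1000)), logfile=sys.stdout)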


class WorkerFunc:
    """Wrap 'func' so that exceptions are returned instead of raised.

    Always signals end-of-output by putting an EOFError into the progress
    queue. If 'exit' is True (final batch), logging is shut down explicitly
    when running as a worker process on Windows.
    """

    def __init__(self, func, exit=False):
        self.func = func
        self.exit = exit

    def __call__(self, data, thread_abort_event, progress_queue, *args, **kwds):
        try:
            return self.func(data, thread_abort_event, progress_queue, *args, **kwds)
        except Exception as exception:
            if (
                not getattr(sys, "_sigbreak", False)
                or not isinstance(exception, IOError)
                or exception.args[0] != errno.EPIPE
            ):
                import traceback

                print(traceback.format_exc())
            return exception
        finally:
            progress_queue.put(EOFError())
            if mp.current_process().name != "MainProcess":
                print("Exiting worker process", mp.current_process().name)
                if sys.platform == "win32" and self.exit:
                    # Exit handlers registered with atexit will not normally
                    # run when a multiprocessing subprocess exits. We are only
                    # interested in our own exit handler though.
                    # Note all of this only applies to Windows, as it doesn't
                    # have fork().

                    # This does not work with Python 3, as atexit is
                    # reimplemented in C and atexit._exithandlers is no
                    # longer available.
                    # for func, targs, kargs in atexit._exithandlers:
                    #     # Find our lockfile removal exit handler
                    #     if (
                    #         targs
                    #         and isinstance(targs[0], str)
                    #         and targs[0].endswith(".lock")
                    #     ):
                    #         print("Removing lockfile", targs[0])
                    #         try:
                    #             func(*targs, **kargs)
                    #         except Exception as exception:
                    #             print("Could not remove lockfile:", exception)

                    # Logging is normally shutdown by atexit, as well. Do
                    # it explicitly instead.
                    logging.shutdown()


class Mapper:
    """Wrap 'func' with optional arguments.

    To be used as the function argument for Pool.map.

    """

    def __init__(self, func, *args, **kwds):
        self.func = WorkerFunc(func)
        self.args = args
        self.kwds = kwds

    def __call__(self, iterable):
        return self.func(iterable, *self.args, **self.kwds)
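
# Example for Mapper (a minimal sketch using FakePool and FakeQueue from this
# module; 'scale' and 'factor' are made-up names): Mapper bundles the extra
# worker arguments so that a map-style call only has to pass the data:
#
#     def scale(values, thread_abort_event, progress_queue, factor=2):
#         return [value * factor for value in values]
#
#     doubled = FakePool().map(Mapper(scale, None, FakeQueue()), [1, 2, 3])
#     # doubled == [2, 4, 6]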


class NonDaemonicProcess(mp.Process):
    """Process that always reports itself as non-daemonic."""

    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, daemonic):
        # Intentionally ignore attempts to make the process daemonic
        return


class NonDaemonicPool(mp.pool.Pool):
    """Pool that has non-daemonic workers."""

    def Process(self, *args, **kwargs):
        # Since Python 3.7+, Process is a function, so it can no longer be
        # overridden by simply assigning Process = NonDaemonicProcess.
        # Reassign the class of the created process instance instead.
        proc = super(NonDaemonicPool, self).Process(*args, **kwargs)
        # TODO: Reassigning __class__ is not pretty; find a better way.
        proc.__class__ = NonDaemonicProcess
        return proc
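
# Why non-daemonic workers matter (illustrative sketch; 'nested_work' is a
# made-up name): daemonic pool processes are not allowed to create child
# processes, so only a worker in a NonDaemonicPool may itself spawn
# subprocesses, e.g. an inner pool:
#
#     def nested_work(data, thread_abort_event, progress_queue):
#         # Would fail with "daemonic processes are not allowed to have
#         # children" in a regular mp.pool.Pool worker
#         with mp.Pool(2) as inner:
#             return inner.map(abs, data)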


class FakeManager:
    """Fake manager"""

    def Queue(self):
        return FakeQueue()

    def Value(self, typecode, *args, **kwds):
        return mp.managers.Value(typecode, *args, **kwds)

    def shutdown(self):
        pass


class FakePool:
    """Fake pool."""

    def __init__(
        self, processes=None, initializer=None, initargs=(), maxtasksperchild=None
    ):
        pass

    def apply_async(self, func, args, kwds):
        return Result(func(*args, **kwds))

    def close(self):
        pass

    def join(self):
        pass

    def map(self, func, iterable, chunksize=None):
        return func(iterable)

    def terminate(self):
        pass


class FakeQueue:
    """Fake queue."""

    def __init__(self):
        self.queue = []

    def get(self, block=True, timeout=None):
        # Pops from the end (LIFO); ordering does not matter for the progress
        # accounting this queue is used for
        try:
            return self.queue.pop()
        except IndexError:
            raise Empty

    def join(self):
        pass

    def put(self, item, block=True, timeout=None):
        self.queue.append(item)


class Result:
    """Result proxy."""

    def __init__(self, result):
        self.result = result

    def get(self):
        """Return the stored result.

        Returns:
            The value passed to the constructor, i.e. the return value (or
            exception) produced by the worker function.
        """
        return self.result