File: mapper.py

"""
Parallel and serial mapper implementations.

The API is a bit crufty since interprocess communication has evolved from
the original implementation, and some of the names are misleading. In the
usage below, *Mapper* stands for one of the concrete mapper classes:
SerialMapper, MPMapper or MPIMapper.

Usage::

    Mapper.start_worker(problem)
    mapper = Mapper.start_mapper(problem, None, cpus)
    result = mapper(points)
    ...
    mapper = Mapper.start_mapper(problem, None, cpus)
    result = mapper(points)
    Mapper.stop_mapper()
"""

import sys
import os
import signal

# {{{ http://code.activestate.com/recipes/496767/ (r1)
# Converted to use ctypes by Paul Kienzle


PROCESS_ALL_ACCESS = 0x1F0FFF


def can_pickle(problem, check=False):
    """
    Returns True if *problem* can be pickled.

    If this method returns False then MPMapper cannot be used and
    SerialMapper should be used instead.

    If *check* is True then call *nllf()* on the duplicated object as a
    "smoke test" to verify that the function will run after copying. This
    is not foolproof. For example, access to a database may work in the
    duplicated object because the connection is open and available in the
    current process, but it will fail when trying to run on a remote machine.
    """
    try:
        import dill
    except ImportError:
        dill = None
        import pickle
    try:
        if dill is not None:
            dup = dill.loads(dill.dumps(problem, recurse=True))
        else:
            dup = pickle.loads(pickle.dumps(problem))
        if check:
            dup.nllf()
        return True
    except Exception:
        return False
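
# Illustrative use (a sketch, not part of the module API): choose a mapper
# class based on whether the problem survives pickling; "my_problem" is a
# hypothetical placeholder.
#
#     mapper_class = MPMapper if can_pickle(my_problem, check=True) else SerialMapper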


def setpriority(pid=None, priority=1):
    """
    Set the priority of a Windows process.  Priority is a value between 0 and 5,
    where 2 is normal priority and 5 is maximum.  By default this sets the
    priority of the current python process, but any valid process ID can be given.
    """

    # import win32api,win32process,win32con
    from ctypes import windll

    priorityclasses = [
        0x40,  # IDLE_PRIORITY_CLASS,
        0x4000,  # BELOW_NORMAL_PRIORITY_CLASS,
        0x20,  # NORMAL_PRIORITY_CLASS,
        0x8000,  # ABOVE_NORMAL_PRIORITY_CLASS,
        0x80,  # HIGH_PRIORITY_CLASS,
        0x100,  # REALTIME_PRIORITY_CLASS
    ]
    if pid is None:
        pid = windll.kernel32.GetCurrentProcessId()
    handle = windll.kernel32.OpenProcess(PROCESS_ALL_ACCESS, True, pid)
    windll.kernel32.SetPriorityClass(handle, priorityclasses[priority])


# end of http://code.activestate.com/recipes/496767/ }}}


def nice():
    if os.name == "nt":
        setpriority(priority=1)
    else:
        os.nice(5)


# Noise so that the type checker is happy
class BaseMapper(object):
    has_problem = False

    @staticmethod
    def start_worker(problem):
        """Called with the problem to initialize the worker"""
        raise NotImplementedError()

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        """Called with the problem on a new fit."""
        raise NotImplementedError()

    # TODO: deprecate mapper parameter
    @staticmethod
    def stop_mapper(mapper=None):
        raise NotImplementedError()


class SerialMapper(BaseMapper):
    @staticmethod
    def start_worker(problem):
        pass

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        # Note: map returns an iterator in Python 3.x, so wrap it in list()
        return lambda points: list(map(problem.nllf, points))

    @staticmethod
    def stop_mapper(mapper=None):
        pass


# Load the problem in the remote process rather than pickling
# def _MP_load_problem(*modelargs):
#    from .fitproblem import load_problem
#    _MP_set_problem(load_problem(*modelargs))


def _MP_setup():
    # Using MPMapper class variables to store worker globals.
    # It doesn't matter if they conflict with the controller values since
    # they are in a different process.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    nice()


def _MP_run_problem(problem_point_tuple):
    problem_id, point, shared_pickled_problem = problem_point_tuple
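    # Each worker caches the most recently unpickled problem in the MPMapper
    # class variables, and only re-unpickles when the controller has moved on
    # to a new problem_id (i.e., a new fit was started).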
    if problem_id != MPMapper.problem_id:
        # print(f"Fetching problem {problem_id} from namespace")
        # Problem is pickled using dill when it is available
        try:
            import dill

            MPMapper.problem = dill.loads(shared_pickled_problem[:].tobytes())
        except ImportError:
            import pickle

            MPMapper.problem = pickle.loads(shared_pickled_problem[:].tobytes())
        MPMapper.problem_id = problem_id
    return MPMapper.problem.nllf(point)


class MPMapper(BaseMapper):
    # Note: the worker subprocesses reuse these class variables (their own
    # copies) to store worker state.
    pool = None
    manager = None
    problem_id = 0
    shared_pickled_problem = None
    problem = None

    @staticmethod
    def start_worker(problem):
        pass

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        import multiprocessing

        # Set up the process pool on the first call.
        if MPMapper.pool is None:
            # Create a manager process used to share the pickled problem
            # with the workers.
            MPMapper.manager = multiprocessing.Manager()
            # Start the process pool; the _MP_setup initializer makes each
            # worker ignore SIGINT and run at reduced priority.
            if cpus == 0:
                cpus = multiprocessing.cpu_count()
            MPMapper.pool = multiprocessing.Pool(cpus, _MP_setup)

        # Increment the problem number and store the pickled problem in a
        # shared manager array. Transferring a python object to the manager
        # process uses pickle, which may fail for lambdas and for functions
        # defined within the model file, so use dill (if available) to pickle
        # the problem ourselves and store the raw bytes instead.
        MPMapper.problem_id += 1
        try:
            import dill

            MPMapper.pickled_problem = dill.dumps(problem, recurse=True)
        except ImportError:
            import pickle

            MPMapper.pickled_problem = pickle.dumps(problem)
        MPMapper.shared_pickled_problem = MPMapper.manager.Array("B", MPMapper.pickled_problem)

        # Set the mapper to send problem_id/point/shared_pickled_problem value triples
        def mapper(points):
            try:
                return MPMapper.pool.map(
                    _MP_run_problem, ((MPMapper.problem_id, p, MPMapper.shared_pickled_problem) for p in points)
                )
            except KeyboardInterrupt:
                MPMapper.stop_mapper()

        return mapper

    @staticmethod
    def stop_mapper(mapper=None):
        # reset pool and manager
        if MPMapper.pool is not None:
            MPMapper.pool.terminate()
            MPMapper.pool = None
            MPMapper.manager.shutdown()
            MPMapper.manager = None
        # Don't reset problem id; it keeps count even when mapper is restarted.
        ##MPMapper.problem_id = 0
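
# Illustrative MPMapper lifecycle (a sketch, not executed here): the pool and
# manager are created lazily on the first start_mapper() call and reused for
# later fits until stop_mapper() tears them down; "my_problem" and "points"
# are hypothetical placeholders.
#
#     MPMapper.start_worker(my_problem)            # no-op for MPMapper
#     mapper = MPMapper.start_mapper(my_problem, None, cpus=4)
#     nllfs = mapper(points)                       # [my_problem.nllf(p) for p in points]
#     MPMapper.stop_mapper()                       # terminate pool and manager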


def _MPI_set_problem(problem, comm, root=0):
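    """
    Broadcast the fit problem from *root* to all ranks, using dill so that
    problems defined in the model file (including lambdas) survive pickling.
    The root rank returns its own *problem*; the other ranks return the copy
    received from the broadcast.
    """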
    import dill

    pickled_problem = dill.dumps(problem, recurse=True) if comm.rank == root else None
    pickled_problem = comm.bcast(pickled_problem, root=root)
    return problem if comm.rank == root else dill.loads(pickled_problem)


def _MPI_map(problem, points, comm, root=0):
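    """
    Evaluate problem.nllf over *points* distributed across the MPI ranks.

    On the root rank *points* is an (npoints, nvars) array and the return
    value is the array of nllf values (or None when npoints is zero). On
    worker ranks *points* is ignored; the return value is True, or None when
    an empty set of points was broadcast, which signals that a new problem
    is about to be sent.
    """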
    import numpy as np
    from mpi4py import MPI

    # print(f"{comm.rank}: mapping points")

    # Send number of points and number of variables per point.
    # root: return the result array if there are points, otherwise None
    # worker: return True if there are points, otherwise None
    npoints, nvars = comm.bcast(points.shape if comm.rank == root else None, root=root)
    if npoints == 0:
        return None

    # Divvy points equally across all processes
    whole = points if comm.rank == root else None
    idx = np.arange(comm.size)
    size = np.ones(comm.size, idx.dtype) * (npoints // comm.size) + (idx < npoints % comm.size)
    offset = np.cumsum(np.hstack((0, size[:-1])))
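    # Worked example: npoints=10 across comm.size=4 ranks gives
    # size=[3, 3, 2, 2] and offset=[0, 3, 6, 8].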
    part = np.empty((size[comm.rank], nvars), dtype="d")
    comm.Scatterv((whole, (size * nvars, offset * nvars), MPI.DOUBLE), (part, MPI.DOUBLE), root=root)

    # Evaluate models assigned to each processor
    partial_result = np.array([problem.nllf(pk) for pk in part], dtype="d")

    # Collect results
    result = np.empty(npoints, dtype="d") if comm.rank == root else True
    comm.Barrier()
    comm.Gatherv((partial_result, MPI.DOUBLE), (result, (size, offset), MPI.DOUBLE), root=root)
    comm.Barrier()
    return result


def using_mpi():
    # Can look for environment variables defined by mpirun
    #   mpich: PMI_RANK, PMI_*, MPI_*
    #   openmpi: OMPI_COMM_WORLD_RANK, OMPI_*, PMIX_*
    #   impi_rt (intel): PMI_RANK I_MPI_* HYDRA_*
    #   msmpi (microsoft): PMI_RANK PMI_* MSMPI_*
    # TODO: Make sure that Slurm isn't setting MPI variables in the batch script.
    # I seem to recall some messiness with pmix variants on our local openmpi
    # cluster, so I would rather look at OMPI_COMM_WORLD_RANK.
    mpienv = [
        "OMPI_COMM_WORLD_RANK",  # OpenMPI
        "PMI_RANK",  # MPICH, MSMPI [Microsoft], IMPI_RT [Intel]
    ]
    return any(v in os.environ for v in mpienv)

    # The robust solution is as follows, but it requires opening the MPI ports.
    # This triggers a security box on the Mac asking to give the python interpreter
    # access to these ports. Since MPI use on Mac will be rare (only for problems
    # that can't be pickled) we should avoid the confusing security box.
    # from mpi4py import MPI
    # try:
    #     comm = MPI.COMM_WORLD
    #     return comm.size > 1
    # except Exception:
    #     return False
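
# Illustrative use (a sketch): select the MPI mapper when the script was
# launched under mpirun, otherwise fall back to multiprocessing.
#
#     Mapper = MPIMapper if using_mpi() else MPMapper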


class MPIMapper(BaseMapper):
    has_problem = True
    """For MPIMapper only the worker is initialized with the fit problem."""

    @staticmethod
    def start_worker(problem):
        """
        Start the worker process.

        For the main process this does nothing and returns immediately. The
        worker processes never return.

        Each worker sits in a loop waiting for the next batch of points
        for the problem, or for the next problem. If the problem is set to
        None, the worker exits the process and never returns.
        """
        from mpi4py import MPI

        comm, root = MPI.COMM_WORLD, 0
        MPIMapper.rank = comm.rank
        rank = comm.rank
        # print(f"MPI {rank} of {comm.size} initializing")

        # If worker, sit in a loop waiting for the next point.
        # If the point is empty, then wait for a new problem.
        # If the problem is None then we are done, otherwise wait for next point.
        if rank != root:
            # print(f"{rank}: looping")
            while True:
                result = _MPI_map(problem, None, comm, root)
                if result is None:
                    problem = _MPI_set_problem(None, comm, root)
                    if problem is None:
                        break
                    # print(f"{rank}: changing problem")

            # print(f"{rank}: finalizing")
            MPI.Finalize()

            # Exit the program after the worker is done. Don't return to the
            # caller, since the caller continues on with the main program and,
            # in particular, would attempt to rerun the fit on each worker.
            # print(f"{rank}: Worker exiting")
            sys.exit(0)
            # print(f"{rank}: Worker exited")
        else:
            # Root initialization:
            MPIMapper.has_problem = problem is not None
            # print("mapper has problem", MPIMapper.has_problem)

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        # Only root can get here---worker is stuck in start_worker
        from mpi4py import MPI
        import numpy as np

        comm, root = MPI.COMM_WORLD, 0

        # Signal new problem then send it, but not on the first fit. We do this
        # so that we can still run MPI fits even if the problem itself cannot
        # be pickled, but only the first one. (You can still fit a series even
        # if the problem can't be pickled, but you will need to restart the
        # MPI job separately for each fit.)
        # Note: setting problem to None stops the program, so call finalize().
        def mapper(points):
            return _MPI_map(problem, points, comm, root)

        if not MPIMapper.has_problem:  # Only true on the first fit
            # print(f"*** {comm.rank}: replacing problem")
            # Send an empty set of points to signal a new problem is coming.
            mapper(np.empty((0, 0), "d"))
            _MPI_set_problem(problem, comm, root)
            if problem is None:
                # print(f"{comm.rank}: finalizing root")
                MPI.Finalize()
        MPIMapper.has_problem = False
        return mapper

    @staticmethod
    def stop_mapper(mapper=None):
        # print("stopping mapper")
        # Set problem=None to stop the program.
        MPIMapper.start_mapper(None, None)
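

# Illustrative MPI driver (a sketch, not part of this module): under
# "mpirun -n N python fit_script.py" every rank calls start_worker(); only
# the root rank returns and runs the fit, while the other ranks service
# _MPI_map requests until stop_mapper() broadcasts problem=None. "problem"
# and "points" are hypothetical placeholders.
#
#     MPIMapper.start_worker(problem)      # workers never return from here
#     mapper = MPIMapper.start_mapper(problem, None)
#     nllfs = mapper(points)
#     MPIMapper.stop_mapper()              # shuts down the worker loop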