1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
|
#!/usr/bin/env python
"""
Parallel PI computation using Dynamic Process Management (DPM)
within Python objects exposing memory buffers (requires NumPy).
usage:
+ parent/child model::
$ mpiexec -n 1 python cpi-dpm.py [nchilds]
+ client/server model::
$ [xterm -e] mpiexec -n <nprocs> python cpi-dpm.py server [-v] &
$ [xterm -e] mpiexec -n 1 python cpi-dpm.py client [-v]
"""
import sys
from mpi4py import MPI
import numpy as N
def get_n():
prompt = "Enter the number of intervals: (0 quits) "
try:
n = int(input(prompt))
if n < 0: n = 0
except:
n = 0
return n
def view(pi, np=None, wt=None):
from math import pi as PI
prn = sys.stdout.write
if pi is not None:
prn("computed pi is: %.16f\n" % pi)
prn("absolute error: %.16f\n" % abs(pi - PI))
if np is not None:
prn("computing units: %d processes\n" % np)
if wt is not None:
prn("wall clock time: %g seconds\n" % wt)
sys.stdout.flush()
def comp_pi(n, comm, root=0):
nprocs = comm.Get_size()
myrank = comm.Get_rank()
n = N.array(n, 'i')
comm.Bcast([n, MPI.INT], root=root)
if n == 0: return 0.0
h = 1.0 / n;
s = 0.0;
for i in range(myrank, n, nprocs):
x = h * (i + 0.5);
s += 4.0 / (1.0 + x**2);
mypi = s * h
mypi = N.array(mypi, 'd')
pi = N.array(0, 'd')
comm.Reduce([mypi, MPI.DOUBLE],
[pi, MPI.DOUBLE],
root=root, op=MPI.SUM)
return pi
def master(icomm):
n = get_n()
wt = MPI.Wtime()
n = N.array(n, 'i')
icomm.Send([n, MPI.INT], dest=0)
pi = N.array(0, 'd')
icomm.Recv([pi, MPI.DOUBLE], source=0)
wt = MPI.Wtime() - wt
if n == 0: return
np = icomm.Get_remote_size()
view(pi, np, wt)
def worker(icomm):
myrank = icomm.Get_rank()
if myrank == 0:
source = dest = 0
else:
source = dest = MPI.PROC_NULL
n = N.array(0, 'i')
icomm.Recv([n, MPI.INT], source=source)
pi = comp_pi(n, comm=MPI.COMM_WORLD, root=0)
pi = N.array(pi, 'd')
icomm.Send([pi, MPI.DOUBLE], dest=dest)
# Parent/Child
def main_parent(nprocs=1):
assert nprocs > 0
assert MPI.COMM_WORLD.Get_size() == 1
icomm = MPI.COMM_WORLD.Spawn(command=sys.executable,
args=[__file__, 'child'],
maxprocs=nprocs)
master(icomm)
icomm.Disconnect()
def main_child():
icomm = MPI.Comm.Get_parent()
assert icomm != MPI.COMM_NULL
worker(icomm)
icomm.Disconnect()
# Client/Server
def main_server(COMM):
nprocs = COMM.Get_size()
myrank = COMM.Get_rank()
service, port, info = None, None, MPI.INFO_NULL
if myrank == 0:
port = MPI.Open_port(info)
log(COMM, "open port '%s'", port)
service = 'cpi'
MPI.Publish_name(service, port, info)
log(COMM, "service '%s' published.", service)
else:
port = ''
log(COMM, "waiting for client connection ...")
icomm = COMM.Accept(port, info, root=0)
log(COMM, "client connection accepted.")
worker(icomm)
log(COMM, "disconnecting from client ...")
icomm.Disconnect()
log(COMM, "client disconnected.")
if myrank == 0:
MPI.Unpublish_name(service, port, info)
log(COMM, "service '%s' unpublished", port)
MPI.Close_port(port)
log(COMM, "closed port '%s' ", port)
def main_client(COMM):
assert COMM.Get_size() == 1
service, info = 'cpi', MPI.INFO_NULL
port = MPI.Lookup_name(service, info)
log(COMM, "service '%s' found in port '%s'.", service, port)
log(COMM, "connecting to server ...")
icomm = COMM.Connect(port, info, root=0)
log(COMM, "server connected.")
master(icomm)
log(COMM, "disconnecting from server ...")
icomm.Disconnect()
log(COMM, "server disconnected.")
def main():
assert len(sys.argv) <= 2
if 'server' in sys.argv:
main_server(MPI.COMM_WORLD)
elif 'client' in sys.argv:
main_client(MPI.COMM_WORLD)
elif 'child' in sys.argv:
main_child()
else:
try: nchilds = int(sys.argv[1])
except: nchilds = 2
main_parent(nchilds)
VERBOSE = False
def log(COMM, fmt, *args):
if not VERBOSE: return
if COMM.rank != 0: return
sys.stdout.write(fmt % args)
sys.stdout.write('\n')
sys.stdout.flush()
if __name__ == '__main__':
if '-v' in sys.argv:
VERBOSE = True
sys.argv.remove('-v')
main()
|