1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
# MPI Gather latency test modeled on the OSU Micro-Benchmarks (osu_gather):
# http://mvapich.cse.ohio-state.edu/benchmarks/
from mpi4py import MPI
def osu_gather(
    BENCHMARH="MPI Gather Latency Test",
    skip=1000,
    loop=10000,
    skip_large=10,
    loop_large=100,
    large_message_size=8192,
    MAX_MSG_SIZE=1<<20,
    ):
    """Measure MPI Gather latency over a range of message sizes.

    For each size returned by ``message_sizes(MAX_MSG_SIZE)``, every rank of
    ``MPI.COMM_WORLD`` gathers ``size`` bytes to rank 0. The first ``skip``
    iterations are untimed warm-up and the next ``loop`` iterations are
    timed. Rank 0 prints a header and then one line per size with the mean
    latency in microseconds.

    Every rank takes part in each collective call. All ranks are therefore
    expected to call this function with identical arguments.

    Parameters
    ----------
    BENCHMARH : str
        Title printed in the header. The misspelled name is kept for
        backward compatibility with keyword callers.
    skip, loop : int
        Warm-up and timed iteration counts for sizes up to
        ``large_message_size``.
    skip_large, loop_large : int
        Warm-up and timed iteration counts once a size exceeds
        ``large_message_size``.
    large_message_size : int
        Threshold in bytes above which the ``*_large`` counts are used.
    MAX_MSG_SIZE : int
        Largest message size in bytes to test.

    Raises
    ------
    SystemExit
        If fewer than two processes are running. Only rank 0 carries an
        error message; the other ranks exit with ``SystemExit(None)``.
    ValueError
        If the counts selected for a size give ``skip < 0`` or ``loop < 1``.
        The check runs on every rank before any collective call for that
        size, so all ranks fail together instead of deadlocking.
    """
    comm = MPI.COMM_WORLD
    myid = comm.Get_rank()
    numprocs = comm.Get_size()

    if numprocs < 2:
        # Only rank 0 reports the problem, so the message appears once.
        if myid == 0:
            errmsg = "This test requires at least two processes"
        else:
            errmsg = None
        raise SystemExit(errmsg)

    # The root receives one block from every rank; the other ranks only send.
    if myid == 0:
        r_buf = allocate(MAX_MSG_SIZE*numprocs)
    else:
        s_buf = allocate(MAX_MSG_SIZE)

    if myid == 0:
        print(f'# {BENCHMARH}')
        print('# %-8s%20s' % ("Size [B]", "Latency [us]"))

    for size in message_sizes(MAX_MSG_SIZE):
        # Sizes are produced in increasing order, so once the threshold is
        # crossed the large-message counts stay in effect.
        if size > large_message_size:
            skip = skip_large
            loop = loop_large
        # Without this check, t_start would never be set (or the division
        # below would be by zero). Rank 0 would then crash while the other
        # ranks block forever in the next Barrier.
        if skip < 0 or loop < 1:
            raise ValueError(
                f"need skip >= 0 and loop >= 1, got skip={skip}, loop={loop}")
        # The iteration list is built before the timed section starts.
        iterations = list(range(loop+skip))
        if myid == 0:
            # MPI_IN_PLACE at the root: the root's own contribution is taken
            # to be already in place in the receive buffer.
            s_msg = MPI.IN_PLACE
            r_msg = [r_buf, size, MPI.BYTE]
        else:
            s_msg = [s_buf, size, MPI.BYTE]
            r_msg = None

        comm.Barrier()
        for i in iterations:
            if i == skip:
                # Warm-up is done; start timing the remaining `loop` calls.
                t_start = MPI.Wtime()
            comm.Gather(s_msg, r_msg, 0)
        t_end = MPI.Wtime()
        comm.Barrier()

        if myid == 0:
            # Mean wall-clock time per Gather on rank 0, in microseconds.
            latency = (t_end - t_start) * 1e6 / loop
            print('%-10d%20.2f' % (size, latency))
def message_sizes(max_size):
    """Return the message sizes to benchmark, in increasing order.

    The result is 0 followed by every power of two that does not exceed
    *max_size*. A non-positive *max_size* yields just ``[0]``.

    >>> message_sizes(8)
    [0, 1, 2, 4, 8]
    """
    # For 2**k <= limit < 2**(k+1), limit.bit_length() == k + 1, so the
    # range covers exactly the exponents 0..k. This replaces a hard-coded
    # range(30), which silently dropped every size of 2**30 or more.
    limit = max(int(max_size), 0)
    return [0] + [1 << i for i in range(limit.bit_length())]
def allocate(n):
    """Return a zero-filled, writable buffer of *n* bytes.

    Backends are tried in order: an anonymous ``mmap``, then a NumPy
    ``uint8`` array, then a stdlib ``array('B')``. Each is used only if the
    previous one cannot be imported (or, for ``mmap``, cannot be created).
    """
    try:
        import mmap
        return mmap.mmap(-1, n)
    except (ImportError, OSError):
        pass  # No usable mmap here; fall through to NumPy.

    try:
        from numpy import zeros
        return zeros(n, 'B')
    except ImportError:
        pass  # NumPy is not installed; fall back to the stdlib array.

    from array import array
    zero_byte = array('B', [0])
    return zero_byte * n
if __name__ == '__main__':
    # Script entry point: run the benchmark with its default parameters
    # (launch under an MPI runner with at least two processes).
    osu_gather()
|