# Reference (OSU benchmarks page):
# http://mvapich.cse.ohio-state.edu/benchmarks/
from mpi4py import MPI
def osu_scatter(
    BENCHMARH="MPI Scatter Latency Test",
    skip=1000,
    loop=10000,
    skip_large=10,
    loop_large=100,
    large_message_size=8192,
    MAX_MSG_SIZE=1 << 20,
):
    """Time ``Scatter`` on ``MPI.COMM_WORLD`` and print a latency table.

    For each size returned by ``message_sizes(MAX_MSG_SIZE)`` every rank
    performs ``skip`` untimed scatters followed by ``loop`` timed ones, with
    rank 0 as the root. Rank 0 then prints the message size and the elapsed
    time per timed scatter, scaled by 1e6 (column header: "Latency [us]").

    Args:
        BENCHMARH: Title printed on the first header line. The spelling is
            part of the keyword interface and is kept as-is.
        skip: Number of untimed warm-up scatters per message size.
        loop: Number of timed scatters per message size; the elapsed time is
            divided by this value, so it must be non-zero.
        skip_large: Replaces ``skip`` once a size exceeds
            ``large_message_size``.
        loop_large: Replaces ``loop`` once a size exceeds
            ``large_message_size``.
        large_message_size: Threshold (in bytes) above which the ``*_large``
            iteration counts are used.
        MAX_MSG_SIZE: Upper bound on the message size. The root's send buffer
            holds ``MAX_MSG_SIZE * numprocs`` bytes and every other rank's
            receive buffer holds ``MAX_MSG_SIZE`` bytes.

    Raises:
        SystemExit: If the communicator has fewer than two processes. Only
            rank 0 attaches the explanatory message.
    """
    comm = MPI.COMM_WORLD
    myid = comm.Get_rank()
    numprocs = comm.Get_size()

    if numprocs < 2:
        if myid == 0:
            errmsg = "This test requires at least two processes"
        else:
            errmsg = None
        raise SystemExit(errmsg)

    # Only the root needs a send buffer; it receives "in place" below, so it
    # never allocates a receive buffer of its own.
    if myid == 0:
        s_buf = allocate(MAX_MSG_SIZE * numprocs)
    else:
        r_buf = allocate(MAX_MSG_SIZE)

    if myid == 0:
        print(f"# {BENCHMARH}")
        print(f"# {'Size [B]':<8s}{'Latency [us]':>20s}")

    for size in message_sizes(MAX_MSG_SIZE):
        # Once the large-message threshold is crossed the reduced iteration
        # counts stay in effect for all remaining sizes.
        if size > large_message_size:
            skip = skip_large
            loop = loop_large

        if myid == 0:
            s_msg = [s_buf, size, MPI.BYTE]
            r_msg = MPI.IN_PLACE
        else:
            s_msg = None
            r_msg = [r_buf, size, MPI.BYTE]

        comm.Barrier()
        # Warm-up iterations are executed but excluded from the timing.
        for _ in range(skip):
            comm.Scatter(s_msg, r_msg, 0)
        t_start = MPI.Wtime()
        for _ in range(loop):
            comm.Scatter(s_msg, r_msg, 0)
        t_end = MPI.Wtime()
        comm.Barrier()

        if myid == 0:
            latency = (t_end - t_start) * 1e6 / loop
            # "<10d" left-aligns the size in a 10-character column so it lines
            # up with the "# Size [B]" header ("-" would only be a sign flag).
            print(f"{size:<10d}{latency:20.2f}")
def message_sizes(max_size):
    """Return the list of message sizes to benchmark, in ascending order.

    The list always starts with 0 and is followed by every power of two
    ``2**k`` for ``k`` in ``0..29`` that does not exceed ``max_size``.
    """
    sizes = [0]
    for exponent in range(30):
        power_of_two = 1 << exponent
        if power_of_two <= max_size:
            sizes.append(power_of_two)
    return sizes
def allocate(n):
    """Return an ``n``-byte buffer, using the first backend that works.

    Backends are tried in this order:

    1. an anonymous ``mmap`` of length ``n`` (skipped on ``ImportError`` or
       ``OSError``);
    2. ``numpy.zeros(n, "B")`` (skipped on ``ImportError``);
    3. an ``array("B")`` holding ``n`` zero bytes.

    Any other exception raised by a backend propagates to the caller.
    """

    def _anonymous_mmap():
        import mmap

        return mmap.mmap(-1, n)

    def _numpy_bytes():
        from numpy import zeros

        return zeros(n, "B")

    def _array_bytes():
        from array import array

        return array("B", [0]) * n

    # Each entry pairs a backend with the errors that mean "try the next one".
    attempts = (
        (_anonymous_mmap, (ImportError, OSError)),
        (_numpy_bytes, ImportError),
    )
    for make_buffer, recoverable in attempts:
        try:
            return make_buffer()
        except recoverable:
            continue
    return _array_bytes()
# Script entry point: when this file is executed directly (rather than
# imported), run the scatter benchmark with its default parameters.
if __name__ == "__main__":
    osu_scatter()
|