1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
|
import sys
import threading
from mpi4py import MPI
# Two threads of this process issue MPI calls at the same time further down,
# so anything below THREAD_MULTIPLE is insufficient. The script then ends
# with exit status 0 after reporting the reason on stderr.
provided_level = MPI.Query_thread()
if provided_level < MPI.THREAD_MULTIPLE:
    sys.stderr.write("MPI does not provide enough thread support\n")
    raise SystemExit(0)
# NumPy supplies the message buffers used below; if it cannot be imported,
# report that on stderr and end the script with exit status 0.
try:
    import numpy
except ImportError:
    sys.stderr.write("NumPy package not available\n")
    raise SystemExit(0)
# Gate that holds both worker threads back until the main thread releases them.
start_event = threading.Event()

# Outgoing payload: the integers 0..999999 stored with dtype "i".
send_msg = numpy.arange(1000000, dtype="i")

# Incoming buffer: same shape and dtype as the payload, initially all zeros.
recv_msg = numpy.zeros(send_msg.shape, dtype=send_msg.dtype)
def self_send():
    """Wait for ``start_event``, then send ``send_msg`` to this process's own rank.

    The message is sent on ``MPI.COMM_WORLD`` with tag 0 using the blocking
    ``Send`` call, with the buffer described as ``MPI.INT`` elements.
    """
    start_event.wait()
    world = MPI.COMM_WORLD
    # Destination is the caller's own rank: the process messages itself.
    world.Send([send_msg, MPI.INT], dest=world.Get_rank(), tag=0)
def self_recv():
    """Wait for ``start_event``, then receive into ``recv_msg`` from this process's own rank.

    The message is received on ``MPI.COMM_WORLD`` with tag 0 using the
    blocking ``Recv`` call, with the buffer described as ``MPI.INT`` elements.
    """
    start_event.wait()
    world = MPI.COMM_WORLD
    # Source is the caller's own rank: only a message from this process matches.
    world.Recv([recv_msg, MPI.INT], source=world.Get_rank(), tag=0)
# One thread per direction of the self-addressed transfer.
send_thread = threading.Thread(target=self_send)
recv_thread = threading.Thread(target=self_recv)

# The receiver is started (and later joined) ahead of the sender.
workers = (recv_thread, send_thread)
for worker in workers:
    worker.start()

# Both threads are still waiting on start_event, so nothing has been
# transferred yet and the two buffers must still differ.
buffers_match = numpy.allclose(send_msg, recv_msg)
assert not buffers_match

# Release the workers and wait for both of them to finish.
start_event.set()
for worker in workers:
    worker.join()

# After the transfer the receive buffer must equal the payload.
buffers_match = numpy.allclose(send_msg, recv_msg)
assert buffers_match
|