# More efficient data movement with MPI

Just like [we did](memmap%20Broadcast.ipynb) manually with memmap,
you can move data more efficiently with MPI by sending it to just one engine,
and using MPI to broadcast it to the rest of the engines.


In [1]:
import socket
import os, sys, re

import numpy as np

import ipyparallel as ipp

For this demo, I will connect to a cluster with engines started with MPI.

One way to do so would be:

    ipcluster start -n 64 --engines=MPI --profile mpi
    
In this directory is a docker-compose file to simulate multiple engine sets launched with MPI.

I ran this example with a cluster on a 64-core remote VM,
so communication between the client and controller is over the public internet,
while communication between the controller and engines is local.

In [3]:
rc = ipp.Client(profile="mpi")
rc.wait_for_engines(64)
eall = rc.broadcast_view(coalescing=True)
root = rc[0]

In [5]:
len(rc)

64

In [6]:
root['a'] = 5

In [7]:
%px from mpi4py.MPI import COMM_WORLD as MPI

We cn get a mapping of IPP rank to MPI rank, in case they mismatch.

In recent-enough IPython Parallel,
they usually don't because IPython engines request their MPI rank as their engine id.

In [9]:
mpi_ranks = eall.apply_async(lambda : MPI.Get_rank()).get_dict()
root_rank = root.apply_sync(lambda : MPI.Get_rank())
mpi_ranks

{0: 0,
 1: 1,
 2: 2,
 3: 3,
 4: 4,
 5: 5,
 6: 6,
 7: 7,
 8: 8,
 9: 9,
 10: 10,
 11: 11,
 12: 12,
 13: 13,
 14: 14,
 15: 15,
 16: 16,
 17: 17,
 18: 18,
 19: 19,
 20: 20,
 21: 21,
 22: 22,
 23: 23,
 24: 24,
 25: 25,
 26: 26,
 27: 27,
 28: 28,
 29: 29,
 30: 30,
 31: 31,
 32: 32,
 33: 33,
 34: 34,
 35: 35,
 36: 36,
 37: 37,
 38: 38,
 39: 39,
 40: 40,
 41: 41,
 42: 42,
 43: 43,
 44: 44,
 45: 45,
 46: 46,
 47: 47,
 48: 48,
 49: 49,
 50: 50,
 51: 51,
 52: 52,
 53: 53,
 54: 54,
 55: 55,
 56: 56,
 57: 57,
 58: 58,
 59: 59,
 60: 60,
 61: 61,
 62: 62,
 63: 63}

In [18]:
sz = 512
data = np.random.random((sz, sz))
megabytes = data.nbytes // (1024 * 1024)
megabytes

32

In [19]:
%%time 
ar = eall.push({'data': data}, block=False)
ar.wait_interactive()


_push:   0%|          | 0/64 [00:00<?, ?tasks/s]

CPU times: user 285 ms, sys: 94.4 ms, total: 379 ms
Wall time: 4.49 s


In [20]:
@ipp.interactive
def _bcast(key, root_rank):
    """function to run on engines as part of broadcast"""
    g = globals()
    obj = g.get(key, None)
    obj = MPI.bcast(obj, root_rank)
    g[key] = obj

def broadcast(key, obj, dv, root, root_rank):
    """More efficient broadcast by doing push to root,
    and MPI broadcast to other engines.
    
    Still O(N) messages, but all but one message is always small.
    """
    root.push({key : obj}, block=False)
    return dv.apply_async(_bcast, key, root_rank)

In [21]:
%%time
ar = broadcast('data', data, eall, root, root_rank)
ar.wait_interactive()

_bcast:   0%|          | 0/64 [00:00<?, ?tasks/s]

CPU times: user 252 ms, sys: 58.3 ms, total: 310 ms
Wall time: 939 ms


And we can quickly check that everyone got the same data by computing its norm

In [15]:
%%px
import numpy as np
np.linalg.norm(data, 2)


[0;31mOut[0:2]: [0m255.8632587551305

[0;31mOut[1:2]: [0m255.8632587551305

[0;31mOut[2:2]: [0m255.8632587551305

[0;31mOut[3:2]: [0m255.8632587551305

[0;31mOut[4:2]: [0m255.8632587551305

[0;31mOut[5:2]: [0m255.8632587551305

[0;31mOut[6:2]: [0m255.8632587551305

[0;31mOut[7:2]: [0m255.8632587551305

[0;31mOut[8:2]: [0m255.8632587551305

[0;31mOut[9:2]: [0m255.8632587551305

[0;31mOut[10:2]: [0m255.8632587551305

[0;31mOut[11:2]: [0m255.8632587551305

[0;31mOut[12:2]: [0m255.8632587551305

[0;31mOut[13:2]: [0m255.8632587551305

[0;31mOut[14:2]: [0m255.8632587551305

[0;31mOut[15:2]: [0m255.8632587551305

[0;31mOut[16:2]: [0m255.8632587551305

[0;31mOut[17:2]: [0m255.8632587551305

[0;31mOut[18:2]: [0m255.8632587551305

[0;31mOut[19:2]: [0m255.8632587551305

[0;31mOut[20:2]: [0m255.8632587551305

[0;31mOut[21:2]: [0m255.8632587551305

[0;31mOut[22:2]: [0m255.8632587551305

[0;31mOut[23:2]: [0m255.8632587551305

[0;31mOut[24:2]: [0m255.8632587551305

[0;31mOut[25:2]: [0m255.8632587551305

[0;31mOut[26:2]: [0m255.8632587551305

[0;31mOut[27:2]: [0m255.8632587551305

[0;31mOut[28:2]: [0m255.8632587551305

[0;31mOut[29:2]: [0m255.8632587551305

[0;31mOut[30:2]: [0m255.8632587551305

[0;31mOut[31:2]: [0m255.8632587551305

[0;31mOut[32:2]: [0m255.8632587551305

[0;31mOut[33:2]: [0m255.8632587551305

[0;31mOut[34:2]: [0m255.8632587551305

[0;31mOut[35:2]: [0m255.8632587551305

[0;31mOut[36:2]: [0m255.8632587551305

[0;31mOut[37:2]: [0m255.8632587551305

[0;31mOut[38:2]: [0m255.8632587551305

[0;31mOut[39:2]: [0m255.8632587551305

[0;31mOut[40:2]: [0m255.8632587551305

[0;31mOut[41:2]: [0m255.8632587551305

[0;31mOut[42:2]: [0m255.8632587551305

[0;31mOut[43:2]: [0m255.8632587551305

[0;31mOut[44:2]: [0m255.8632587551305

[0;31mOut[45:2]: [0m255.8632587551305

[0;31mOut[46:2]: [0m255.8632587551305

[0;31mOut[47:2]: [0m255.8632587551305

[0;31mOut[48:2]: [0m255.8632587551305

[0;31mOut[49:2]: [0m255.8632587551305

[0;31mOut[50:2]: [0m255.8632587551305

[0;31mOut[51:2]: [0m255.8632587551305

[0;31mOut[52:2]: [0m255.8632587551305

[0;31mOut[53:2]: [0m255.8632587551305

[0;31mOut[54:2]: [0m255.8632587551305

[0;31mOut[55:2]: [0m255.8632587551305

[0;31mOut[56:2]: [0m255.8632587551305

[0;31mOut[57:2]: [0m255.8632587551305

[0;31mOut[58:2]: [0m255.8632587551305

[0;31mOut[59:2]: [0m255.8632587551305

[0;31mOut[60:2]: [0m255.8632587551305

[0;31mOut[61:2]: [0m255.8632587551305

[0;31mOut[62:2]: [0m255.8632587551305

[0;31mOut[63:2]: [0m255.8632587551305