File: interengine.py

package info (click to toggle)
ipyparallel 8.8.0-6
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 12,412 kB
  • sloc: python: 21,991; javascript: 267; makefile: 29; sh: 28
file content (39 lines) | stat: -rw-r--r-- 1,382 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import ipyparallel as ipp

rc = ipp.Client()
rc.block = True
view = rc[:]
view.run('communicator.py')
view.execute('com = EngineCommunicator()')

# gather the connection information into a dict
ar = view.apply_async(lambda: com.info)  # noqa: F821
peers = ar.get_dict()
# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators

# connect the engines to each other:
view.apply_sync(lambda pdict: com.connect(pdict), peers)  # noqa: F821

# now all the engines are connected, and we can communicate between them:


def broadcast(client, sender, msg_name, dest_name=None, block=None):
    """broadcast a message from one engine to all others."""
    dest_name = msg_name if dest_name is None else dest_name
    client[sender].execute('com.publish(%s)' % msg_name, block=None)
    targets = client.ids
    targets.remove(sender)
    return client[targets].execute('%s=com.consume()' % dest_name, block=None)


def send(client, sender, targets, msg_name, dest_name=None, block=None):
    """send a message from one to one-or-more engines."""
    dest_name = msg_name if dest_name is None else dest_name

    def _send(targets, m_name):
        msg = globals()[m_name]
        return com.send(targets, msg)  # noqa: F821

    client[sender].apply_async(_send, targets, msg_name)

    return client[targets].execute('%s=com.recv()' % dest_name, block=None)