Advanced (multi) channel communication
=====================================================

MultiChannel: container for multiple channels
------------------------------------------------------

Use ``execnet.MultiChannel`` to work with multiple channels::

    >>> import execnet
    >>> ch1 = execnet.makegateway().remote_exec("channel.send(1)")
    >>> ch2 = execnet.makegateway().remote_exec("channel.send(2)")
    >>> mch = execnet.MultiChannel([ch1, ch2])
    >>> len(mch)
    2
    >>> mch[0] is ch1 and mch[1] is ch2
    True
    >>> ch1 in mch and ch2 in mch
    True
    >>> sum(mch.receive_each())
    3
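
A ``MultiChannel`` only groups existing channels; it does not own or clean up
the gateways behind them.  Below is a minimal sketch (not part of the doctest
above) of the same fan-out using an explicit ``execnet.Group`` so that all
gateways can be terminated in one call; the two ``popen`` gateways and the
``* 10`` payload are purely illustrative::

    import execnet

    # an explicit Group lets us terminate all gateways in one call
    group = execnet.Group()
    channels = [
        group.makegateway("popen").remote_exec(
            "channel.send(channel.receive() * 10)")
        for _ in range(2)
    ]
    mch = execnet.MultiChannel(channels)

    # hand each worker its input, then collect one item per channel
    for i, ch in enumerate(channels):
        ch.send(i + 1)
    print(sum(mch.receive_each()))   # 10 + 20 == 30

    group.terminate()                # shuts down both subprocess gateways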

Receive results from subprocesses with a Queue
-----------------------------------------------------

Use ``MultiChannel.make_receive_queue()`` to get a queue
from which to obtain results::

    >>> ch1 = execnet.makegateway().remote_exec("channel.send(1)")
    >>> ch2 = execnet.makegateway().remote_exec("channel.send(2)")
    >>> mch = execnet.MultiChannel([ch1, ch2])
    >>> queue = mch.make_receive_queue()
    >>> chan1, res1 = queue.get()
    >>> chan2, res2 = queue.get(timeout=3)
    >>> res1 + res2
    3
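
Because the queue yields ``(channel, item)`` pairs, you always know which
channel a result came from and how many items are still outstanding.  Here is
a minimal sketch (three local ``popen`` gateways and illustrative payloads
assumed) that drains exactly one result per channel with a timeout::

    import execnet

    group = execnet.Group(["popen"] * 3)
    mch = group.remote_exec("channel.send(channel.receive() ** 2)")
    queue = mch.make_receive_queue()

    for i in range(len(mch)):
        mch[i].send(i + 1)

    results = []
    for _ in range(len(mch)):
        # pairs arrive in completion order; ``channel`` identifies the sender
        channel, item = queue.get(timeout=5)
        results.append(item)

    print(sorted(results))   # [1, 4, 9]
    group.terminate()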

Working asynchronously/event-based with channels
---------------------------------------------------

Use channel callbacks if you want to process incoming
data immediately and without blocking execution::

    >>> import execnet
    >>> gw = execnet.makegateway()
    >>> ch = gw.remote_exec("channel.receive() ; channel.send(42)")
    >>> l = []
    >>> ch.setcallback(l.append)
    >>> ch.send(1)
    >>> ch.waitclose()
    >>> assert l == [42]

Note that the callback function will be executed in the
receiver thread and should not block or run for too long.
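
Since the callback runs in the receiver thread, a common way to keep it cheap
is to have it merely enqueue the incoming data and do the real work elsewhere.
A minimal sketch of that pattern (the squaring payload is only illustrative)::

    import queue

    import execnet

    gw = execnet.makegateway()
    ch = gw.remote_exec("for i in range(3): channel.send(i * i)")

    # the callback only enqueues; any slow processing happens in the
    # main thread, so the receiver thread is never blocked
    items = queue.Queue()
    ch.setcallback(items.put)

    ch.waitclose()              # all items have been delivered by now
    while not items.empty():
        print(items.get())      # 0, 1, 4
    gw.exit()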

Robustly receive results and termination notification
-----------------------------------------------------

Use ``MultiChannel.make_receive_queue(endmarker)`` to specify
an object that is put into the queue when the remote side of a channel
closes.  The endmarker is also put into the queue if a gateway
is blocked in execution and gets terminated/killed::

    >>> group = execnet.Group(['popen'] * 3) # create three gateways
    >>> mch = group.remote_exec("channel.send(channel.receive()+1)")
    >>> queue = mch.make_receive_queue(endmarker=42)
    >>> mch[0].send(1)
    >>> chan1, res1 = queue.get()
    >>> res1
    2
    >>> group.terminate(timeout=1) # kill processes waiting on receive
    >>> for i in range(3):
    ...    chan1, res1 = queue.get()
    ...    assert res1 == 42
    >>> group
    <Group []>
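
The example above uses ``42`` as the endmarker, which works but could in
principle collide with a real result.  A variation of the same pattern (a
sketch, not part of the doctest) uses a unique sentinel object instead, so
the two can never be confused::

    import execnet

    ENDMARKER = object()                 # cannot collide with real results

    group = execnet.Group(["popen"] * 2)
    mch = group.remote_exec("channel.send(channel.receive() + 1)")
    queue = mch.make_receive_queue(endmarker=ENDMARKER)

    mch[0].send(10)                      # only the first worker gets input
    channel, item = queue.get(timeout=3)
    assert item == 11

    group.terminate(timeout=1)           # kills the worker stuck in receive()
    for _ in range(len(mch)):            # one endmarker per channel
        channel, item = queue.get()
        assert item is ENDMARKER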



Saturate multiple Hosts and CPUs with tasks to process
--------------------------------------------------------

If you have multiple CPUs or hosts you can create one gateway
per CPU, have a worker process sit on each of them, and feed
them tasks to process.  One complication is that we
want to terminate all processes cleanly
and lose no results.  Here is an example that just uses
local subprocesses and does the task:

.. include:: taskserver.py
    :literal: