File: taskserver.py

package info (click to toggle)
execnet 2.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 684 kB
  • sloc: python: 5,244; makefile: 78; sh: 2
file content (51 lines) | stat: -rw-r--r-- 1,348 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from __future__ import annotations

import execnet

# Build the gateway group that the task processors will run on.
group = execnet.Group()
for _cpu in range(4):  # one gateway per CPU; 4 CPUs assumed
    group.makegateway()


def process_item(channel):
    """Task processor that sits on each CPU and serves items from *channel*.

    Announces itself with ``"ready"``, then for every item received sleeps a
    random whole number of seconds (0-2) and sends back ``item * 10``.
    Receiving ``None`` is the shutdown request and ends processing.
    """
    # NOTE(review): imports stay inside the function because it is handed to
    # ``remote_exec``; presumably it must not depend on module-level names --
    # confirm against the execnet documentation before hoisting them.
    import random
    import time

    channel.send("ready")
    for task in channel:
        if task is None:
            # shutdown requested, nothing more to do
            return
        # simulate work of varying duration, then report the result
        delay = random.randrange(3)
        time.sleep(delay)
        result = task * 10
        channel.send(result)


# execute taskprocessor everywhere
mch = group.remote_exec(process_item)

# Unique local marker for "this channel has closed".  An in-band value such
# as -1 compared with ``==`` could be mistaken for a genuine worker result;
# a private object compared with ``is`` cannot.
_CHANNEL_CLOSED = object()

# get a queue that gives us results as (channel, item) pairs
q = mch.make_receive_queue(endmarker=_CHANNEL_CLOSED)
# a list of tasks, here just integers; set to None once the termination
# request has been sent so that it is only sent once
tasks: list[int] | None = list(range(10))
terminated = 0  # number of channels that have delivered the end marker
while True:
    channel, item = q.get()
    if item is _CHANNEL_CLOSED:
        terminated += 1
        print(f"terminated {channel.gateway.id}")
        if terminated == len(mch):
            print("got all results, terminating")
            break
        continue
    if item != "ready":
        # anything other than the initial handshake is a task result
        print(f"other side {channel.gateway.id} returned {item!r}")
    if tasks:
        # the sender of this message is idle now: hand it the next task
        task = tasks.pop()
        channel.send(task)
        print(f"sent task {task!r} to {channel.gateway.id}")
    elif tasks is not None:
        # task list just ran empty: ask every worker to shut down, once
        print("no tasks remain, sending termination request to all")
        mch.send_each(None)
        tasks = None

group.terminate()