File: wq_python_task.py

package info (click to toggle)
cctools 1%3A7.14.5-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 36,956 kB
  • sloc: ansic: 114,614; python: 29,532; cpp: 20,313; sh: 13,675; perl: 4,056; xml: 3,688; makefile: 1,436
file content (62 lines) | stat: -rw-r--r-- 1,425 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#! /usr/bin/env python

import sys
import ndcctools.work_queue as wq

# The first command-line argument names the file that will receive the
# queue's port number.
port_file = None
try:
    port_file = sys.argv[1]
except IndexError:
    # No argument was supplied: show the expected invocation on stderr and
    # re-raise so the failure is not hidden.
    usage_line = "Usage: {} PORTFILE\n".format(sys.argv[0])
    sys.stderr.write(usage_line)
    raise

# Function to be invoked remotely (it is handed to wq.PythonTask below)
def my_sum(x, y, negate=False):
    """Return x + y, or the negation of that sum when negate is true."""
    # NOTE(review): math is not referenced in this body; presumably the import
    # is kept to exercise importing inside a remotely executed function -- confirm.
    import math
    sign = -1 if negate else 1
    return sign * (x + y)

# Create the work queue. It is asked for port 0 and the port actually in use
# is read back from queue.port (presumably 0 means "any available port" --
# confirm against the Work Queue documentation).
queue = wq.WorkQueue(port=0)
listening_msg = "listening on port {}".format(queue.port)
print(listening_msg)

# Record the port number in the file named on the command line.
with open(port_file, "w") as port_out:
    port_out.write(str(queue.port))


# First batch: one task per n in 1..9, each computing my_sum(n, n).
print("submitting tasks...")
for n in range(1, 10):
    plain_task = wq.PythonTask(my_sum, n, n)
    plain_task.specify_cores(1)
    queue.submit(plain_task)

# Drain the queue, accumulating the output of every task that comes back.
positive_sum = 0
while not queue.empty():
    finished = queue.wait(5)
    if not finished:
        # wait() handed back nothing this time; check the queue again.
        continue
    print("task {} completed with result {}".format(finished.id,finished.output))
    positive_sum += finished.output


# Second batch: the same inputs as before, but with negate=True so that each
# task returns -(n + n).
for n in range(1, 10):
    negated_task = wq.PythonTask(my_sum, n, n, negate=True)
    negated_task.specify_cores(1)
    queue.submit(negated_task)

# Drain the queue again, accumulating the negated outputs.
negative_sum = 0
while not queue.empty():
    finished = queue.wait(5)
    if not finished:
        # wait() handed back nothing this time; check the queue again.
        continue
    print("task {} completed with result {}".format(finished.id,finished.output))
    negative_sum += finished.output


# Final verdict of the script: the total of the negated batch must cancel the
# total of the non-negated batch. The AssertionError is raised explicitly
# (rather than through an assert statement) so the check is not stripped when
# Python runs with -O, and so a failure reports both totals.
if positive_sum != (-1 * negative_sum):
    raise AssertionError(
        "positive_sum {} does not cancel negative_sum {}".format(
            positive_sum, negative_sum
        )
    )

# vim: set sts=4 sw=4 ts=4 expandtab ft=python: