File: wq_remote_task.py

package info (click to toggle)
cctools 1%3A7.14.5-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 36,956 kB
  • sloc: ansic: 114,614; python: 29,532; cpp: 20,313; sh: 13,675; perl: 4,056; xml: 3,688; makefile: 1,436
file content (112 lines) | stat: -rw-r--r-- 3,837 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#! /usr/bin/env python3

import sys, json
import ndcctools.work_queue as wq
# The first command-line argument names the file that the listening port is
# written to further down in this script.
port_file = None
try:
    port_file = sys.argv[1]
except IndexError:
    # No argument given: print a usage hint, then let the IndexError
    # propagate unchanged.
    usage = "Usage: {} PORTFILE\n".format(sys.argv[0])
    sys.stderr.write(usage)
    raise

# Define functions to invoke remotely
def add(x, y):
    """Return the result of ``x + y``."""
    total = x + y
    return total
def multiply(x, y):
    """Return the result of ``x * y``."""
    product = x * y
    return product
def kwargs_test(x=5, y=6, z=7):
    """Return ``x + y * z``; every parameter has a default value."""
    scaled = y * z
    return x + scaled
def no_arguments_test(a, b, c):
    """Return ``a + b + c``; all three positional arguments are required."""
    first_two = a + b
    return first_two + c
def exception_test():
    """Unconditionally raise ``Exception`` with a fixed message."""
    message = "I am a bad function"
    raise Exception(message)

# Create the manager queue. port=0 presumably lets the library pick a free
# port (confirm against the Work Queue docs); debug output goes to manager.log.
queue = wq.WorkQueue(debug_log="manager.log", port=0)
print("listening on port {}".format(queue.port))

# Record the port actually in use in the file named on the command line.
with open(port_file, "w") as port_fh:
    port_fh.write(str(queue.port))

# Submit several tasks for execution:
print("submitting tasks...")
for value in range(1, 10):
    # One entry per task submitted in this iteration, in submission order:
    #   (function name, RemoteTask keyword arguments,
    #    arguments for specify_fn_args or None to skip that call,
    #    execution method)
    task_specs = (
        # simple addition of two arguments using thread execution, passing
        # all arguments positionally; should return value + value
        ("add", {}, ([value, value],), "thread"),
        # multiplication of two arguments using fork execution, passing a
        # mix of positional and dictionary arguments; should return
        # value * value
        ("multiply", {}, ([value], {"y": value}), "fork"),
        # entirely keyword arguments using the direct execution method;
        # should return 7 for every iteration (1 + 2 * 3)
        ("kwargs_test", {"x": 1, "y": 2, "z": 3}, None, "direct"),
        # a function that does not receive enough arguments; should return
        # with status code 500, saying positional arguments are missing
        ("no_arguments_test", {}, None, "thread"),
        # a function that raises; should return with status code 500 and
        # the result should be the exception thrown in the function
        ("exception_test", {}, None, "thread"),
    )
    for fn_name, ctor_kwargs, fn_call_args, exec_method in task_specs:
        task = wq.RemoteTask(fn_name, "my_coprocess", **ctor_kwargs)
        # every task requests the same resources
        task.specify_cores(1)
        task.specify_disk(10)
        task.specify_memory(10)
        task.specify_gpus(0)
        if fn_call_args is not None:
            task.specify_fn_args(*fn_call_args)
        task.specify_exec_method(exec_method)
        queue.submit(task)

# Tallies of task outputs, keyed by the task's command (the function name).
# Tasks in result_sums contribute the integer "Result" field of their JSON
# output; tasks in failure_counts are only counted as they come back.
result_sums = {"add": 0, "multiply": 0, "kwargs_test": 0}
failure_counts = {"no_arguments_test": 0, "exception_test": 0}

while not queue.empty():
    finished = queue.wait(5)
    if not finished:
        # nothing came back from this wait call; try again
        continue
    print("task {} completed with result {}".format(finished.id, finished.output))
    # update the tally that matches the task command
    fn_name = finished.command
    if fn_name in result_sums:
        result_sums[fn_name] += int(json.loads(finished.output)["Result"])
    elif fn_name in failure_counts:
        failure_counts[fn_name] += 1

# Expected totals for value in 1..9:
#   add:         sum of value + value = 90
#   multiply:    sum of value * value = 285
#   kwargs_test: 9 * 7                = 63
assert result_sums["add"] == 90
assert result_sums["multiply"] == 285
assert result_sums["kwargs_test"] == 63
# each of the two failing functions was submitted once per iteration
assert failure_counts["no_arguments_test"] == 9
assert failure_counts["exception_test"] == 9
# vim: set sts=4 sw=4 ts=4 expandtab ft=python: