1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
#! /usr/bin/env python3
import sys, json
import ndcctools.work_queue as wq
# The first command-line argument names the file that will receive the
# manager's listening port.
port_file = None
try:
    port_file = sys.argv[1]
except IndexError:
    # No argument supplied: show the usage line on stderr, then let the
    # original IndexError propagate so the script terminates with an error.
    print("Usage: {} PORTFILE\n".format(sys.argv[0]), end="", file=sys.stderr)
    raise
# Define functions to invoke remotely
def add(x, y):
    """Return the result of ``x + y``."""
    total = x + y
    return total
def multiply(x, y):
    """Return the result of ``x * y``."""
    product = x * y
    return product
def kwargs_test(x=5, y=6, z=7):
    """Return ``x + y * z``; every parameter has a default so all may be omitted."""
    scaled = y * z
    return x + scaled
def no_arguments_test(a, b, c):
    """Return ``a + b + c``.

    All three parameters are required; the script submits this function
    without arguments to exercise the missing-argument error path.
    """
    partial = a + b
    return partial + c
def exception_test():
    """Always raise ``Exception`` with a fixed message.

    Raises:
        Exception: unconditionally.
    """
    message = "I am a bad function"
    raise Exception(message)
# Create the manager queue. Port 0 is requested, and the port the queue
# actually reports is what gets printed and published.
queue = wq.WorkQueue(debug_log="manager.log", port=0)
listen_port = queue.port
print("listening on port {}".format(listen_port))

# Record the port in the file named on the command line.
with open(port_file, "w") as port_handle:
    port_handle.write(str(listen_port))
# Submit several tasks for execution:
print("submitting tasks...")


def _new_task(function_name, **function_kwargs):
    """Build a RemoteTask for "my_coprocess" carrying the shared resource request.

    Keyword arguments are forwarded unchanged to the RemoteTask constructor.
    """
    remote = wq.RemoteTask(function_name, "my_coprocess", **function_kwargs)
    remote.specify_cores(1)
    remote.specify_disk(10)
    remote.specify_memory(10)
    remote.specify_gpus(0)
    return remote


for value in range(1, 10):
    # Addition: both operands passed positionally, "thread" execution.
    # Should return value + value.
    task = _new_task("add")
    task.specify_fn_args([value, value])
    task.specify_exec_method("thread")
    queue.submit(task)

    # Multiplication: one positional argument plus one dictionary (keyword)
    # argument, "fork" execution.
    # Should return value * value.
    task = _new_task("multiply")
    task.specify_fn_args([value], {"y": value})
    task.specify_exec_method("fork")
    queue.submit(task)

    # Keyword-only invocation, with the arguments given to the RemoteTask
    # constructor, "direct" execution.
    # Should return 7 on every iteration (1 + 2 * 3).
    task = _new_task("kwargs_test", x=1, y=2, z=3)
    task.specify_exec_method("direct")
    queue.submit(task)

    # A function that does not receive enough arguments must produce an error.
    # Should return with status code 500, saying that positional arguments
    # are missing.
    task = _new_task("no_arguments_test")
    task.specify_exec_method("thread")
    queue.submit(task)

    # A function that raises must have its exception captured and returned.
    # Should return with status code 500, with the raised exception as the result.
    task = _new_task("exception_test")
    task.specify_exec_method("thread")
    queue.submit(task)
# Running tallies of task outputs, keyed by which function each task invoked.
add_sum = multiply_sum = kwargs_sum = 0
no_arguments_errors = num_exceptions = 0

# Drain the queue; wait() may hand back a falsy value when no task is ready.
while not queue.empty():
    task = queue.wait(5)
    if not task:
        continue

    print("task {} completed with result {}".format(task.id, task.output))

    # Update the tally that matches the task's command.
    command = task.command
    if command in ("add", "multiply", "kwargs_test"):
        # Only these three are expected to carry a numeric "Result" field.
        result = int(json.loads(task.output)["Result"])
        if command == "add":
            add_sum += result
        elif command == "multiply":
            multiply_sum += result
        else:
            kwargs_sum += result
    elif command == "no_arguments_test":
        no_arguments_errors += 1
    elif command == "exception_test":
        num_exceptions += 1

# Expected totals for value in 1..9:
#   add      -> sum(2 * v) = 90
#   multiply -> sum(v * v) = 285
#   kwargs   -> 9 * 7      = 63
# and one returned task per iteration for each of the two failing functions.
assert add_sum == 90
assert multiply_sum == 285
assert kwargs_sum == 63
assert no_arguments_errors == 9
assert num_exceptions == 9
# vim: set sts=4 sw=4 ts=4 expandtab ft=python:
|