File: vine_python_serverless.py

package info (click to toggle)
cctools 1%3A7.14.5-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 36,956 kB
  • sloc: ansic: 114,614; python: 29,532; cpp: 20,313; sh: 13,675; perl: 4,056; xml: 3,688; makefile: 1,436
file content (187 lines) | stat: -rwxr-xr-x 7,818 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#!/usr/bin/env python3

# This example shows how to install a library of functions once
# as a LibraryTask, and then invoke that library remotely by
# using FunctionCall tasks.

import ndcctools.taskvine as vine
import argparse
import math
import json
from ndcctools.taskvine.utils import load_variable_from_library
import os

def exp(x, y=3):
    return {'base_val': x**y}

def cube(x, with_library=False):
    # whenever using FromImport statments, put them inside of functions
    from random import uniform
    from time import sleep as time_sleep

    random_delay = uniform(0.00001, 0.0001)
    time_sleep(random_delay)

    if with_library:
        base_val = load_variable_from_library('base_val')
        return base_val + math.pow(x, 3)
    return math.pow(x, 3)

def divide(dividend, divisor, with_library=False):
    # straightfoward usage of preamble import statements
    if with_library:
        base_val = load_variable_from_library('base_val')
        return base_val + dividend / math.sqrt(divisor)
    return dividend / math.sqrt(divisor)

def double(x, with_library=False):
    import math as m
    # use alias inside of functions
    if with_library:
        base_val = load_variable_from_library('base_val')
        return base_val + m.prod([x,2])
    return m.prod([x, 2])

def func_copy_input_to_output(input_filename, output_filename):
    with open(input_filename, 'r') as f:
        contents = f.read()
    with open(output_filename, 'w') as f:
        f.write(contents)
    return 0

def main():
    parser = argparse.ArgumentParser("Test for taskvine python bindings.")
    parser.add_argument("port_file", help="File to write the port the queue is using.")

    args = parser.parse_args()

    q = vine.Manager(port=0)
    q.tune("watch-library-logfiles", 1)

    print(f"TaskVine manager listening on port {q.port}")

    with open(args.port_file, "w") as f:
        print("Writing port {port} to file {file}".format(port=q.port, file=args.port_file))
        f.write(str(q.port))

    print("Creating library from packages and functions...")

    # This format shows how to create package import statements for the library
    hoisting_modules = [math]

    
    libtask_no_context_direct = q.create_library_from_functions('test-library-no-context-direct', divide, double, cube, hoisting_modules=hoisting_modules, add_env=False, exec_mode='direct')
    
    libtask_no_context_fork = q.create_library_from_functions('test-library-no-context-fork', divide, double, cube, hoisting_modules=hoisting_modules, add_env=False, exec_mode='fork')
    
    libtask_with_context_direct = q.create_library_from_functions('test-library-with-context-direct', divide, double, cube, hoisting_modules=hoisting_modules, add_env=False, exec_mode='direct', library_context_info=[exp, [2], {'y': 3}])
    
    libtask_with_context_fork = q.create_library_from_functions('test-library-with-context-fork', divide, double, cube, hoisting_modules=hoisting_modules, add_env=False, exec_mode='fork', library_context_info=[exp, [2], {'y': 3}])

    # define special functions (1 lambda function and 1 dynamically executed function
    # lambda functions can be specified with a custom name, otherwise it will be assigned a default name by Python as "<lambda>".
    lambda_fn = lambda x : x + 1
    exec("def dyn_fn(x):\n    return x + 2", globals(), globals())
    libtask_with_special_fns = q.create_library_from_functions('test-library-with-special-fns', lambda_fn, dyn_fn, add_env=False, exec_mode='fork')

    # define a function that copies an input file to an output file.
    # this is to test the input/output staging of function calls
    libtask_with_io_fn = q.create_library_from_functions('test-library-with-io-fn', func_copy_input_to_output, add_env=False, exec_mode='fork')

    # Just take default resources for the library, this will cause it to fill the whole worker. 
    # And the number of functions slots will match the number of cores available.

    q.install_library(libtask_no_context_direct)
    q.install_library(libtask_no_context_fork)
    q.install_library(libtask_with_context_direct)
    q.install_library(libtask_with_context_fork)
    q.install_library(libtask_with_special_fns)
    q.install_library(libtask_with_io_fn)
    lib_task_names = ['test-library-no-context-direct',
                      'test-library-no-context-fork',
                      'test-library-with-context-direct',
                      'test-library-with-context-fork',
                      'test-library-with-special-fns',
                      'test-library-with-io-fn']
    print("Submitting function call tasks...")
    
    tasks = 100
    total_sum = 0    
    for lib_name in lib_task_names:
        with_library = False
        if lib_name.find('with-context') != -1:
            with_library=True
        for _ in range(0, tasks):
            if lib_name == 'test-library-with-special-fns':
                s_task = vine.FunctionCall(lib_name, '<lambda>', 1)
                q.submit(s_task)

                s_task = vine.FunctionCall(lib_name, 'dyn_fn', 1)
                q.submit(s_task)
            elif lib_name == 'test-library-with-io-fn':
                input_filename = os.path.basename(__file__) + '.input'
                output_filename = os.path.basename(__file__) + '.output'
                with open(input_filename, 'w') as f:
                    print('Test IO with function calls', file=f)
                s_task = vine.FunctionCall(lib_name, 'func_copy_input_to_output', input_filename, output_filename)
                input_file = q.declare_file(input_filename)
                output_file = q.declare_file(output_filename)
                s_task.add_input(input_file, input_filename)
                s_task.add_output(output_file, output_filename)
                q.submit(s_task)

                # do this test once only
                break
            else:
                s_task = vine.FunctionCall(lib_name, 'divide', 2, 2**2, with_library=with_library)
                q.submit(s_task)
            
                s_task = vine.FunctionCall(lib_name, 'double', 3, with_library=with_library)
                q.submit(s_task)

                s_task = vine.FunctionCall(lib_name, 'cube', 4, with_library=with_library)
                q.submit(s_task)

        while not q.empty():
            t = q.wait(5)
            if t:
                x = t.output
                try:
                    total_sum += x
                except:
                    print(x)
                    raise
                print(f"task {t.id} completed with result {x}")
        print('Done', lib_name)

    # Check that we got the right result.
    no_context_direct_expected = (tasks * (divide(2, 2**2) + double(3) + cube(4)))
    no_context_fork_expected = (tasks * (divide(2, 2**2) + double(3) + cube(4)))
    base_val = exp(2, 3)['base_val']
    with_context_direct_expected = (tasks * (divide(2, 2**2) + double(3) + cube(4) + base_val * 3))
    with_context_fork_expected = (tasks * (divide(2, 2**2) + double(3) + cube(4) + base_val * 3))
    special_fns_expected = (tasks * (lambda_fn(1) + dyn_fn(1)))
    expected = no_context_direct_expected + no_context_fork_expected + with_context_direct_expected + with_context_fork_expected + special_fns_expected

    print(f"Total:    {total_sum}")
    print(f"Expected: {expected}")

    # Check that IO test passed
    with open(input_filename, 'r') as f:
        content_input = f.read()

    with open(output_filename, 'r') as f:
        content_output = f.read()
    
    print(f"Input: {content_input}", end='')
    print(f"Output: {content_output}", end='')

    assert content_input == content_output

    assert total_sum == expected

if __name__ == '__main__':
    main()


# vim: set sts=4 sw=4 ts=4 expandtab ft=python: