File: vine_python_temp_files.py

package info (click to toggle)
cctools 1%3A7.14.5-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 36,956 kB
  • sloc: ansic: 114,614; python: 29,532; cpp: 20,313; sh: 13,675; perl: 4,056; xml: 3,688; makefile: 1,436
file content (58 lines) | stat: -rwxr-xr-x 1,368 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#! /usr/bin/env python

import sys
import ndcctools.taskvine as vine

port_file = None
try:
    port_file = sys.argv[1]
except IndexError:
    sys.stderr.write("Usage: {} PORTFILE\n".format(sys.argv[0]))
    raise


# Define functions to invoke remotely
def fun_a(arg):
    return f"{arg}"


def fun_b(remote_data):
    import cloudpickle
    with open(remote_data, "rb") as f:
        r = cloudpickle.load(f)
    return f"{r} world!"


# Create a new m
m = vine.Manager(port=[9123, 9130])
print("listening on port {}".format(m.port))
with open(port_file, "w") as f:
    f.write(str(m.port))

t_a = vine.PythonTask(fun_a, "hello")
t_a.enable_temp_output()

m.submit(t_a)
while not m.empty():
    t = m.wait(5)
    if t and not t.successful():
        print(f"Something went wrong with task a: {t.result}")
        sys.exit(1)

# use the output of t_a, which t_b knows is encoded as a remote temp file
t_b = vine.PythonTask(fun_b, "remote_data")
t_b.add_input(t_a.output_file, "remote_data")
m.submit(t_b)
while not m.empty():
    t = m.wait(5)
    if t and not t.successful():
        print(f"Something went wrong with task b: {t.result}")
        sys.exit(1)

# we now can remove the temp file from the worker
m.undeclare_file(t_a.output_file)

print(f"final output: {t_b.output}")

assert t_b.output == "hello world!"
# vim: set sts=4 sw=4 ts=4 expandtab ft=python: