File: vine_python_future_module.py

package info (click to toggle)
cctools 1%3A7.14.5-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 36,956 kB
  • sloc: ansic: 114,614; python: 29,532; cpp: 20,313; sh: 13,675; perl: 4,056; xml: 3,688; makefile: 1,436
file content (158 lines) | stat: -rw-r--r-- 4,051 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#! /usr/bin/env python

import sys
import ndcctools.taskvine as vine
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import FIRST_EXCEPTION
from concurrent.futures import ALL_COMPLETED
from concurrent.futures import TimeoutError

port_file = None
try:
    port_file = sys.argv[1]
except IndexError:
    sys.stderr.write("Usage: {} PORTFILE\n".format(sys.argv[0]))
    raise


# Define a function to invoke remotely
def my_sum(x, y, negate=False):
    from operator import add, mul

    f = 1
    if negate:
        f = -1
    s = mul(f, add(x, y))
    return s


def my_exception():
    raise Exception("Expected failure.")


def my_timeout():
    import time
    time.sleep(60)


def main():
    executor = vine.FuturesExecutor(
        port=[9123, 9129], manager_name="vine_matrtix_build_test", factory=False
    )
    print("listening on port {}".format(executor.manager.port))
    with open(port_file, "w") as f:
        f.write(str(executor.manager.port))

    # Submit several tasks for wait function:
    print("submitting tasks...")

    t1 = executor.future_task(my_sum, 7, 4)
    t1.set_cores(1)
    a = executor.submit(t1)

    t2 = executor.future_task(my_sum, a, a)
    t2.set_cores(1)
    b = executor.submit(t2)

    t3 = executor.future_task(my_sum, a, b)
    t3.set_cores(1)
    c = executor.submit(t3)
    
    print("waiting for result...")
    results = vine.futures.wait([a, b, c])
    done = results.done
    not_done = results.not_done
    print(f"results = DONE: {done}\n NOT DONE: not_done")

    # Submit several tasks for as_completed function:
    print("submitting tasks...")

    t1 = executor.future_task(my_sum, 7, 4)
    t1.set_cores(1)
    a = executor.submit(t1)

    t2 = executor.future_task(my_sum, a, a)
    t2.set_cores(1)
    b = executor.submit(t2)

    t3 = executor.future_task(my_sum, a, b)
    t3.set_cores(1)
    c = executor.submit(t3)
    
    print("waiting for result...")
    results = list(vine.futures.as_completed([a, b, c]))
    print(f"results = {results}")

    # Test timeouts
    t1 = executor.future_task(my_timeout)
    t1.set_cores(1)
    future = executor.submit(t1)

    try:
        future.result(timeout=1)
    except TimeoutError:
        future.cancel()
        print("timeout raised correctly")
    else:
        raise RuntimeError("TimeoutError was not raised correctly.")

    # # Test error handling with wait
    t1 = executor.future_task(my_sum, 7, 4)
    t1.set_cores(1)
    a = executor.submit(t1)

    t2 = executor.future_task(my_timeout)
    t2.set_cores(1)
    b = executor.submit(t2)

    results = vine.futures.wait([a, b], return_when=FIRST_COMPLETED)
    assert len(results.done) == 1
    assert len(results.not_done) == 1
    assert results.done.pop().result() == 11

    results = vine.futures.wait([a, b], timeout=2, return_when=ALL_COMPLETED)
    assert len(results.done) == 1
    assert len(results.not_done) == 1
    assert results.done.pop().result() == 11

    t3 = executor.future_task(my_exception)
    t3.set_cores(1)
    c = executor.submit(t3)

    results = vine.futures.wait([b, c], return_when=FIRST_EXCEPTION)
    assert len(results.done) == 1
    assert len(results.not_done) == 1
    assert results.done.pop().exception() is not None

    # Cancel the task that is still sleeping
    b.cancel()

    # Test timeouts with as_completed
    t1 = executor.future_task(my_sum, 7, 4)
    t1.set_cores(1)
    a = executor.submit(t1)

    t2 = executor.future_task(my_timeout)
    t2.set_cores(1)
    b = executor.submit(t2)

    iterator = vine.futures.as_completed([a, b], timeout=5)

    # task 1 should complete correctly within the timeout and be yielded first
    a_future = next(iterator)
    assert a_future.result() == 11

    try:
        next(iterator)
    except TimeoutError:
        b.cancel()
        print("as_completed raised timeout correctly")
    else:
        raise RuntimeError("TimeoutError was not raised correctly.")


if __name__ == "__main__":
    main()


# vim: set sts=4 sw=4 ts=4 expandtab ft=python: