1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
import pytest
from parsl import join_app, python_app
from parsl.dataflow.errors import JoinError
# Value returned by inner_app; the tests in this module compare app results
# against it.
RESULT_CONSTANT = 3
@python_app
def inner_app():
    """Return RESULT_CONSTANT; the plain inner task launched by the join apps"""
    value = RESULT_CONSTANT
    return value
@join_app
def outer_app():
    """Launch inner_app and return its future as the thing to join on"""
    return inner_app()
@python_app
def add_one(n):
    """Return n incremented by one"""
    incremented = n + 1
    return incremented
@python_app
def combine(*args):
    """Return the positional arguments as a list (lets a caller wait on many futures at once)"""
    return [*args]
@join_app
def outer_make_a_dag_combine(n):
    """Launch n inner_app tasks and join on a single combine task fed by all of them"""
    inner_futures = (inner_app() for _ in range(n))
    return combine(*inner_futures)
@join_app
def outer_make_a_dag_multi(n):
    """Launch n inner_app tasks and return the list of their futures"""
    launched = []
    for _ in range(n):
        launched.append(inner_app())
    return launched
def test_result_flow():
    """The result of a join app is the result of the inner app it returned"""
    outer_future = outer_app()
    result = outer_future.result()
    assert result == RESULT_CONSTANT
@join_app
def join_wrong_type_app():
    """Return a plain int instead of a future, for test_wrong_type"""
    not_a_future = 3
    return not_a_future
def test_wrong_type():
    """A join app that returns a non-future must fail with TypeError"""
    outer_future = join_wrong_type_app()
    # Invocation itself is expected to succeed; the error is only expected
    # once the result is requested, so only result() sits inside the block.
    with pytest.raises(TypeError):
        outer_future.result()
def test_dependency_on_joined():
    """A join app's future can be passed as an argument to another app"""
    joined_future = outer_app()
    downstream = add_one(joined_future)
    expected = RESULT_CONSTANT + 1
    assert downstream.result() == expected
def test_combine():
    """A join app can fan out to several inner apps and join on one combining app"""
    # The count is deliberately passed as a future: inner_app yields
    # RESULT_CONSTANT, so RESULT_CONSTANT inner results are expected back.
    count_future = inner_app()
    outer_future = outer_make_a_dag_combine(count_future)
    expected = [RESULT_CONSTANT] * RESULT_CONSTANT
    assert outer_future.result() == expected
def test_multiple_return():
    """A join app returning a list of futures resolves to the list of their results"""
    # The count is deliberately passed as a future: inner_app yields
    # RESULT_CONSTANT, so RESULT_CONSTANT inner results are expected back.
    count_future = inner_app()
    outer_future = outer_make_a_dag_multi(count_future)
    expected = [RESULT_CONSTANT] * RESULT_CONSTANT
    assert outer_future.result() == expected
class InnerError(RuntimeError):
    """Exception raised by inner_error so the tests can recognise it by type"""
@python_app
def inner_error(s="X"):
    """Always raise InnerError with the message 'Error ' followed by s"""
    message = "Error " + s
    raise InnerError(message)
@join_app
def outer_error():
    """Join on a single inner app that always fails"""
    failing = inner_error()
    return failing
@join_app
def outer_two_errors():
    """Join on two failing inner apps, tagged "A" and "B" in that order"""
    return [inner_error(tag) for tag in ("A", "B")]
@join_app
def outer_one_error_one_result():
    """Join on one failing inner app (tagged "A") followed by one that succeeds"""
    failing = inner_error("A")
    succeeding = inner_app()
    return [failing, succeeding]
def test_error():
    """A failing inner app surfaces as a JoinError wrapping exactly that failure"""
    outer_future = outer_error()
    join_exc = outer_future.exception()
    assert isinstance(join_exc, JoinError)

    # Same level of checking as test_two_errors / test_one_error_one_result:
    # exactly one dependent exception, of the right type, with the message
    # produced by inner_error's default argument ("X").
    dependents = join_exc.dependent_exceptions_tids
    assert len(dependents) == 1

    inner_exc = dependents[0][0]
    assert isinstance(inner_exc, InnerError)
    assert inner_exc.args[0] == "Error X"
def test_two_errors():
    """Both inner failures are reported, in the order the futures were returned"""
    outer_future = outer_two_errors()
    join_exc = outer_future.exception()
    assert isinstance(join_exc, JoinError)
    assert len(join_exc.dependent_exceptions_tids) == 2

    # Each entry is indexed as [position][0] to reach the exception object.
    for position, expected_message in enumerate(("Error A", "Error B")):
        inner_exc = join_exc.dependent_exceptions_tids[position][0]
        assert isinstance(inner_exc, InnerError)
        assert inner_exc.args[0] == expected_message
def test_one_error_one_result():
    """Only the failing inner app is reported when its sibling succeeds"""
    outer_future = outer_one_error_one_result()
    join_exc = outer_future.exception()
    assert isinstance(join_exc, JoinError)

    dependents = join_exc.dependent_exceptions_tids
    assert len(dependents) == 1

    inner_exc = dependents[0][0]
    assert isinstance(inner_exc, InnerError)
    assert inner_exc.args[0] == "Error A"
@join_app
def app_no_futures():
    """Return an empty list, i.e. a join with no inner futures at all"""
    nothing_to_join = []
    return nothing_to_join
def test_no_futures():
    """A join app returning an empty list completes with an empty list"""
    # Regression test for issue #2792: a returned list that contains no
    # futures must still let the join app complete.
    outer_future = app_no_futures()
    result = outer_future.result()
    assert result == []
|