File: test_join.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (139 lines) | stat: -rw-r--r-- 2,678 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import pytest

from parsl import join_app, python_app
from parsl.dataflow.errors import JoinError

# Value returned by inner_app. test_combine and test_multiple_return also use
# it as the expected number of elements in the joined result.
RESULT_CONSTANT = 3


@python_app
def inner_app():
    """Return RESULT_CONSTANT; the basic app launched by the join apps here."""
    return RESULT_CONSTANT


@join_app
def outer_app():
    """Launch inner_app and return what that invocation hands back.

    test_result_flow expects the result of this join app to equal the
    result of inner_app.
    """
    return inner_app()


@python_app
def add_one(n):
    """Return n + 1."""
    return n + 1


@python_app
def combine(*args):
    """Collect every positional argument into a list, preserving order.

    Callers in this file pass inner_app() invocations as the arguments;
    test_combine expects the resulting list to contain their plain results.
    """
    return [*args]


@join_app
def outer_make_a_dag_combine(n):
    """Launch n inner_app invocations and join on one combine() over them."""
    launched = [inner_app() for _ in range(n)]
    return combine(*launched)


@join_app
def outer_make_a_dag_multi(n):
    """Launch n inner_app invocations and join on the list of them."""
    launched = []
    for _ in range(n):
        launched.append(inner_app())
    return launched


def test_result_flow():
    """A join app's result is the result of the app it returned."""
    assert outer_app().result() == RESULT_CONSTANT


@join_app
def join_wrong_type_app():
    """Return a plain int from a join app.

    test_wrong_type expects the future for this app to raise TypeError.
    """
    return 3


def test_wrong_type():
    """A join app that returns a plain int fails with TypeError on result()."""
    bad_future = join_wrong_type_app()
    # Only fetching the result is expected to raise; the invocation above is
    # deliberately kept outside the raises block.
    with pytest.raises(TypeError):
        bad_future.result()


def test_dependency_on_joined():
    """A join app invocation can be passed as an argument to another app."""
    joined = outer_app()
    incremented = add_one(joined)
    assert incremented.result() == RESULT_CONSTANT + 1


def test_combine():
    """Joining on a single combine() app yields a list of inner results."""
    # The count argument is itself an inner_app() invocation; the expected
    # list length below shows it is treated as RESULT_CONSTANT.
    count_source = inner_app()
    combined = outer_make_a_dag_combine(count_source)
    expected = [RESULT_CONSTANT] * RESULT_CONSTANT
    assert combined.result() == expected


def test_multiple_return():
    """Joining on a list of app invocations yields a list of their results."""
    # The count argument is itself an inner_app() invocation; the expected
    # list length below shows it is treated as RESULT_CONSTANT.
    count_source = inner_app()
    joined = outer_make_a_dag_multi(count_source)
    expected = [RESULT_CONSTANT] * RESULT_CONSTANT
    assert joined.result() == expected


class InnerError(RuntimeError):
    """Marker exception raised by inner_error so tests can recognise it."""


@python_app
def inner_error(s="X"):
    """Always raise InnerError with the message "Error " followed by s."""
    message = "Error " + s
    raise InnerError(message)


@join_app
def outer_error():
    """Join on a single inner_error invocation, which always fails."""
    failing = inner_error()
    return failing


@join_app
def outer_two_errors():
    """Join on a list of two failing invocations, tagged "A" then "B"."""
    first = inner_error("A")
    second = inner_error("B")
    return [first, second]


@join_app
def outer_one_error_one_result():
    """Join on a list holding one failing and one succeeding invocation."""
    failing = inner_error("A")
    succeeding = inner_app()
    return [failing, succeeding]


def test_error():
    """A failing inner app surfaces as a JoinError wrapping the InnerError."""
    joined = outer_error()
    exc = joined.exception()
    assert isinstance(exc, JoinError)

    # Element 0 of each dependent_exceptions_tids entry is the exception.
    first_entry = exc.dependent_exceptions_tids[0]
    assert isinstance(first_entry[0], InnerError)


def test_two_errors():
    """Two failing inner apps are both reported, in order, by the JoinError."""
    joined = outer_two_errors()
    exc = joined.exception()
    assert isinstance(exc, JoinError)
    assert len(exc.dependent_exceptions_tids) == 2

    # Element 0 of each entry is the exception; check them in launch order.
    for position, expected_message in enumerate(("Error A", "Error B")):
        inner_exc = exc.dependent_exceptions_tids[position][0]
        assert isinstance(inner_exc, InnerError)
        assert inner_exc.args[0] == expected_message


def test_one_error_one_result():
    """With one failure and one success, only the failure is reported."""
    joined = outer_one_error_one_result()
    exc = joined.exception()
    assert isinstance(exc, JoinError)

    dependents = exc.dependent_exceptions_tids
    assert len(dependents) == 1

    # Element 0 of the single entry is the exception from inner_error("A").
    inner_exc = dependents[0][0]
    assert isinstance(inner_exc, InnerError)
    assert inner_exc.args[0] == "Error A"


@join_app
def app_no_futures():
    """Return an empty list from a join app, i.e. nothing to join on."""
    return []


def test_no_futures():
    """A join app returning a list with no futures in it still completes.

    Regression test for issue #2792.
    """
    joined = app_no_futures()
    assert joined.result() == []