File: test_depfail_propagation.py

package info (click to toggle)
python-parsl 2026.01.26%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 12,144 kB
  • sloc: python: 24,400; makefile: 352; sh: 252; ansic: 45
file content (64 lines) | stat: -rw-r--r-- 2,024 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import parsl
from parsl import python_app
from parsl.dataflow.errors import DependencyError
from parsl.dataflow.states import States


@python_app
def fails():
    raise ValueError("Deliberate failure")


@python_app
def depends(parent):
    return 1


def test_depfail_once():
    """Test the simplest dependency failure case"""
    start_dep_fail_count = parsl.dfk().task_state_counts[States.dep_fail]
    f1 = fails()
    f2 = depends(f1)

    assert isinstance(f1.exception(), Exception)
    assert not isinstance(f1.exception(), DependencyError)
    assert isinstance(f2.exception(), DependencyError)

    # check that the task ID of the failing task is mentioned
    # in the DependencyError message
    assert ("task " + str(f1.task_record['id'])) in str(f2.exception())

    assert parsl.dfk().task_state_counts[States.dep_fail] == start_dep_fail_count + 1


def test_depfail_chain():
    """Test that dependency failures chain"""
    start_dep_fail_count = parsl.dfk().task_state_counts[States.dep_fail]
    f1 = fails()
    f2 = depends(f1)
    f3 = depends(f2)
    f4 = depends(f3)

    assert isinstance(f1.exception(), Exception)
    assert not isinstance(f1.exception(), DependencyError)
    assert isinstance(f2.exception(), DependencyError)
    assert isinstance(f3.exception(), DependencyError)
    assert isinstance(f4.exception(), DependencyError)

    assert parsl.dfk().task_state_counts[States.dep_fail] == start_dep_fail_count + 3


def test_depfail_branches():
    """Test that dependency failures propagate in the
    presence of multiple downstream tasks."""
    start_dep_fail_count = parsl.dfk().task_state_counts[States.dep_fail]
    f1 = fails()
    f2 = depends(f1)
    f3 = depends(f1)

    assert isinstance(f1.exception(), Exception)
    assert not isinstance(f1.exception(), DependencyError)
    assert isinstance(f2.exception(), DependencyError)
    assert isinstance(f3.exception(), DependencyError)

    assert parsl.dfk().task_state_counts[States.dep_fail] == start_dep_fail_count + 2