File: test_rand_fail.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (171 lines) | stat: -rw-r--r-- 3,864 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import argparse

import pytest

import parsl
from parsl.app.app import python_app
from parsl.tests.configs.local_threads import fresh_config


def local_config():
    c = fresh_config()
    c.retries = 2
    return c


@python_app
def sleep_fail(sleep_dur, sleep_rand_max, fail_prob, inputs=[]):
    import random
    import time

    s = sleep_dur + random.randint(-sleep_rand_max, sleep_rand_max)

    time.sleep(s)
    x = float(random.randint(0, 100)) / 100
    if x <= fail_prob:
        # print("Fail")
        raise Exception("App failure")
    else:
        pass
        # print("Succeed")


@python_app
def double(x):
    return x * 2


@pytest.mark.local
def test_simple(n=10):
    import time
    start = time.time()
    x = double(n)
    print("Result : ", x.result())
    assert x.result() == n * \
        2, "Expected double to return:{0} instead got:{1}".format(
            n * 2, x.result())
    print("Duration : {0}s".format(time.time() - start))
    print("[TEST STATUS] test_parallel_for [SUCCESS]")
    return True


@pytest.mark.skip('broken')
def test_no_deps(numtasks=10):
    """Test basic error handling, with no dependent failures
    """

    fus = []
    for i in range(0, 10):

        fu = sleep_fail(0.1, 0, .8)
        fus.extend([fu])

    count = 0
    for fu in fus:
        try:
            fu.result()
        except Exception as e:
            print("Caught exception : ", "*" * 20)
            print(e)
            print("*" * 20)
            count += 1

    print("Caught failures of  {0}/{1}".format(count, len(fus)))


@pytest.mark.skip('broken')
def test_fail_sequence(numtasks=10):
    """Test failure in a sequence of dependencies

    App1 -> App2 ... -> AppN
    """

    sleep_dur = 0.1
    fail_prob = 0.4

    fus = {0: None}
    for i in range(0, numtasks):
        print("Chaining {0} to {1}".format(i + 1, fus[i]))
        fus[i + 1] = sleep_fail(sleep_dur, 0, fail_prob, inputs=[fus[i]])

    # time.sleep(numtasks*sleep_dur)
    for k in sorted(fus.keys()):
        try:
            x = fus[i].result()
            print("{0} : {1}".format(k, x))
        except Exception as e:
            print("{0} : {1}".format(k, e))

    return


@pytest.mark.skip('broken')
def test_deps(numtasks=10):
    """Random failures in branches of Map -> Map -> reduce

    App1   App2  ... AppN
    """

    fus = []
    for i in range(0, numtasks):
        fu = sleep_fail(0.2, 0, .4)
        fus.extend([fu])

    # App1   App2  ... AppN
    # |       |        |
    # V       V        V
    # App1   App2  ... AppN

    fus_2 = []
    for fu in fus:
        fu = sleep_fail(0, 0, .8, inputs=[fu])
        fus_2.extend([fu])

    # App1   App2  ... AppN
    #   |       |        |
    #   V       V        V
    # App1   App2  ... AppN
    #    \      |       /
    #     \     |      /
    #       App_Final

    fu_final = sleep_fail(1, 0, 0, inputs=fus_2)

    try:
        print("Final status : ", fu_final.result())
    except parsl.dataflow.errors.DependencyError as e:
        print("Caught the right exception")
        print("Exception : ", e)
    except Exception as e:
        assert 5 == 1, "Expected DependencyError got : %s" % e
    else:
        print("Shoot! no errors ")


@python_app
def sleep_then_fail(sleep_dur=0.1):
    import math
    import time
    time.sleep(sleep_dur)
    math.ceil("Trigger TypeError")
    return 0


@pytest.mark.skip('broken')
def test_fail_nowait(numtasks=10):
    """Test basic error handling, with no dependent failures
    """
    import time
    fus = []
    for i in range(0, numtasks):
        fu = sleep_then_fail(sleep_dur=0.1)
        fus.extend([fu])

    try:
        [x.result() for x in fus]
    except Exception as e:
        assert isinstance(e, TypeError), "Expected a TypeError, got {}".format(e)

    # fus[0].result()
    time.sleep(1)
    print("Done")