1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
|
import argparse
import pytest
import parsl
from parsl.app.app import python_app
from parsl.tests.configs.local_threads import fresh_config
def local_config():
c = fresh_config()
c.retries = 2
return c
@python_app
def sleep_fail(sleep_dur, sleep_rand_max, fail_prob, inputs=[]):
import random
import time
s = sleep_dur + random.randint(-sleep_rand_max, sleep_rand_max)
time.sleep(s)
x = float(random.randint(0, 100)) / 100
if x <= fail_prob:
# print("Fail")
raise Exception("App failure")
else:
pass
# print("Succeed")
@python_app
def double(x):
return x * 2
@pytest.mark.local
def test_simple(n=10):
import time
start = time.time()
x = double(n)
print("Result : ", x.result())
assert x.result() == n * \
2, "Expected double to return:{0} instead got:{1}".format(
n * 2, x.result())
print("Duration : {0}s".format(time.time() - start))
print("[TEST STATUS] test_parallel_for [SUCCESS]")
return True
@pytest.mark.skip('broken')
def test_no_deps(numtasks=10):
"""Test basic error handling, with no dependent failures
"""
fus = []
for i in range(0, 10):
fu = sleep_fail(0.1, 0, .8)
fus.extend([fu])
count = 0
for fu in fus:
try:
fu.result()
except Exception as e:
print("Caught exception : ", "*" * 20)
print(e)
print("*" * 20)
count += 1
print("Caught failures of {0}/{1}".format(count, len(fus)))
@pytest.mark.skip('broken')
def test_fail_sequence(numtasks=10):
"""Test failure in a sequence of dependencies
App1 -> App2 ... -> AppN
"""
sleep_dur = 0.1
fail_prob = 0.4
fus = {0: None}
for i in range(0, numtasks):
print("Chaining {0} to {1}".format(i + 1, fus[i]))
fus[i + 1] = sleep_fail(sleep_dur, 0, fail_prob, inputs=[fus[i]])
# time.sleep(numtasks*sleep_dur)
for k in sorted(fus.keys()):
try:
x = fus[i].result()
print("{0} : {1}".format(k, x))
except Exception as e:
print("{0} : {1}".format(k, e))
return
@pytest.mark.skip('broken')
def test_deps(numtasks=10):
"""Random failures in branches of Map -> Map -> reduce
App1 App2 ... AppN
"""
fus = []
for i in range(0, numtasks):
fu = sleep_fail(0.2, 0, .4)
fus.extend([fu])
# App1 App2 ... AppN
# | | |
# V V V
# App1 App2 ... AppN
fus_2 = []
for fu in fus:
fu = sleep_fail(0, 0, .8, inputs=[fu])
fus_2.extend([fu])
# App1 App2 ... AppN
# | | |
# V V V
# App1 App2 ... AppN
# \ | /
# \ | /
# App_Final
fu_final = sleep_fail(1, 0, 0, inputs=fus_2)
try:
print("Final status : ", fu_final.result())
except parsl.dataflow.errors.DependencyError as e:
print("Caught the right exception")
print("Exception : ", e)
except Exception as e:
assert 5 == 1, "Expected DependencyError got : %s" % e
else:
print("Shoot! no errors ")
@python_app
def sleep_then_fail(sleep_dur=0.1):
import math
import time
time.sleep(sleep_dur)
math.ceil("Trigger TypeError")
return 0
@pytest.mark.skip('broken')
def test_fail_nowait(numtasks=10):
"""Test basic error handling, with no dependent failures
"""
import time
fus = []
for i in range(0, numtasks):
fu = sleep_then_fail(sleep_dur=0.1)
fus.extend([fu])
try:
[x.result() for x in fus]
except Exception as e:
assert isinstance(e, TypeError), "Expected a TypeError, got {}".format(e)
# fus[0].result()
time.sleep(1)
print("Done")
|