1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
|
import pytest
from parsl.app.app import bash_app
from parsl.app.futures import DataFuture
from parsl.data_provider.files import File
@bash_app
def increment(inputs=(), outputs=(), stdout=None, stderr=None):
    """Return a bash script that writes ``inputs[0]``'s integer plus one.

    The generated script exits with status 43 when ``inputs[0]`` is not a
    regular file; otherwise it reads the value stored there and writes the
    incremented value to ``outputs[0]``.

    ``stdout`` and ``stderr`` are not referenced by the script itself; they
    are presumably consumed by the ``bash_app`` decorator (confirm against
    the Parsl documentation).
    """
    script_template = """
if ! [ -f {inputs[0]} ] ; then exit 43 ; fi
x=$(cat {inputs[0]})
echo $(($x+1)) > {outputs[0]}
"""
    return script_template.format(inputs=inputs, outputs=outputs)
@bash_app
def slow_increment(dur, inputs=(), outputs=(), stdout=None, stderr=None):
    """Return a bash script that increments a value and then sleeps.

    The generated script reads the integer stored in ``inputs[0]``, writes
    that value plus one to ``outputs[0]``, and finally runs ``sleep`` with
    ``dur`` as its argument.

    ``stdout`` and ``stderr`` are not referenced by the script itself; they
    are presumably consumed by the ``bash_app`` decorator (confirm against
    the Parsl documentation).
    """
    script_template = """
x=$(cat {inputs[0]})
echo $(($x+1)) > {outputs[0]}
sleep {0}
"""
    return script_template.format(dur, inputs=inputs, outputs=outputs)
@pytest.mark.staging_required
def test_increment(tmpd_cwd, depth=5):
    """Test simple pipeline A->B...->N.

    A file containing ``0`` is created, then ``depth - 1`` ``increment``
    apps are chained so that each one consumes the previous app's output.
    Afterwards every output is checked: the returned file must keep its
    submit-side path, must not carry a ``local_path`` override, and must
    contain the step number.

    ``tmpd_cwd`` is used as a path-like directory object (``/`` joining and
    ``write_text``); it is assumed to be a fixture supplying a temporary
    working directory.
    """
    fpath = tmpd_cwd / "test0.txt"
    fpath.write_text("0\n")

    prev = [File(str(fpath))]
    futs = []
    for i in range(1, depth):
        # The first step is fed a plain File, later steps a DataFuture.
        assert isinstance(prev[0], (DataFuture, File))
        output = File(str(tmpd_cwd / f"test{i}.txt"))
        app_fut = increment(
            inputs=prev,
            outputs=[output],
            stdout=str(tmpd_cwd / f"incr{i}.out"),
            stderr=str(tmpd_cwd / f"incr{i}.err"),
        )
        prev = app_fut.outputs
        futs.append((i, prev[0]))
        assert isinstance(prev[0], DataFuture)

    for key, data_fut in futs:
        file = data_fut.result()
        expected = str(tmpd_cwd / f"test{key}.txt")

        assert file.local_path is None, (
            "File on local side has overridden local_path, file: {}".format(repr(file))
        )
        assert file.filepath == expected, (
            "Submit side filepath has not been preserved over execution"
        )

        # Close the handle deterministically instead of leaking it until
        # garbage collection (which can surface as a ResourceWarning).
        with open(file.filepath) as fh:
            data = fh.read().strip()
        assert data == str(key)
@pytest.mark.staging_required
def test_increment_slow(tmpd_cwd, depth=5, dur=0.01):
    """Test simple pipeline A->B...->N where every step also sleeps.

    A file containing ``0`` is created, then ``depth - 1``
    ``slow_increment`` apps are chained, each sleeping for ``dur`` after
    writing its output. Every output file must contain its step number.

    ``tmpd_cwd`` is used as a path-like directory object (``/`` joining and
    ``write_text``); it is assumed to be a fixture supplying a temporary
    working directory.
    """
    fpath = tmpd_cwd / "test0.txt"
    fpath.write_text("0\n")

    prev = [File(str(fpath))]
    futs = []
    for i in range(1, depth):
        output = File(str(tmpd_cwd / f"test{i}.txt"))
        app_fut = slow_increment(
            dur,
            inputs=prev,
            outputs=[output],
            stdout=str(tmpd_cwd / f"incr{i}.out"),
            stderr=str(tmpd_cwd / f"incr{i}.err"),
        )
        prev = app_fut.outputs
        futs.append((i, prev[0]))

    for key, data_fut in futs:
        # Close the handle deterministically instead of leaking it until
        # garbage collection (which can surface as a ResourceWarning).
        with open(data_fut.result().filepath) as fh:
            data = fh.read().strip()
        assert data == str(key)
|