1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
|
from concurrent.futures import Future
from threading import Event
import pytest
import parsl
from parsl.config import Config
from parsl.dataflow.dflow import DataFlowKernel, DataFlowKernelLoader
from parsl.errors import NoDataFlowKernelError
from parsl.tests.configs.local_threads import fresh_config
@parsl.python_app
def square(x):
return x * x
@parsl.bash_app
def foo(x, stdout='foo.stdout'):
return f"echo {x + 1}"
@parsl.python_app
def wait_for_event(ev: Event):
ev.wait()
@parsl.python_app
def raise_app():
raise RuntimeError("raise_app deliberate failure")
@pytest.mark.local
def test_within_context_manger(tmpd_cwd):
config = fresh_config()
with parsl.load(config=config) as dfk:
assert isinstance(dfk, DataFlowKernel)
bash_future = foo(1, stdout=tmpd_cwd / 'foo.stdout')
assert bash_future.result() == 0
with open(tmpd_cwd / 'foo.stdout', 'r') as f:
assert f.read() == "2\n"
with pytest.raises(NoDataFlowKernelError) as excinfo:
square(2).result()
assert str(excinfo.value) == "Must first load config"
@pytest.mark.local
def test_exit_skip():
config = fresh_config()
config.exit_mode = "skip"
with parsl.load(config) as dfk:
ev = Event()
fut = wait_for_event(ev)
# deliberately don't wait for this to finish, so that the context
# manager can exit
assert parsl.dfk() is dfk, "global dfk should be left in place by skip mode"
assert not fut.done(), "wait_for_event should not be done yet"
ev.set()
# now we can wait for that result...
fut.result()
assert fut.done(), "wait_for_event should complete outside of context manager in 'skip' mode"
# now cleanup the DFK that the above `with` block
# deliberately avoided doing...
dfk.cleanup()
# 'wait' mode has two cases to test:
# 1. that we wait when there is no exception
# 2. that we do not wait when there is an exception
@pytest.mark.local
def test_exit_wait_no_exception():
config = fresh_config()
config.exit_mode = "wait"
with parsl.load(config) as dfk:
fut = square(1)
# deliberately don't wait for this to finish, so that the context
# manager can exit
assert fut.done(), "This future should be marked as done before the context manager exits"
assert dfk.cleanup_called, "The DFK should have been cleaned up by the context manager"
assert DataFlowKernelLoader._dfk is None, "The global DFK should have been removed"
@pytest.mark.local
def test_exit_wait_exception():
config = fresh_config()
config.exit_mode = "wait"
with pytest.raises(RuntimeError):
with parsl.load(config) as dfk:
# we'll never fire this future
fut_never = Future()
fut_raise = raise_app()
fut_depend = square(fut_never)
# this should cause an exception, which should cause the context
# manager to exit, without waiting for fut_depend to finish.
fut_raise.result()
assert dfk.cleanup_called, "The DFK should have been cleaned up by the context manager"
assert DataFlowKernelLoader._dfk is None, "The global DFK should have been removed"
assert fut_raise.exception() is not None, "fut_raise should contain an exception"
assert not fut_depend.done(), "fut_depend should have been left un-done (due to dependency failure)"
@pytest.mark.local
def test_exit_wrong_mode():
with pytest.raises(Exception) as ex:
Config(exit_mode="wrongmode")
# with typeguard 4.x this is TypeCheckError,
# with typeguard 2.x this is TypeError
# we can't instantiate TypeCheckError if we're in typeguard 2.x environment
# because it does not exist... so check name using strings.
assert ex.type.__name__ == "TypeCheckError" or ex.type.__name__ == "TypeError"
|