1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
import logging
import pathlib
from subprocess import Popen, TimeoutExpired
from typing import Optional, Sequence
from unittest import mock
import pytest
from parsl import HighThroughputExecutor, curvezmq
# Dotted import path of the high-throughput executor module.
# NOTE(review): presumably intended as a prefix for mock.patch targets; it is
# not referenced anywhere in the visible part of this file -- confirm usage.
_MOCK_BASE = "parsl.executors.high_throughput.executor"
@pytest.fixture
def encrypted(request: pytest.FixtureRequest):
    """Return the ``encrypted`` flag to build the executor with.

    When the requesting test supplies a value (``request.param`` is present,
    e.g. via indirect parametrization) that value is returned; otherwise the
    flag defaults to ``True``.
    """
    return getattr(request, "param", True)
@pytest.fixture
def htex(encrypted: bool):
    """Yield a ``HighThroughputExecutor`` created with the given ``encrypted`` flag.

    After the consuming test is done with it, ``shutdown()`` is called on the
    executor.
    """
    executor = HighThroughputExecutor(encrypted=encrypted)
    yield executor
    executor.shutdown()
@pytest.mark.local
@pytest.mark.parametrize("encrypted", (True, False), indirect=True)
@pytest.mark.parametrize("cert_dir_provided", (True, False))
def test_htex_start_encrypted(
    encrypted: bool,
    cert_dir_provided: bool,
    htex: HighThroughputExecutor,
    tmpd_cwd: pathlib.Path,
):
    """Start the executor for each (encrypted, cert_dir_provided) combination.

    Certificates are created either under a "provided" directory that is then
    assigned to ``htex.cert_dir``, or under ``htex.logdir`` without touching
    ``htex.cert_dir``.

    Expectations checked:
      * encryption off with a cert_dir set: ``start()`` raises
        ``AttributeError`` mentioning "change cert_dir to None";
      * encryption on: the executor and the ZMQ contexts of ``outgoing_q``,
        ``incoming_q`` and ``command_client`` report the created cert_dir, and
        those contexts are ``curvezmq.ClientContext`` instances;
      * encryption off: every one of those cert_dir values is ``None``.
    """
    htex.run_dir = str(tmpd_cwd)

    if not cert_dir_provided:
        cert_dir = curvezmq.create_certificates(htex.logdir)
    else:
        base_dir = tmpd_cwd / "provided"
        base_dir.mkdir()
        cert_dir = curvezmq.create_certificates(base_dir)
        htex.cert_dir = cert_dir

    if cert_dir_provided and not encrypted:
        # A cert_dir without encryption is a configuration error.
        with pytest.raises(AttributeError) as exc_info:
            htex.start()
        assert "change cert_dir to None" in str(exc_info.value)
        return

    htex.start()
    assert htex.encrypted is encrypted

    # Attributes are looked up lazily, one per assertion, in the same order
    # the checks were originally written.
    channel_names = ("outgoing_q", "incoming_q", "command_client")

    if not encrypted:
        assert htex.cert_dir is None
        for channel in channel_names:
            assert getattr(htex, channel).zmq_context.cert_dir is None
        return

    assert htex.cert_dir == cert_dir
    for channel in channel_names:
        assert getattr(htex, channel).zmq_context.cert_dir == cert_dir
    for channel in channel_names:
        assert isinstance(
            getattr(htex, channel).zmq_context, curvezmq.ClientContext
        )
@pytest.mark.local
@pytest.mark.parametrize("started", (True, False))
@pytest.mark.parametrize("timeout_expires", (True, False))
def test_htex_shutdown(
    started: bool,
    timeout_expires: bool,
    htex: HighThroughputExecutor,
    caplog
):
    """Exercise ``htex.shutdown()`` against a mocked interchange process.

    ``started`` controls whether the mock process is attached to the executor
    as ``interchange_proc``; ``timeout_expires`` controls whether the mock's
    ``wait`` keeps timing out even after ``terminate`` has been called.
    Assertions cover the calls made on the mock process and the messages
    logged at INFO level and above.
    """
    ix_proc = mock.Mock(spec=Popen)
    if started:
        htex.interchange_proc = ix_proc

    def wait_while_alive(timeout):
        # Behaviour of wait() while the interchange has not received a
        # termination call. A real wait without a timeout would block
        # forever here, so raise instead of hanging and hope that nothing
        # catches the exception.
        if not timeout:
            raise RuntimeError("This wait call would hang forever")
        raise TimeoutExpired(cmd="mock-interchange", timeout=timeout)

    def wait_after_exit(timeout):
        # Behaviour of wait() once the interchange has terminated.
        return 0

    def on_terminate(*args, **kwargs):
        # Simulate termination of the Interchange process: later wait()
        # calls return immediately.
        ix_proc.wait.side_effect = wait_after_exit

    ix_proc.wait.side_effect = wait_while_alive
    if not timeout_expires:
        ix_proc.terminate.side_effect = on_terminate

    with caplog.at_level(logging.INFO):
        htex.shutdown()

    if not started:
        assert not ix_proc.terminate.called
        assert not ix_proc.wait.called
        assert "HighThroughputExecutor has not started" in caplog.text
        return

    assert ix_proc.terminate.called
    assert ix_proc.wait.called
    # Keyword arguments of the most recent wait() call.
    last_wait_kwargs = ix_proc.wait.call_args[1]
    assert {"timeout": 10} == last_wait_kwargs
    if timeout_expires:
        assert "Unable to terminate Interchange" in caplog.text
        assert ix_proc.kill.called
    assert "Attempting HighThroughputExecutor shutdown" in caplog.text
    assert "Finished HighThroughputExecutor shutdown" in caplog.text
@pytest.mark.local
@pytest.mark.parametrize("cmd", (None, "custom-launch-cmd"))
def test_htex_worker_pool_launch_cmd(cmd: Optional[str]):
    """Check ``launch_cmd`` with and without an explicit value.

    A supplied ``launch_cmd`` must be stored unchanged; when none is given,
    the executor's ``launch_cmd`` must start with "process_worker_pool.py".
    """
    if not cmd:
        default_executor = HighThroughputExecutor()
        assert default_executor.launch_cmd.startswith("process_worker_pool.py")
        return

    custom_executor = HighThroughputExecutor(launch_cmd=cmd)
    assert custom_executor.launch_cmd == cmd
@pytest.mark.local
@pytest.mark.parametrize("cmd", (None, ["custom", "launch", "cmd"]))
def test_htex_interchange_launch_cmd(cmd: Optional[Sequence[str]]):
    """Check ``interchange_launch_cmd`` with and without an explicit value.

    A supplied command sequence must be stored unchanged; when none is given,
    the executor's ``interchange_launch_cmd`` must equal ``["interchange.py"]``.
    """
    if not cmd:
        default_executor = HighThroughputExecutor()
        assert default_executor.interchange_launch_cmd == ["interchange.py"]
        return

    custom_executor = HighThroughputExecutor(interchange_launch_cmd=cmd)
    assert custom_executor.interchange_launch_cmd == cmd
|