File: test_htex.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (148 lines) | stat: -rw-r--r-- 4,901 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import logging
import pathlib
from subprocess import Popen, TimeoutExpired
from typing import Optional, Sequence
from unittest import mock

import pytest

from parsl import HighThroughputExecutor, curvezmq

# Dotted module path kept as a string (it looks like the module that defines
# HighThroughputExecutor -- confirm). Presumably intended as a prefix for
# mock.patch targets; nothing in the visible part of this file references it,
# so verify whether it is still needed.
_MOCK_BASE = "parsl.executors.high_throughput.executor"


@pytest.fixture
def encrypted(request: pytest.FixtureRequest):
    """Provide the encryption flag used to build the executor under test.

    When the requesting test parametrizes this fixture (``request.param`` is
    present), that value is returned; otherwise the flag defaults to ``True``.
    """
    return getattr(request, "param", True)


@pytest.fixture
def htex(encrypted: bool):
    """Yield a ``HighThroughputExecutor`` configured from the ``encrypted`` fixture.

    After the test has finished with the executor, ``shutdown()`` is called
    on it as teardown.
    """
    executor = HighThroughputExecutor(encrypted=encrypted)
    yield executor
    executor.shutdown()


@pytest.mark.local
@pytest.mark.parametrize("encrypted", (True, False), indirect=True)
@pytest.mark.parametrize("cert_dir_provided", (True, False))
def test_htex_start_encrypted(
    encrypted: bool,
    cert_dir_provided: bool,
    htex: HighThroughputExecutor,
    tmpd_cwd: pathlib.Path,
):
    """Check how ``start()`` treats certificates for each encrypted/cert_dir combination.

    * A certificate directory set on an unencrypted executor must make
      ``start()`` raise ``AttributeError`` mentioning "change cert_dir to None".
    * An encrypted executor must expose the expected certificate directory on
      itself and on the ZMQ contexts of its queues and command client, and
      those contexts must be ``curvezmq.ClientContext`` instances.
    * An unencrypted executor without a provided directory must report
      ``None`` as certificate directory everywhere.
    """
    htex.run_dir = str(tmpd_cwd)

    if not cert_dir_provided:
        # Certificates are generated under the executor's own log directory.
        cert_dir = curvezmq.create_certificates(htex.logdir)
    else:
        # Certificates live in a caller-chosen directory that is handed to
        # the executor explicitly.
        provided_base_dir = tmpd_cwd / "provided"
        provided_base_dir.mkdir()
        cert_dir = curvezmq.create_certificates(provided_base_dir)
        htex.cert_dir = cert_dir

    if cert_dir_provided and not encrypted:
        # Contradictory configuration: start() is expected to refuse it.
        with pytest.raises(AttributeError) as pyt_e:
            htex.start()
        assert "change cert_dir to None" in str(pyt_e.value)
        return

    htex.start()
    assert htex.encrypted is encrypted

    # Executor attributes whose ``zmq_context`` is inspected below.
    zmq_holders = ("outgoing_q", "incoming_q", "command_client")

    if not encrypted:
        assert htex.cert_dir is None
        for holder in zmq_holders:
            assert getattr(htex, holder).zmq_context.cert_dir is None
        return

    assert htex.cert_dir == cert_dir
    for holder in zmq_holders:
        assert getattr(htex, holder).zmq_context.cert_dir == cert_dir
    for holder in zmq_holders:
        assert isinstance(getattr(htex, holder).zmq_context, curvezmq.ClientContext)


@pytest.mark.local
@pytest.mark.parametrize("started", (True, False))
@pytest.mark.parametrize("timeout_expires", (True, False))
def test_htex_shutdown(
    started: bool,
    timeout_expires: bool,
    htex: HighThroughputExecutor,
    caplog
):
    """Exercise ``shutdown()`` against a mocked interchange process.

    ``started`` controls whether the mock is installed as the executor's
    ``interchange_proc``; ``timeout_expires`` controls whether the mocked
    process keeps "running" after ``terminate()`` so that waiting on it
    times out.
    """
    fake_proc = mock.Mock(spec=Popen)
    if started:
        htex.interchange_proc = fake_proc

    def wait_while_running(timeout):
        # wait() on an interchange that has not been told to exit. With a
        # timeout, that timeout expires. Without one the real call would
        # block forever, so raise instead of hanging the test run (and hope
        # nothing swallows the exception).
        if not timeout:
            raise RuntimeError("This wait call would hang forever")
        raise TimeoutExpired(cmd="mock-interchange", timeout=timeout)

    def wait_after_exit(timeout):
        # wait() on an interchange that has already exited.
        return 0

    def on_terminate(*args, **kwargs):
        # Simulate the interchange exiting in response to terminate().
        fake_proc.wait.side_effect = wait_after_exit

    fake_proc.wait.side_effect = wait_while_running
    if not timeout_expires:
        fake_proc.terminate.side_effect = on_terminate

    with caplog.at_level(logging.INFO):
        htex.shutdown()

    if not started:
        # Nothing was started, so the process must not have been touched.
        assert not fake_proc.terminate.called
        assert not fake_proc.wait.called
        assert "HighThroughputExecutor has not started" in caplog.text
        return

    assert fake_proc.terminate.called
    assert fake_proc.wait.called
    # Index 1 of call_args holds the keyword arguments of the last call.
    assert fake_proc.wait.call_args[1] == {"timeout": 10}
    if timeout_expires:
        assert "Unable to terminate Interchange" in caplog.text
        assert fake_proc.kill.called
    assert "Attempting HighThroughputExecutor shutdown" in caplog.text
    assert "Finished HighThroughputExecutor shutdown" in caplog.text


@pytest.mark.local
@pytest.mark.parametrize("cmd", (None, "custom-launch-cmd"))
def test_htex_worker_pool_launch_cmd(cmd: Optional[str]):
    """Check the executor's ``launch_cmd`` with and without a custom value.

    Without a custom command the attribute must start with
    ``process_worker_pool.py``; with one, it must equal the value passed in.
    """
    if not cmd:
        default_htex = HighThroughputExecutor()
        assert default_htex.launch_cmd.startswith("process_worker_pool.py")
        return

    custom_htex = HighThroughputExecutor(launch_cmd=cmd)
    assert custom_htex.launch_cmd == cmd


@pytest.mark.local
@pytest.mark.parametrize("cmd", (None, ["custom", "launch", "cmd"]))
def test_htex_interchange_launch_cmd(cmd: Optional[Sequence[str]]):
    """Check the executor's ``interchange_launch_cmd`` with and without a custom value.

    Without a custom command the attribute must equal ``["interchange.py"]``;
    with one, it must equal the sequence passed in.
    """
    if not cmd:
        default_htex = HighThroughputExecutor()
        assert default_htex.interchange_launch_cmd == ["interchange.py"]
        return

    custom_htex = HighThroughputExecutor(interchange_launch_cmd=cmd)
    assert custom_htex.interchange_launch_cmd == cmd