File: test_globus_compute_executor.py

package info (click to toggle)
python-parsl 2025.11.10%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 12,124 kB
  • sloc: python: 24,375; makefile: 352; sh: 252; ansic: 45
file content (113 lines) | stat: -rw-r--r-- 3,509 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import random
from unittest import mock

import pytest

from parsl.executors import GlobusComputeExecutor


@pytest.fixture
def mock_ex():
    # Not Parsl's job to test GC's Executor, although it
    # still needs to be importable for these test cases.
    from globus_compute_sdk import Executor

    yield mock.Mock(spec=Executor)


@pytest.mark.local
@pytest.mark.globus_compute
def test_gc_executor_mock_spec(mock_ex):
    # a test of tests -- make sure we're using spec= in the mock
    with pytest.raises(AttributeError):
        mock_ex.aasdf()


@pytest.mark.local
@pytest.mark.globus_compute
def test_gc_executor_label_default(mock_ex):
    gce = GlobusComputeExecutor(mock_ex)
    assert gce.label == type(gce).__name__, "Expect reasonable default label"


@pytest.mark.local
@pytest.mark.globus_compute
def test_gc_executor_label(mock_ex, randomstring):
    exp_label = randomstring()
    gce = GlobusComputeExecutor(mock_ex, label=exp_label)
    assert gce.label == exp_label


@pytest.mark.local
@pytest.mark.globus_compute
def test_gc_executor_resets_spec_after_submit(mock_ex, randomstring):
    submit_res = {randomstring(): "some submit res"}
    res = {"some": randomstring(), "spec": randomstring()}
    mock_ex.resource_specification = res
    mock_ex.user_endpoint_config = None
    gce = GlobusComputeExecutor(mock_ex)

    fn = mock.Mock()
    orig_res = mock_ex.resource_specification
    orig_uep = mock_ex.user_endpoint_config

    def mock_submit(*a, **k):
        assert mock_ex.resource_specification == submit_res, "Expect set for submission"
        assert mock_ex.user_endpoint_config is None
    mock_ex.submit.side_effect = mock_submit

    gce.submit(fn, resource_specification=submit_res)

    assert mock_ex.resource_specification == orig_res
    assert mock_ex.user_endpoint_config is orig_uep


@pytest.mark.local
@pytest.mark.globus_compute
def test_gc_executor_resets_uep_after_submit(mock_ex, randomstring):
    uep_conf = randomstring()
    res = {"some": randomstring()}
    gce = GlobusComputeExecutor(mock_ex)

    fn = mock.Mock()
    orig_res = mock_ex.resource_specification
    orig_uep = mock_ex.user_endpoint_config

    def mock_submit(*a, **k):

        assert mock_ex.resource_specification == res, "Expect set for submission"
        assert mock_ex.user_endpoint_config == uep_conf, "Expect set for submission"
    mock_ex.submit.side_effect = mock_submit

    gce.submit(fn, resource_specification={"user_endpoint_config": uep_conf, **res})

    assert mock_ex.resource_specification == orig_res
    assert mock_ex.user_endpoint_config is orig_uep


@pytest.mark.local
@pytest.mark.globus_compute
def test_gc_executor_happy_path(mock_ex, randomstring):
    mock_fn = mock.Mock()
    args = tuple(randomstring() for _ in range(random.randint(0, 3)))
    kwargs = {randomstring(): randomstring() for _ in range(random.randint(0, 3))}

    gce = GlobusComputeExecutor(mock_ex)
    gce.submit(mock_fn, {}, *args, **kwargs)

    assert mock_ex.submit.called, "Expect proxying of args to underlying executor"
    found_a, found_k = mock_ex.submit.call_args
    assert found_a[0] is mock_fn
    assert found_a[1:] == args
    assert found_k == kwargs


@pytest.mark.local
@pytest.mark.globus_compute
def test_gc_executor_shuts_down_asynchronously(mock_ex):
    gce = GlobusComputeExecutor(mock_ex)
    gce.shutdown()
    assert mock_ex.shutdown.called
    a, k = mock_ex.shutdown.call_args
    assert k["wait"] is False
    assert k["cancel_futures"] is True