File: globus_compute.py

from __future__ import annotations

import copy
from concurrent.futures import Future
from typing import Any, Callable, Dict, List, Optional

import typeguard

from parsl.data_provider.staging import Staging
from parsl.errors import OptionalModuleMissing
from parsl.executors.base import ParslExecutor
from parsl.utils import RepresentationMixin

try:
    from globus_compute_sdk import Executor
    _globus_compute_enabled = True
except ImportError:
    _globus_compute_enabled = False


class GlobusComputeExecutor(ParslExecutor, RepresentationMixin):
    """ GlobusComputeExecutor enables remote execution on Globus Compute endpoints

    GlobusComputeExecutor is a thin wrapper over globus_compute_sdk.Executor.
    Refer to `globus-compute user documentation <https://globus-compute.readthedocs.io/en/latest/executor.html>`_
    and `reference documentation <https://globus-compute.readthedocs.io/en/latest/reference/executor.html>`_
    for more details.

    .. note::
       As a remote execution system, Globus Compute relies on serialization to ship
       tasks and results between the Parsl client side and the remote Globus Compute
       Endpoint side. Serialization is unreliable across Python versions, and
       the wrappers used by Parsl assume identical Parsl versions on both sides.
       We recommend using matching Python, Parsl, and Globus Compute versions on
       both the client side and the endpoint side for stable behavior.

    """

    @typeguard.typechecked
    def __init__(
        self,
        executor: Executor,
        label: str = 'GlobusComputeExecutor',
        storage_access: Optional[List[Staging]] = None,
        working_dir: Optional[str] = None,
    ):
        """
        Parameters
        ----------

        executor: globus_compute_sdk.Executor
            Pass a globus_compute_sdk Executor that will be used to execute
            tasks on a globus_compute endpoint. Refer to `globus-compute docs
            <https://globus-compute.readthedocs.io/en/latest/reference/executor.html#globus-compute-executor>`_

        label:
            A label to name the executor

        storage_access:
            A list of staging providers that will be used for file staging

        working_dir:
            The working directory to be used for file staging
        """
        if not _globus_compute_enabled:
            raise OptionalModuleMissing(
                ['globus-compute-sdk'],
                "GlobusComputeExecutor requires globus-compute-sdk installed"
            )

        super().__init__()
        self.executor: Executor = executor
        self.resource_specification = self.executor.resource_specification
        self.user_endpoint_config = self.executor.user_endpoint_config
        self.label = label
        self.storage_access = storage_access
        self.working_dir = working_dir
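
    # Illustrative usage sketch (not part of the upstream module; the endpoint
    # UUID and label below are placeholders): a GlobusComputeExecutor is
    # typically placed inside a parsl Config and wraps a configured
    # globus_compute_sdk.Executor.
    #
    #     from globus_compute_sdk import Executor
    #
    #     from parsl.config import Config
    #     from parsl.executors import GlobusComputeExecutor
    #
    #     config = Config(
    #         executors=[
    #             GlobusComputeExecutor(
    #                 executor=Executor(endpoint_id="<ENDPOINT_UUID>"),
    #                 label="globus_compute",
    #             )
    #         ]
    #     )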

    def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
        """ Submit func to globus-compute


        Parameters
        ----------

        func: Callable
            Python function to execute remotely

        resource_specification: Dict[str, Any]
            Resource specification can be used to specify MPI resources required by MPI applications on
            endpoints configured to use Globus Compute's MPIEngine. GlobusComputeExecutor also accepts
            *user_endpoint_config* to configure the endpoint when it is a `Multi-User Endpoint
            <https://globus-compute.readthedocs.io/en/latest/endpoints/endpoints.html#templating-endpoint-configuration>`_

        args:
            Args to pass to the function

        kwargs:
            kwargs to pass to the function

        Returns
        -------

        Future
        """
        res_spec = copy.deepcopy(resource_specification or self.resource_specification)
        # Pop user_endpoint_config since it is illegal in resource_spec for globus_compute
        if res_spec:
            user_endpoint_config = res_spec.pop('user_endpoint_config', self.user_endpoint_config)
        else:
            user_endpoint_config = self.user_endpoint_config

        try:
            self.executor.resource_specification = res_spec
            self.executor.user_endpoint_config = user_endpoint_config
            return self.executor.submit(func, *args, **kwargs)
        finally:
            # Reset executor state to defaults set at configuration time
            self.executor.resource_specification = self.resource_specification
            self.executor.user_endpoint_config = self.user_endpoint_config
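
    # Illustrative sketch (gc_executor, some_function and the config values are
    # placeholders, not part of this module): a per-task resource specification
    # may carry ``user_endpoint_config`` for multi-user endpoints; it is popped
    # out above before the task is handed to globus_compute_sdk.
    #
    #     future = gc_executor.submit(
    #         some_function,
    #         {"user_endpoint_config": {"worker_init": "source ~/setup.sh"}},
    #         arg1,
    #         arg2,
    #     )
    #     result = future.result()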

    def shutdown(self):
        """Clean-up the resources associated with the Executor.

        GCE.shutdown will cancel all futures that have not yet registered with
        Globus Compute and will not wait for the launched futures to complete.
        This method explicitly shutsdown the result_watcher thread to avoid
        it waiting for outstanding futures at thread exit.
        """
        self.executor.shutdown(wait=False, cancel_futures=True)
        result_watcher = self.executor._get_result_watcher()
        result_watcher.shutdown(wait=False, cancel_futures=True)

        super().shutdown()