File: base.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (157 lines) | stat: -rw-r--r-- 5,205 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import os
from abc import ABCMeta, abstractmethod
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional

from typing_extensions import Literal, Self

from parsl.monitoring.radios.base import MonitoringRadioSender


class ParslExecutor(metaclass=ABCMeta):
    """Executors are abstractions that represent available compute resources
    to which you could submit arbitrary App tasks.

    This is an abstract base class that only enforces concrete implementations
    of functionality by the child classes.

    Can be used as a context manager. On exit, calls ``self.shutdown()`` with
    no arguments and re-raises any thrown exception.

    In addition to the listed methods, a ParslExecutor instance must always
    have a member field:

       label: str - a human readable label for the executor, unique
              with respect to other executors.

    Per-executor monitoring behaviour can be influenced by exposing:

       radio_mode: str - a string describing which radio mode should be used to
              send task resource data back to the submit side.

    An executor may optionally expose:

       storage_access: List[parsl.data_provider.staging.Staging] - a list of staging
              providers that will be used for file staging. In the absence of this
              attribute, or if this attribute is `None`, then a default value of
              ``parsl.data_provider.staging.default_staging`` will be used by the
              staging code.

              Typechecker note: Ideally storage_access would be declared on executor
              __init__ methods as List[Staging] - however, lists are by default
              invariant, not co-variant, and it looks like @typeguard cannot be
              persuaded otherwise. So if you're implementing an executor and want to
              @typeguard the constructor, you'll have to use List[Any] here.
    """

    label: str = "undefined"
    radio_mode: str = "udp"

    def __init__(
        self,
        *,
        hub_address: Optional[str] = None,
        hub_zmq_port: Optional[int] = None,
        submit_monitoring_radio: Optional[MonitoringRadioSender] = None,
        run_dir: str = ".",
        run_id: Optional[str] = None,
    ):
        self.hub_address = hub_address
        self.hub_zmq_port = hub_zmq_port
        self.submit_monitoring_radio = submit_monitoring_radio
        self.run_dir = os.path.abspath(run_dir)
        self.run_id = run_id

    def __enter__(self) -> Self:
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]:
        self.shutdown()
        return False

    @abstractmethod
    def start(self) -> None:
        """Start the executor.

        Any spin-up operations (for example: starting thread pools) should be performed here.
        """
        pass

    @abstractmethod
    def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
        """Submit.

        The executor can optionally set a parsl_executor_task_id attribute on
        the Future that it returns, and in that case, parsl will log a
        relationship between the executor's task ID and parsl level try/task
        IDs.
        """
        pass

    @abstractmethod
    def shutdown(self) -> None:
        """Shutdown the executor.

        This includes all attached resources such as workers and controllers.
        """
        pass

    def monitor_resources(self) -> bool:
        """Should resource monitoring happen for tasks on running on this executor?

        Parsl resource monitoring conflicts with execution styles which use threads, and
        can deadlock while running.

        This function allows resource monitoring to be disabled per executor implementation.
        """
        return True

    @property
    def run_dir(self) -> str:
        """Path to the run directory.
        """
        return self._run_dir

    @run_dir.setter
    def run_dir(self, value: str) -> None:
        self._run_dir = value

    @property
    def run_id(self) -> Optional[str]:
        """UUID for the enclosing DFK.
        """
        return self._run_id

    @run_id.setter
    def run_id(self, value: Optional[str]) -> None:
        self._run_id = value

    @property
    def hub_address(self) -> Optional[str]:
        """Address to the Hub for monitoring.
        """
        return self._hub_address

    @hub_address.setter
    def hub_address(self, value: Optional[str]) -> None:
        self._hub_address = value

    @property
    def hub_zmq_port(self) -> Optional[int]:
        """Port to the Hub for monitoring.
        """
        return self._hub_zmq_port

    @hub_zmq_port.setter
    def hub_zmq_port(self, value: Optional[int]) -> None:
        self._hub_zmq_port = value

    @property
    def submit_monitoring_radio(self) -> Optional[MonitoringRadioSender]:
        """Local radio for sending monitoring messages
        """
        return self._submit_monitoring_radio

    @submit_monitoring_radio.setter
    def submit_monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
        self._submit_monitoring_radio = value