File: base.py

package info (click to toggle)
python-parsl 2025.11.10%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 12,124 kB
  • sloc: python: 24,375; makefile: 352; sh: 252; ansic: 45
file content (143 lines) | stat: -rw-r--r-- 4,990 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from __future__ import annotations

import logging
import os
from abc import ABCMeta, abstractmethod
from concurrent.futures import Future
from multiprocessing.queues import Queue
from typing import Any, Callable, Dict, Optional

from typing_extensions import Literal, Self

from parsl.monitoring.radios.base import MonitoringRadioReceiver, RadioConfig
from parsl.monitoring.types import TaggedMonitoringMessage

# Module-level logger named after this module; ParslExecutor.shutdown() uses
# it to report the start and end of monitoring receiver shutdown.
logger = logging.getLogger(__name__)


class ParslExecutor(metaclass=ABCMeta):
    """Abstract base class for Parsl executors.

    An executor is an abstraction over available compute resources to which
    arbitrary App tasks can be submitted. This base class only enforces that
    child classes provide concrete implementations of the abstract methods.

    Instances can be used as context managers. Leaving the ``with`` block
    calls ``self.shutdown()`` with no arguments, and any exception thrown
    inside the block is re-raised.

    Besides the methods listed below, every ParslExecutor instance must
    have these member fields:

       label: str - a human readable label for the executor, unique
              with respect to other executors.

       remote_monitoring_radio: RadioConfig describing how tasks on this
              executor should report task resource status

    An executor may optionally expose:

       storage_access: List[parsl.data_provider.staging.Staging] - a list of
              staging providers that will be used for file staging. If this
              attribute is absent, or is `None`, the staging code falls back
              to ``parsl.data_provider.staging.default_staging``.

              Typechecker note: Ideally storage_access would be declared on
              executor __init__ methods as List[Staging] - however, lists are
              by default invariant, not co-variant, and it looks like
              @typeguard cannot be persuaded otherwise. So an executor that
              wants to @typeguard its constructor has to use List[Any] here.

    If monitoring is enabled, the DataFlowKernel sets this attribute before
    calling .start():

        monitoring_messages: Optional[Queue[TaggedMonitoringMessage]] - a
            queue into which an executor can put messages in order to send
            them to the monitoring hub.
    """

    # Placeholder value; see the class docstring for what a label should be.
    label: str = "undefined"

    def __init__(
        self,
        *,
        monitoring_messages: Optional[Queue[TaggedMonitoringMessage]] = None,
        run_dir: str = ".",
        run_id: Optional[str] = None,
    ):
        """Initialise the state shared by all executors.

        All parameters are keyword-only.

        monitoring_messages: queue used to send messages to the monitoring
            hub, or None.
        run_dir: path to the run directory; stored as an absolute path.
        run_id: UUID for the enclosing DFK, or None.
        """
        self.monitoring_messages = monitoring_messages

        # Both monitoring hooks start out unset.
        self.monitoring_receiver: Optional[MonitoringRadioReceiver] = None
        self.remote_monitoring_radio: Optional[RadioConfig] = None

        # These two assignments are routed through the property setters
        # defined at the bottom of the class.
        self.run_dir = os.path.abspath(run_dir)
        self.run_id = run_id

    def __enter__(self) -> Self:
        """Enter the context manager, yielding this executor itself."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]:
        """Leave the context manager, shutting the executor down.

        Always returns False, so an exception raised inside the ``with``
        block is not suppressed.
        """
        self.shutdown()
        return False

    def start(self) -> None:
        """Start the executor.

        The base implementation is a no-op. Override it to perform any
        spin-up operations (for example: starting thread pools).
        """

    @abstractmethod
    def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
        """Submit.

        Abstract: concrete executors must implement this method.

        The returned Future may optionally carry a parsl_executor_task_id
        attribute set by the executor. When it does, parsl will log a
        relationship between the executor's task ID and parsl level try/task
        IDs.
        """

    @abstractmethod
    def shutdown(self) -> None:
        """Shutdown the executor.

        Overriding implementations should call super().shutdown() as part of
        their own shutdown, so that a monitoring receiver, if one has been
        set, is shut down too.
        """
        # Nothing to tear down when no monitoring receiver was attached.
        if self.monitoring_receiver is None:
            return

        logger.info("Starting monitoring receiver shutdown")
        self.monitoring_receiver.shutdown()
        logger.info("Done with monitoring receiver shutdown")

    def monitor_resources(self) -> bool:
        """Should resource monitoring happen for tasks running on this executor?

        Parsl resource monitoring conflicts with execution styles which do
        not directly use a process tree - for example, the ThreadPoolExecutor
        and the MPIExecutor.

        Executor implementations can override this function to disable
        resource monitoring; the base implementation returns True.
        """
        return True

    @property
    def run_dir(self) -> str:
        """Path to the run directory."""
        return self._run_dir

    @run_dir.setter
    def run_dir(self, value: str) -> None:
        # Stored exactly as given; __init__ is the caller that makes the
        # path absolute before assigning it.
        self._run_dir = value

    @property
    def run_id(self) -> Optional[str]:
        """UUID for the enclosing DFK."""
        return self._run_id

    @run_id.setter
    def run_id(self, value: Optional[str]) -> None:
        self._run_id = value