"""Definition of ``ParslExecutor``, the abstract base class for executors."""
import os
from abc import ABCMeta, abstractmethod
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional
from typing_extensions import Literal, Self
from parsl.monitoring.radios.base import MonitoringRadioSender
class ParslExecutor(metaclass=ABCMeta):
    """Abstract interface to compute resources that can run App tasks.

    An executor represents available compute resources to which arbitrary
    App tasks can be submitted. This base class carries no execution logic
    of its own: it only obliges subclasses to provide ``start``, ``submit``
    and ``shutdown``.

    Instances can be used as context managers. Leaving the ``with`` block
    calls ``self.shutdown()`` with no arguments, and any exception thrown
    inside the block is re-raised.

    Besides the methods listed below, every ParslExecutor instance must
    always have this member field:

       label: str - a human readable label for the executor, unique
              with respect to other executors.

    Per-executor monitoring behaviour can be influenced by exposing:

       radio_mode: str - a string describing which radio mode should be
              used to send task resource data back to the submit side.

    An executor may optionally expose:

       storage_access: List[parsl.data_provider.staging.Staging] - a list of
              staging providers that will be used for file staging. When
              this attribute is absent, or is `None`, the staging code
              falls back to
              ``parsl.data_provider.staging.default_staging``.

              Typechecker note: ideally storage_access would be declared on
              executor __init__ methods as List[Staging]. Lists are
              invariant by default rather than co-variant, though, and it
              looks like @typeguard cannot be persuaded otherwise. An
              executor implementation that wants to @typeguard its
              constructor therefore has to use List[Any] here.
    """

    # Class-level defaults; see the class docstring for what each one means.
    label: str = "undefined"
    radio_mode: str = "udp"

    def __init__(
        self,
        *,
        hub_address: Optional[str] = None,
        hub_zmq_port: Optional[int] = None,
        submit_monitoring_radio: Optional[MonitoringRadioSender] = None,
        run_dir: str = ".",
        run_id: Optional[str] = None,
    ):
        """Record the monitoring and run settings on the instance.

        All arguments are keyword-only, and each one is stored through the
        property of the same name.

        Args:
            hub_address: Address to the Hub for monitoring.
            hub_zmq_port: Port to the Hub for monitoring.
            submit_monitoring_radio: Local radio for sending monitoring
                messages.
            run_dir: Path to the run directory; it is converted to an
                absolute path before being stored.
            run_id: UUID for the enclosing DFK.
        """
        self.hub_address = hub_address
        self.hub_zmq_port = hub_zmq_port
        self.submit_monitoring_radio = submit_monitoring_radio

        # Only the constructor normalises the path; the ``run_dir`` setter
        # stores whatever it is given.
        absolute_run_dir = os.path.abspath(run_dir)
        self.run_dir = absolute_run_dir

        self.run_id = run_id

    def __enter__(self) -> Self:
        """Enter the ``with`` block, yielding this executor itself."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]:
        """Shut the executor down when the ``with`` block is left.

        Returns:
            Always ``False``, so an exception raised inside the block is
            not suppressed.
        """
        self.shutdown()
        return False

    @abstractmethod
    def start(self) -> None:
        """Start the executor.

        Spin-up work (for example: starting thread pools) belongs here.
        """

    @abstractmethod
    def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
        """Submit.

        The Future returned by the executor may optionally carry a
        parsl_executor_task_id attribute. When it does, parsl will log a
        relationship between the executor's task ID and the parsl level
        try/task IDs.
        """

    @abstractmethod
    def shutdown(self) -> None:
        """Shutdown the executor.

        This covers all attached resources such as workers and controllers.
        """

    def monitor_resources(self) -> bool:
        """Should resource monitoring happen for tasks running on this executor?

        Parsl resource monitoring conflicts with execution styles which use
        threads, and can deadlock while running. Overriding this method
        lets an executor implementation turn resource monitoring off.

        Returns:
            ``True`` in this base implementation.
        """
        return True

    @property
    def run_dir(self) -> str:
        """Path to the run directory."""
        return self._run_dir

    @run_dir.setter
    def run_dir(self, value: str) -> None:
        self._run_dir = value

    @property
    def run_id(self) -> Optional[str]:
        """UUID for the enclosing DFK."""
        return self._run_id

    @run_id.setter
    def run_id(self, value: Optional[str]) -> None:
        self._run_id = value

    @property
    def hub_address(self) -> Optional[str]:
        """Address to the Hub for monitoring."""
        return self._hub_address

    @hub_address.setter
    def hub_address(self, value: Optional[str]) -> None:
        self._hub_address = value

    @property
    def hub_zmq_port(self) -> Optional[int]:
        """Port to the Hub for monitoring."""
        return self._hub_zmq_port

    @hub_zmq_port.setter
    def hub_zmq_port(self, value: Optional[int]) -> None:
        self._hub_zmq_port = value

    @property
    def submit_monitoring_radio(self) -> Optional[MonitoringRadioSender]:
        """Local radio for sending monitoring messages."""
        return self._submit_monitoring_radio

    @submit_monitoring_radio.setter
    def submit_monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
        self._submit_monitoring_radio = value