import logging
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, List, Optional
from parsl.jobs.states import JobStatus
logger = logging.getLogger(__name__)
class ExecutionProvider(metaclass=ABCMeta):
"""Execution providers are responsible for managing execution resources
that have a Local Resource Manager (LRM). For instance, campus clusters
and supercomputers generally have LRMs (schedulers) such as Slurm,
Torque/PBS, and Condor. Clouds, on the other hand, have API
interfaces that allow much more fine-grained composition of an execution
environment. An execution provider abstracts these types of resources and
provides a single uniform interface to them.

    The providers abstract away the interfaces provided by various systems to
    request, monitor, and cancel compute resources.

    .. code:: python

                              +------------------
                              |
        script_string ------->|  submit
             id      <--------|---+
                              |
        [ ids ]       ------->|  status
        [statuses]   <--------|----+
                              |
        [ ids ]       ------->|  cancel
        [cancel]     <--------|----+
                              |
                              +-------------------
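
    A minimal, illustrative sketch of an executor-side loop driving a concrete
    provider (the ``provider`` object and the command string are hypothetical,
    not part of this interface):

    .. code:: python

        from parsl.jobs.states import JobState

        # illustrative command; real callers pass the executor's launch command
        job_id = provider.submit("launch_workers.sh", tasks_per_node=1)
        if job_id:
            status = provider.status([job_id])[0]
            if status.state == JobState.FAILED:
                provider.cancel([job_id])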
"""
@abstractmethod
def __init__(self) -> None:
self.min_blocks: int
self.max_blocks: int
self.init_blocks: int
self.nodes_per_block: int
self.script_dir: Optional[str]
self.parallelism: float
self.resources: Dict[object, Any]
self._cores_per_node: Optional[int] = None
self._mem_per_node: Optional[float] = None
pass
@abstractmethod
def submit(self, command: str, tasks_per_node: int, job_name: str = "parsl.auto") -> object:
        ''' The submit method takes the command string to be executed upon
        instantiation of a resource, most often to start a pilot (such as for
        the HighThroughputExecutor or the WorkQueueExecutor).
Args :
- command (str) : The bash command string to be executed
- tasks_per_node (int) : command invocations to be launched per node
KWargs:
             - job_name (str) : Human-friendly name to be assigned to the job request
Returns:
             - A job identifier, which may be an integer, a string, or any other
               object; or None (or anything else that evaluates to boolean false)
               if submission failed without raising an exception.
Raises:
- ExecutionProviderException or its subclasses
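
        Example (illustrative; ``provider`` is some concrete implementation and
        the command string is arbitrary):

        .. code:: python

            job_id = provider.submit("echo hello", tasks_per_node=1, job_name="demo")
            if not job_id:
                ...  # submission failed without raising an exception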
'''
pass
@abstractmethod
def status(self, job_ids: List[object]) -> List[JobStatus]:
''' Get the status of a list of jobs identified by the job identifiers
returned from the submit request.
Args:
- job_ids (list) : A list of job identifiers
Returns:
- A list of JobStatus objects corresponding to each job_id in the job_ids list.
Raises:
- ExecutionProviderException or its subclasses
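
        Example (illustrative; assumes ``JobState`` from ``parsl.jobs.states``
        and a hypothetical list ``job_ids``):

        .. code:: python

            from parsl.jobs.states import JobState

            statuses = provider.status(job_ids)
            active = [s for s in statuses if s.state == JobState.RUNNING]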
'''
pass
@abstractmethod
def cancel(self, job_ids: List[object]) -> List[bool]:
''' Cancels the resources identified by the job_ids provided by the user.
Args:
- job_ids (list): A list of job identifiers
Returns:
             - A list of booleans, one per job, indicating whether each
               cancellation request succeeded (True) or failed (False)
Raises:
- ExecutionProviderException or its subclasses
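
        Example (illustrative):

        .. code:: python

            results = provider.cancel(job_ids)
            if not all(results):
                ...  # at least one cancellation request did not succeed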
'''
pass
    @property
    @abstractmethod
def label(self) -> str:
''' Provides the label for this provider '''
pass
@property
def mem_per_node(self) -> Optional[float]:
"""Real memory to provision per node in GB.
Providers which set this property should ask for mem_per_node of memory
when provisioning resources, and set the corresponding environment
variable PARSL_MEMORY_GB before executing submitted commands.
If this property is set, executors may use it to calculate how many tasks can
run concurrently per node.
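
        For example, an executor might derive a per-node task limit from this
        value (illustrative; ``mem_per_task_gb`` is a hypothetical executor
        setting):

        .. code:: python

            if provider.mem_per_node is not None:
                max_tasks = int(provider.mem_per_node // mem_per_task_gb)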
"""
return self._mem_per_node
@mem_per_node.setter
def mem_per_node(self, value: float) -> None:
self._mem_per_node = value
@property
def cores_per_node(self) -> Optional[int]:
"""Number of cores to provision per node.
Providers which set this property should ask for cores_per_node cores
when provisioning resources, and set the corresponding environment
variable PARSL_CORES before executing submitted commands.
If this property is set, executors may use it to calculate how many tasks can
run concurrently per node.
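
        For example, a command launched by the provider could read the value
        back from the environment variable described above (illustrative):

        .. code:: python

            import os

            cores = int(os.environ.get("PARSL_CORES", "1"))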
"""
return self._cores_per_node
@cores_per_node.setter
def cores_per_node(self, value: int) -> None:
self._cores_per_node = value
@property
@abstractmethod
def status_polling_interval(self) -> int:
"""Returns the interval, in seconds, at which the status method should be called.
:return: the number of seconds to wait between calls to status()
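
        An illustrative polling loop built on this value:

        .. code:: python

            import time

            while job_ids:
                statuses = provider.status(job_ids)
                # ... react to statuses, removing finished jobs from job_ids ...
                time.sleep(provider.status_polling_interval)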
"""
pass