1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
import logging
from abc import abstractmethod
from string import Template
from parsl.launchers.base import Launcher
from parsl.launchers.errors import BadLauncher
from parsl.providers.base import ExecutionProvider
from parsl.providers.errors import SchedulerMissingArgs, ScriptPathError
from parsl.utils import execute_wait
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ClusterProvider(ExecutionProvider):
    """ This class defines behavior common to all cluster/supercompute-style scheduler systems.

    Parameters
    ----------
    label : str
        Label for this provider.
    nodes_per_block
        Number of nodes per block. Stored as ``self.nodes_per_block``; this
        base class does not interpret it.
    init_blocks, min_blocks, max_blocks, parallelism
        Block-scaling settings. This base class only stores them as
        attributes of the same name; their interpretation lives outside
        this class.
    walltime : str
        Walltime requested per block in HH:MM:SS.
    launcher : Launcher
        Launcher for this provider. Must be a ``Launcher`` instance,
        otherwise ``BadLauncher`` is raised.
    cmd_timeout : int
        Timeout for commands made to the scheduler in seconds (default 10).

    .. code:: python

                                +------------------
                                |
          script_string ------->|  submit
               id      <--------|---+
                                |
          [ ids ]       ------->|  status
          [statuses]   <--------|----+
                                |
          [ ids ]       ------->|  cancel
          [cancel]     <--------|----+
                                |
                                +-------------------
    """

    def __init__(self,
                 label,
                 nodes_per_block,
                 init_blocks,
                 min_blocks,
                 max_blocks,
                 parallelism,
                 walltime,
                 launcher,
                 cmd_timeout=10):
        self._label = label
        self.nodes_per_block = nodes_per_block
        self.init_blocks = init_blocks
        self.min_blocks = min_blocks
        self.max_blocks = max_blocks
        self.parallelism = parallelism
        self.launcher = launcher
        self.walltime = walltime
        self.cmd_timeout = cmd_timeout

        # Fail fast on a misconfigured launcher rather than at submit time.
        if not isinstance(self.launcher, Launcher):
            raise BadLauncher(self.launcher)

        # Left unset here; presumably assigned later by whoever owns this
        # provider before scripts are written -- TODO confirm.
        self.script_dir = None

        # Dictionary that keeps track of jobs, keyed on job_id
        self.resources = {}

    def execute_wait(self, cmd, timeout=None):
        """Run a scheduler command, blocking until it completes or times out.

        Args:
             - cmd (string) : Command to execute
             - timeout (int or None) : Timeout in seconds; when None,
               ``self.cmd_timeout`` is used.

        Returns:
             - The result of ``parsl.utils.execute_wait`` (presumably a
               (retcode, stdout, stderr) tuple -- confirm against parsl.utils).
        """
        t = self.cmd_timeout if timeout is None else timeout
        # This name resolves to the module-level parsl.utils.execute_wait,
        # not to this method: method names are not in scope inside the body.
        return execute_wait(cmd, t)

    def _write_submit_script(self, template, script_filename, job_name, configs):
        """Generate submit script and write it to a file.

        The template is rendered with ``string.Template.substitute``; the job
        name is available to it as ``$jobname`` and every key in ``configs``
        as ``$<key>``.

        Args:
              - template (string) : The template string to be used for writing the submit script
              - script_filename (string) : Name of the submit script
              - job_name (string) : job name
              - configs (dict) : configs that get pushed into the template

        Returns:
              - None

        Raises:
              SchedulerMissingArgs : If template is missing args
              ScriptPathError : Unable to write submit script out
              Any other exception is logged and re-raised unchanged.
        """
        try:
            submit_script = Template(template).substitute(jobname=job_name, **configs)
            with open(script_filename, 'w') as f:
                f.write(submit_script)
        except KeyError as e:
            # Template.substitute raises KeyError naming the missing placeholder.
            logger.error("Missing keys for submit script : %s", e)
            raise SchedulerMissingArgs(e.args, self.label) from e
        except OSError as e:
            logger.error("Failed writing to submit script: %s", script_filename)
            raise ScriptPathError(script_filename, e) from e
        except Exception as e:
            # Record the inputs through the logger (library code must not
            # print to stdout), then propagate the original exception as-is.
            logger.error("Uncategorized error: %s", e)
            logger.error("Template : %s", template)
            logger.error("Args : %s", job_name)
            logger.error("Kwargs : %s", configs)
            raise

    @abstractmethod
    def _status(self):
        """Refresh job state from the scheduler.

        Subclasses must implement this. ``status`` calls it and then reads
        ``self.resources[job_id]['status']``, so it is expected to update
        those entries.
        """
        pass

    def status(self, job_ids):
        """ Get the status of a list of jobs identified by the job identifiers
        returned from the submit request.

        Args:
             - job_ids (list) : A list of job identifiers

        Returns:
             - A list with the ``'status'`` entry of ``self.resources`` for each
               job_id, in the same order as job_ids (presumably JobStatus
               objects). An empty job_ids returns [] without querying the
               scheduler.

        Raises:
             - ExecutionProviderException or its subclasses
             - KeyError if a job_id is not tracked in ``self.resources``
        """
        if job_ids:
            # Only hit the scheduler when there is something to look up.
            self._status()
        return [self.resources[jid]['status'] for jid in job_ids]

    @property
    def label(self):
        """The label given to this provider at construction time."""
        return self._label
|