import logging
import os
import time

from parsl.jobs.states import JobState, JobStatus
from parsl.launchers import AprunLauncher
from parsl.providers.cluster_provider import ClusterProvider
from parsl.providers.torque.template import template_string
from parsl.utils import RepresentationMixin

logger = logging.getLogger(__name__)

# From the man pages for qstat for PBS/Torque systems
translate_table = {
    'B': JobState.RUNNING,  # This state is returned for running array jobs
    'R': JobState.RUNNING,
    'C': JobState.COMPLETED,  # Completed after having run
    'E': JobState.COMPLETED,  # Exiting after having run
    'H': JobState.HELD,  # Held
    'Q': JobState.PENDING,  # Queued, and eligible to run
    'W': JobState.PENDING,  # Job is waiting for its execution time (-a option) to be reached
    'S': JobState.HELD,  # Suspended
}


class TorqueProvider(ClusterProvider, RepresentationMixin):
"""Torque Execution Provider
This provider uses qsub to submit, qstat for status, and qdel to cancel
jobs. The qsub script to be used is created from a template file in this
same module.
Parameters
----------
account : str
Account the job will be charged against.
queue : str
Torque queue to request blocks from.
nodes_per_block : int
Nodes to provision per block.
init_blocks : int
Number of blocks to provision at the start of the run. Default is 1.
min_blocks : int
Minimum number of blocks to maintain. Default is 0.
max_blocks : int
Maximum number of blocks to maintain.
parallelism : float
Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive
scaling where as many resources as possible are used; parallelism close to 0 represents
the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
walltime : str
Walltime requested per block in HH:MM:SS.
scheduler_options : str
String to prepend to the #PBS blocks in the submit script to the scheduler.
WARNING: scheduler_options should only be given #PBS strings, and should not have trailing newlines.
worker_init : str
Command to be run before starting a worker, such as 'module load Anaconda; source activate env'.
launcher : Launcher
Launcher for this provider. Possible launchers include
:class:`~parsl.launchers.AprunLauncher` (the default), or
:class:`~parsl.launchers.SingleNodeLauncher`,
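
    Example
    -------
    A minimal sketch (not a tested configuration) of plugging this provider into
    a Parsl ``Config``; the queue and account values below are illustrative
    placeholders, not defaults::

        from parsl.config import Config
        from parsl.executors import HighThroughputExecutor
        from parsl.providers import TorqueProvider

        config = Config(
            executors=[
                HighThroughputExecutor(
                    provider=TorqueProvider(
                        queue='normal',           # hypothetical queue name
                        account='MY_ALLOCATION',  # hypothetical allocation
                        nodes_per_block=1,
                        walltime='01:00:00',
                    ),
                )
            ]
        )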
"""

    def __init__(self,
                 account=None,
                 queue=None,
                 scheduler_options='',
                 worker_init='',
                 nodes_per_block=1,
                 init_blocks=1,
                 min_blocks=0,
                 max_blocks=1,
                 parallelism=1,
                 launcher=AprunLauncher(),
                 walltime="00:20:00",
                 cmd_timeout=120):
        label = 'torque'
        super().__init__(label,
                         nodes_per_block,
                         init_blocks,
                         min_blocks,
                         max_blocks,
                         parallelism,
                         walltime,
                         launcher,
                         cmd_timeout=cmd_timeout)
        self.account = account
        self.queue = queue
        self.scheduler_options = scheduler_options
        self.worker_init = worker_init
        self.template_string = template_string

        # Dictionary that keeps track of jobs, keyed on job_id
        self.resources = {}

    def _status(self):
        '''Returns the status list for a list of job_ids

        Args:
            self

        Returns:
            [status...] : Status list of all jobs
        '''
        job_ids = list(self.resources.keys())
        job_id_list = ' '.join(job_ids)
        jobs_missing = list(self.resources.keys())
retcode, stdout, stderr = self.execute_wait("qstat {0}".format(job_id_list))
        for line in stdout.split('\n'):
            parts = line.split()
            if not parts or parts[0].upper().startswith('JOB') or parts[0].startswith('---'):
                continue
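            # qstat truncates long job IDs in its tabular output, so map the
            # shortened ID back to the full ID that qsub returned at submit time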
            job_id = parts[0]
            for long_job_id in job_ids:
                if long_job_id.startswith(job_id):
                    logger.debug('coerced job_id %s -> %s', job_id, long_job_id)
                    job_id = long_job_id
                    break
            state = translate_table.get(parts[4], JobState.UNKNOWN)
            self.resources[job_id]['status'] = JobStatus(state)
            jobs_missing.remove(job_id)

        # qstat does not report on jobs that have left the queue, so we fill in
        # a COMPLETED status for the missing jobs; this may lose information
        # about why a job failed.
        for missing_job in jobs_missing:
            self.resources[missing_job]['status'] = JobStatus(JobState.COMPLETED)

    def submit(self, command, tasks_per_node, job_name="parsl.torque"):
        ''' Submits the command as a Local Resource Manager job.

        Submit returns an ID that corresponds to the task that was just submitted.

        Each block provisions nodes_per_block nodes; the launcher wraps the command
        so that tasks_per_node invocations of it run on each node. tasks_per_node
        must be a positive integer.

        Args:
            - command (String) : Commandline invocation to be made on the remote side.
            - tasks_per_node (int) : command invocations to be launched per node

        Kwargs:
            - job_name (String): Name for job, must be unique

        Returns:
            - None: At capacity, cannot provision more
            - job_id: (string) Identifier for the job
        '''
        # Set job name
        job_name = "parsl.{0}.{1}".format(job_name, time.time())

        # Set script path
        script_path = "{0}/{1}.submit".format(self.script_dir, job_name)
        script_path = os.path.abspath(script_path)

        logger.debug("Requesting nodes_per_block:%s tasks_per_node:%s",
                     self.nodes_per_block, tasks_per_node)

        job_config = {}
        job_config["submit_script_dir"] = self.script_dir
        job_config["nodes"] = self.nodes_per_block
        job_config["task_blocks"] = self.nodes_per_block * tasks_per_node
        job_config["nodes_per_block"] = self.nodes_per_block
        job_config["tasks_per_node"] = tasks_per_node
        job_config["walltime"] = self.walltime
        job_config["scheduler_options"] = self.scheduler_options
        job_config["worker_init"] = self.worker_init

        # Wrap the command with the launcher so that tasks_per_node copies run on each node
        job_config["user_script"] = self.launcher(command,
                                                  tasks_per_node,
                                                  self.nodes_per_block)
logger.debug("Writing submit script")
self._write_submit_script(self.template_string, script_path, job_name, job_config)
submit_options = ''
if self.queue is not None:
submit_options = '{0} -q {1}'.format(submit_options, self.queue)
if self.account is not None:
submit_options = '{0} -A {1}'.format(submit_options, self.account)
launch_cmd = "qsub {0} {1}".format(submit_options, script_path)
        retcode, stdout, stderr = self.execute_wait(launch_cmd)

        job_id = None
        if retcode == 0:
            for line in stdout.split('\n'):
                if line.strip():
                    job_id = line.strip()
                    self.resources[job_id] = {'job_id': job_id,
                                              'status': JobStatus(JobState.PENDING)}
        else:
            message = "Command '{}' failed with return code {}".format(launch_cmd, retcode)
            if (stdout is not None) and (stderr is not None):
                message += "\nstderr:{}\nstdout:{}".format(stderr.strip(), stdout.strip())
            logger.error(message)

        return job_id

    def cancel(self, job_ids):
        ''' Cancels the jobs specified by a list of job ids

        Args:
            job_ids : [<job_id> ...]

        Returns:
            [True/False...] : If the cancel operation fails, the entire list will be False.
        '''
        job_id_list = ' '.join(job_ids)
        retcode, stdout, stderr = self.execute_wait("qdel {0}".format(job_id_list))
        rets = None
        if retcode == 0:
            for jid in job_ids:
                # qdel succeeded; mark the job as completed in our local table
                self.resources[jid]['status'] = JobStatus(JobState.COMPLETED)
            rets = [True for i in job_ids]
        else:
            rets = [False for i in job_ids]

        return rets

    @property
    def status_polling_interval(self):
        return 60