1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
|
# Copyright 2013-2015, The James Hutton Institute
# Author: Leighton Pritchard
#
# This code is part of the pyani package, and is governed by its licence.
# Please see the LICENSE file that should have been included as part of
# this package.
"""Code to manage jobs for pyani.
In order to be a little more consistent behind the scenes for schedulers,
and to allow for a fairly hacky approach to scheduling on SGE, a job
dependency graph is used.
Commands to be run are stored in Jobs. A Job's dependencies are stored so that
the Job will not be executed until all of its dependencies have been executed.
When used in ANI analysis, the way jobs are used depends on the scheduler.
With multiprocessing, we place all root jobs in a single pool; then all
first-level dependencies will go in a second (dependent) pool that is not run
until the first is completed, and so on. It's not very efficient, but should
work equivalently to the original code that handled asynchronous pools
directly.
With SGE, the dependencies can be managed independently, and effectively
interleaved by the scheduler with no need for pools.
This code is essentially a frozen and cut-down version of pysge
(https://github.com/widdowquinn/pysge)
"""
import os
import shlex
import time

from .pyani_config import SGE_WAIT
###
# CLASSES
# The Job class describes a single command-line job, with dependencies (jobs
# that must be run first).
class Job:
    """A single command-line job to be run, with a list of dependencies
    (jobs that must be run first).
    """

    def __init__(self, name, command, queue=None):
        """Instantiate a Job object.

        - name      String describing the job (uniquely)
        - command   String, the valid shell command to run the job
        - queue     String, the SGE queue under which the job shall run
        """
        self.name = name  # Unique name for the job
        self.queue = queue  # The SGE queue to run the job under
        self.command = command  # Command line to run for this job
        self.script = command  # Script text; for a single Job, the command
        self.scriptPath = None  # Will hold path to the script file
        self.dependencies = []  # List of jobs to be completed first
        self.submitted = False  # Flag: is job submitted?

    def add_dependency(self, job):
        """Add the passed job to this Job's dependency list.

        This Job should not execute until all dependency jobs are completed.

        - job       Job to be added to the Job's dependency list
        """
        self.dependencies.append(job)

    def remove_dependency(self, job):
        """Remove the passed job from this Job's dependency list.

        - job       Job to be removed from the Job's dependency list

        Raises ValueError (from list.remove) if job is not in the list.
        """
        self.dependencies.remove(job)

    def wait(self, interval=SGE_WAIT):
        """Block until SGE no longer reports this job, polling with qstat.

        - interval  Seconds to sleep before the first poll; each subsequent
                    interval is double the previous one, capped at 60 seconds.

        The job is treated as finished once ``qstat -j <name>`` exits with a
        non-zero status. NOTE(review): presumably this also happens if qstat
        cannot be run at all (e.g. not installed) - confirm that is acceptable.
        """
        # Quote the name so that spaces or shell metacharacters in it are
        # passed to qstat as one literal argument, not interpreted by the shell.
        qstat_cmd = "qstat -j %s > /dev/null" % shlex.quote(str(self.name))
        finished = False
        while not finished:
            time.sleep(interval)
            interval = min(2 * interval, 60)
            # os.system returns 0 while qstat still finds the job
            finished = os.system(qstat_cmd) != 0
class JobGroup:
    """A group of jobs, run as one SGE array job, permitting parameter
    sweeps.
    """

    def __init__(self, name, command, queue=None, arguments=None):
        """Instantiate a JobGroup object.

        JobGroups allow combinatorial parameter sweeps via the 'command' and
        'arguments' arguments.

        - name       String, the JobGroup name
        - command    String, the command to be run, with arguments specified
        - queue      String, the queue for SGE to use
        - arguments  Dictionary, the values for each parameter as lists of
                     strings, keyed by an identifier for the command string

        For example, to use a command 'my_cmd' with the arguments '-foo' and
        '-bar' having values 1, 2, 3, 4 and 'a', 'b', 'c', 'd' in all
        combinations, respectively, you would pass

            command='my_cmd $SGE_TASK_ID -foo $fooargs -bar $barargs'
            arguments={'fooargs': ['1', '2', '3', '4'],
                       'barargs': ['a', 'b', 'c', 'd']}
        """
        self.name = name  # Set JobGroup name
        self.queue = queue  # Set SGE queue to request
        self.command = command  # Set command string
        self.dependencies = []  # Create empty list for dependencies
        # NOTE(review): unlike Job, which starts with submitted=False, a
        # JobGroup starts flagged as submitted, so any code that waits on a
        # dependency's `submitted` flag will not wait for a JobGroup.
        # Kept as-is; confirm whether this is intentional.
        self.submitted = True
        # Dictionary of arguments for command (empty if none given)
        self.arguments = arguments if arguments is not None else {}
        self.generate_script()  # Make SGE script for sweep/array

    def generate_script(self):
        """Build the SGE array-job script for this JobGroup.

        Sets ``self.script`` to a bash script. The script turns the 1-based
        ``$SGE_TASK_ID`` into one value from each argument list, bound to a
        shell variable named after the argument key, and then runs
        ``self.command``. Sets ``self.tasks`` to the number of combinations:
        the product of the argument-list lengths, or 1 with no arguments.
        """
        # Sort keys so the generated script is deterministic regardless of
        # dict ordering (required for reproducible test output).
        keys = sorted(self.arguments)
        # SGE_TASK_ID is 1-based; TASK_ID is a 0-based working copy
        parts = ['let "TASK_ID=$SGE_TASK_ID - 1"\n']
        total = 1  # total number of tasks (argument combinations)
        # Declare one bash array per argument
        for key in keys:
            values = self.arguments[key]
            items = "".join(value + " " for value in values)
            parts.append("%s_ARRAY=( %s )\n" % (key, items))
            total *= len(values)
        parts.append("\n")
        # Decode TASK_ID as a mixed-radix number: each digit indexes one array.
        # NOTE(review): an empty value list makes self.tasks 0 and emits a
        # modulo/division by zero below; callers should pass non-empty lists.
        for key in keys:
            count = len(self.arguments[key])
            parts.append('let "%s_INDEX=$TASK_ID %% %d"\n' % (key, count))
            parts.append("%s=${%s_ARRAY[$%s_INDEX]}\n" % (key, key, key))
            parts.append('let "TASK_ID=$TASK_ID / %d"\n' % count)
        # Finally, run the command with the decoded variables in scope
        parts.append("\n")
        parts.append(self.command)
        parts.append("\n")
        self.script = "".join(parts)
        # set the number of tasks in this group
        self.tasks = total

    def add_dependency(self, job):
        """Add the passed job to this JobGroup's dependency list.

        This JobGroup should not execute until all dependency jobs are
        completed.

        - job       Job, job to be added to the JobGroup's dependency list
        """
        self.dependencies.append(job)

    def remove_dependency(self, job):
        """Remove the passed job from this JobGroup's dependency list.

        - job       Job, job to be removed from the JobGroup's dependency list

        Raises ValueError (from list.remove) if job is not in the list.
        """
        self.dependencies.remove(job)

    def wait(self, interval=SGE_WAIT):
        """Block until SGE no longer reports this job group, polling qstat.

        - interval  Seconds to sleep before the first poll; each subsequent
                    interval is double the previous one, capped at 60 seconds.

        The group is treated as finished once ``qstat -j <name>`` exits with a
        non-zero status. NOTE(review): presumably this also happens if qstat
        cannot be run at all (e.g. not installed) - confirm that is acceptable.
        """
        # Quote the name so that spaces or shell metacharacters in it are
        # passed to qstat as one literal argument, not interpreted by the shell.
        qstat_cmd = "qstat -j %s > /dev/null" % shlex.quote(str(self.name))
        finished = False
        while not finished:
            time.sleep(interval)
            interval = min(2 * interval, 60)
            # os.system returns 0 while qstat still finds the job
            finished = os.system(qstat_cmd) != 0
|