1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
########################################################################################
# #
# Author: Bertrand Neron, #
# Organization:'Biological Software and Databases' Group, Institut Pasteur, Paris. #
# Distributed under GPLv2 Licence. Please refer to the COPYING.LIB document. #
# #
########################################################################################
class Dispatcher( object ):
    """
    base class of the dispatchers: a dispatcher chooses the right ExecutionSystem and queue for a program.
    getExecutionConfig and getQueue always raise NotImplementedError here, the subclasses have to override them.
    """

    def _getJobNameFromJobState( self , jobState ):
        """
        @param jobState: the jobState of a job
        @type jobState: a MobyleJobState instance
        @return: what follows the last '/' in jobState.getName(), without its 4 last characters
                 (presumably a 4 characters long extension such as '.xml' -- to be confirmed)
        @rtype: string
        """
        last_segment = jobState.getName().rsplit( '/' , 1 )[-1]
        return last_segment[ : -4 ]

    def getExecutionConfig( self , jobState ):
        """
        @param jobState: the jobState of a job
        @type jobState: a MobyleJobState instance
        @return: the execution system to use to launch this job
        @rtype: ExecutionSystem instance
        @raise NotImplementedError: always, in this base class
        """
        message = "you must redefined getExecutionConfig method in your subclass"
        raise NotImplementedError( message )

    def getQueue( self , jobState ):
        """
        @param jobState: the jobState of a job
        @type jobState: a MobyleJobState instance
        @return: the queue for this job
        @rtype: string
        @raise NotImplementedError: always, in this base class
        """
        message = "you must redefined getQueue method in your subclass"
        raise NotImplementedError( message )
class _RouteTable( dict ):
    """
    dict of routes which can also be called like a method.
    The keys are the program names (plus 'DEFAULT'), the values are
    ( ExecutionConfig object , queue name ) tuples.
    It exists so that the 'routes' attribute of a DefaultDispatcher can still be used
    as a dict while dispatcher.routes() works too.
    """

    def __call__( self ):
        """
        @return: all the routes stored in this table
        @rtype: list of tuples ( ExecutionConfig instance , queue name )
        """
        return list( self.values() )


class DefaultDispatcher( Dispatcher ):
    """
    dispatcher driven by a static table of routes: each program name is associated
    to an execution config and a queue, the programs without their own entry use the 'DEFAULT' one.
    """

    def __init__(self, routes ):
        """
        @param routes: represent all execution system and queue associated to each programs name
        @type routes: dict which keys are the names of programs and values a tuple
        the first value of the tuple is an EXECUTION_SYSTEM_ALIAS entry and the second value a queue name
        routes = { string program name : ( ExecutionConfig object from EXECUTION_SYSTEM_ALIAS , string queue name ) , ...}
        """
        # An instance attribute hides a method of the same name, so a plain dict stored here
        # made dispatcher.routes() fail with "'dict' object is not callable".
        # The attribute is kept (existing code may index dispatcher.routes directly) but it is
        # now a callable dict, so both usages work.
        # NOTE: the table is a copy, later changes of the caller's mapping are not seen.
        self.routes = _RouteTable( routes )

    def _getRoute( self , jobState ):
        """
        @param jobState: the jobState of a job
        @type jobState: a MobyleJobState instance
        @return: the route of the program of this job, or the 'DEFAULT' route if the program has no entry
        @rtype: tuple ( ExecutionConfig instance , queue name )
        @raise KeyError: if the program has no entry and there is no 'DEFAULT' entry
        """
        job_name = self._getJobNameFromJobState( jobState )
        # 'DEFAULT' is looked up only when it is needed: a table without
        # a 'DEFAULT' entry still works for the programs it lists.
        if job_name not in self.routes:
            job_name = 'DEFAULT'
        return self.routes[ job_name ]

    def getExecutionConfig(self , jobState ):
        """
        @param jobState: the jobState of a job
        @type jobState: a MobyleJobState instance
        @return: the execution config need to launch this job
        @rtype: ExecutionConfig instance
        @raise KeyError: if the program has no entry and there is no 'DEFAULT' entry
        """
        return self._getRoute( jobState )[0]

    def getQueue(self , jobState ):
        """
        @param jobState: the jobState of a job
        @type jobState: a MobyleJobState instance
        @return: the queue for this job
        @rtype: string
        @raise KeyError: if the program has no entry and there is no 'DEFAULT' entry
        """
        return self._getRoute( jobState )[1]

    def routes(self):
        """
        On an initialized instance the 'routes' attribute set in __init__ answers
        dispatcher.routes(); this definition is what is reached through the class,
        e.g. DefaultDispatcher.routes( dispatcher ). Both give the same result.

        @return: all routes for this configuration
        @rtype: list of tuples ( ExecutionSystem instance , queue )
        """
        return list( self.routes.values() )
|