1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
########################################################################################
# #
# Author: Bertrand Neron, #
# Organization:'Biological Software and Databases' Group, Institut Pasteur, Paris. #
# Distributed under GPLv2 Licence. Please refer to the COPYING.LIB document. #
# #
########################################################################################
"""
Classes executing the command and managing the results
"""
import os
import time
from logging import getLogger
_log = getLogger(__name__)
from Mobyle.ConfigManager import Config
from Mobyle.MobyleError import MobyleError
from Mobyle.Admin import Admin
from Mobyle.Status import Status
from Mobyle.StatusManager import StatusManager
__extra_epydoc_fields__ = [('call', 'Called by','Called by')]
class ExecutionSystem(object):
"""
abstract class
manage the status by updating the file index.xml
"""
def __init__( self , execution_config ):
"""
@param execution_config: the configuration of the Execution
@type execution_config: ExecutionConfig instance
"""
self._cfg = Config()
self.execution_config = execution_config
self.execution_config_alias = self._cfg.getAliasFromConfig( self.execution_config )
self.status_manager = StatusManager()
def run( self , commandLine , dirPath , serviceName , jobState , xmlEnv = None):
"""
@param execution_config: the configuration of the Execution
@type execution_config: ExecutionConfig instance
@param commandLine: the command to be executed
@type commandLine: String
@param dirPath: the absolute path to directory where the job will be executed (normaly we are already in)
@type dirPath: String
@param serviceName: the name of the service
@type serviceName: string
@param jobState:
@type jobState: a L{JobState} instance
"""
self.jobState = jobState
if dirPath[-1] == '/':
dirPath = dirPath[:-1]
jobKey = os.path.split( dirPath )[1]
if os.getcwd() != os.path.abspath( dirPath ):
msg = "the child process execute itself in a wrong directory"
self._logError( dirPath , serviceName ,jobKey,
userMsg = "Mobyle internal server error" ,
logMsg = msg )
raise MobyleError , msg
protectedCommandLine = ''
for c in commandLine:
protectedCommandLine += '\\'+ c
if xmlEnv is None:
xmlEnv = {}
dispatcher = self._cfg.getDispatcher()
queue = dispatcher.getQueue( jobState )
adm = Admin( dirPath )
adm.setQueue( queue )
adm.commit()
new_path = ''
binary_path = self._cfg.binary_path()
if binary_path :
new_path = ":".join( binary_path )
if xmlEnv.has_key( 'PATH' ) :
new_path = "%s:%s" %( xmlEnv[ 'PATH' ] , new_path )
if new_path :
xmlEnv[ 'PATH' ] = "%s:%s" %( new_path , os.environ[ 'PATH' ] )
else:
xmlEnv[ 'PATH' ] = os.environ[ 'PATH' ]
for var in os.environ.keys():
if var != 'PATH':
xmlEnv[ var ] = os.environ[ var ]
self._returncode = None
accounting = self._cfg.accounting()
if accounting:
beg_time = time.time()
###################################
mobyleStatus = self._run( commandLine , dirPath , serviceName , jobKey , jobState , queue , xmlEnv )
###################################
if accounting:
end_time = time.time()
elapsed_time = end_time - beg_time
a_log = getLogger( 'Mobyle.account' )
#%d trunc time to second
#%f for millisecond
a_log.info("%(serviceName)s/%(jobkey)s : %(exec_class)s/%(queue)s : %(beg_time)d-%(end_time)d %(ela_time)d : %(status)s" %{ 'serviceName':serviceName ,
'jobkey':jobKey,
'exec_class':self.execution_config.execution_class_name ,
'queue': queue,
'beg_time':beg_time ,
'end_time':end_time ,
'ela_time':elapsed_time ,
'status': mobyleStatus ,
}
)
self.status_manager.setStatus( dirPath , mobyleStatus )
def getStatus( self , number ):
"""
@param execution_config: a configuration object for this execution system
@type execution_config: an ExecutionConfig subclass instance
@param number:
@type number:
@return the status of the job
@rtype:
abstract method. this method must be implemented in child classes
"""
raise NotImplementedError, "Must be Implemented in child classes"
def kill( self , number ):
"""
kill the Job
@param execution_config: a configuration object for this execution system
@type execution_config: an ExecutionConfig subclass instance
@param number:
@type number:
abstract method. this method must be implemented in child classes
"""
raise NotImplementedError, "Must be Implemented in child classes"
def _logError( self , dirPath , serviceName , jobKey , userMsg = None , logMsg = None ):
if userMsg :
self.status_manager.setStatus( dirPath, Status( code = 5 , message = userMsg ) )
if logMsg :
_log.error( "%s/%s : %s" %( serviceName ,
jobKey ,
logMsg
)
)
|