File: DRMAA.py

package info (click to toggle)
mobyle 1.5.5%2Bdfsg-6
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 8,288 kB
  • sloc: python: 22,709; makefile: 35; sh: 33; ansic: 10; xml: 6
file content (299 lines) | stat: -rwxr-xr-x 14,151 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299

########################################################################################
#                                                                                      #
#   Author: Bertrand Neron,                                                            #
#   Organization:'Biological Software and Databases' Group, Institut Pasteur, Paris.   #  
#   Distributed under GPLv2 Licence. Please refer to the COPYING.LIB document.         #
#                                                                                      #
########################################################################################


"""
Classes executing the command and managing the results  
"""
import os 
import imp
from logging import getLogger, shutdown as logging_shutdown
_log = getLogger(__name__)
from Mobyle.MobyleLogger import MLogger

from Mobyle.Status import Status
from Mobyle.Admin import Admin
from Mobyle.Execution.ExecutionSystem import ExecutionSystem 

from Mobyle.MobyleError import MobyleError
__extra_epydoc_fields__ = [('call', 'Called by','Called by')]


class DRMAA(ExecutionSystem):
    """
    Run the commandline with batch system DRMAA bindings
    """
    def __init__( self, drmaa_config ):
        """
        Set up the execution system and load the drmaa python module.
        @param drmaa_config: the configuration of this execution system, its
                             drmaa_library_path and contactString attributes are read here.
        @type drmaa_config: presumably a L{DRMAAConfig} instance
        """
        super( DRMAA , self ).__init__( drmaa_config )
        self.drmaa_library_path = drmaa_config.drmaa_library_path
        # DRMAA_LIBRARY_PATH is exported before the drmaa module is loaded
        os.environ[ 'DRMAA_LIBRARY_PATH' ] = self.drmaa_library_path
        fp , pathname , description = imp.find_module( "drmaa" )
        try:
            self.drmaa = imp.load_module( "drmaa" , fp , pathname , description )
        finally:
            # imp.find_module may return an open file, closing it is up to the caller
            # ( fp is None when the module found is a package )
            if fp is not None:
                fp.close()
        self.contactString = drmaa_config.contactString
        
    def _drmaaStatus2mobyleStatus( self , drmaaStatus ):
        """
        Translate a drmaa job state into a Mobyle L{Status}.
        @param drmaaStatus: the job state, it is compared to the attributes of drmaa.JobState
        @return: the corresponding Mobyle status, Status( -1 ) (unknown) if the state does not match any known state
        @rtype: Status instance
        """
        # ( name of the drmaa.JobState attribute , code of the Mobyle status )
        # the states are tested in this order, the first which matches wins
        state2code = ( ( 'RUNNING'               ,  3 ) , #running
                       ( 'UNDETERMINED'          , -1 ) , #unknown
                       ( 'QUEUED_ACTIVE'         ,  2 ) , #pending
                       ( 'DONE'                  ,  4 ) , #finished
                       ( 'FAILED'                ,  5 ) , #error
                       ( 'SYSTEM_ON_HOLD'        ,  7 ) , #hold
                       ( 'USER_ON_HOLD'          ,  7 ) , #hold
                       ( 'USER_SYSTEM_ON_HOLD'   ,  7 ) , #hold
                       ( 'SYSTEM_SUSPENDED'      ,  7 ) , #hold
                       ( 'USER_SUSPENDED'        ,  7 ) , #hold
                       ( 'USER_SYSTEM_SUSPENDED' ,  7 ) , #hold
                     )
        knownStates = self.drmaa.JobState
        for stateName , mobyleCode in state2code:
            if drmaaStatus == getattr( knownStates , stateName ):
                return Status( mobyleCode )
        return Status( -1 )


    def _run( self , commandLine , dirPath , serviceName , jobKey , jobState , queue , xmlEnv ):
        """
        Submit the job through DRMAA, wait for its completion and compute its final status.
        The file ".command" located in dirPath is run with sh, the standard output and the standard error
        of the job are directed to <serviceName>.out and <serviceName>.err in dirPath.
        @param commandLine: the command line of the job (not used here, the job runs the .command file)
        @type commandLine: string
        @param dirPath: the path of the job directory, it must be the current working directory of the process
        @type dirPath: string
        @param serviceName: the name of the service, used to name the .out and .err files
        @type serviceName: string
        @param jobKey: the key of the job, used as name of the DRMAA job
        @type jobKey: string
        @param jobState: not used by this method
        @param queue: the queue to submit the job to, if it is set "-q <queue>" is appended to the native specification
        @type queue: string
        @param xmlEnv: the environment given to the job
        @type xmlEnv: presumably a dictionary (it is assigned as is to the jobEnvironment of the job template)
        @return: the L{Status} of this job
        @rtype: Status
        @raise MobyleError: if the process is not in dirPath, if the submission of the job failed
                            or if the symbolic link in ADMINDIR cannot be created.
        """
        if os.getcwd() != os.path.abspath( dirPath ):
            msg = "the process is not in the working directory"

            self._logError( dirPath , serviceName , jobKey ,
                            userMsg = "Mobyle internal server error" ,
                            logMsg = msg )

            raise MobyleError( msg )

        # these handles are used only to create (or truncate) the files and to get their names,
        # nothing is written through them in this method
        fout = open( serviceName + ".out" , 'w' )
        ferr = open( serviceName + ".err" , 'w' )
        try:
            drmaaSession = self.drmaa.Session( contactString = self.contactString )
            try:
                drmaaSession.initialize()
            except self.drmaa.errors.AlreadyActiveSessionException:
                pass
            except Exception as err:
                # the failure is only reported here, the execution goes on with the job template creation
                self._logError( dirPath , serviceName , jobKey ,
                                userMsg = "Mobyle internal server error" ,
                                logMsg = None )
                import sys
                _log.critical( "error during drmaa intitialization for job %s/%s: %s (call by %s pid= %d)" %(serviceName, jobKey , err, sys.argv[0] , os.getpid() ) ,  exc_info= True )
            jt = drmaaSession.createJobTemplate()
            jt.workingDirectory = dirPath
            jt.jobName = jobKey
            jt.outputPath = ":" + os.path.join( dirPath , fout.name )
            jt.errorPath  = ":" + os.path.join( dirPath , ferr.name )
            jt.joinFiles = False
            jt.jobEnvironment = xmlEnv
            jt.remoteCommand = "sh"
            jt.args = [ os.path.join( dirPath , ".command" ) ]
            nativeSpecification = ''
            if self.execution_config.nativeSpecification:
                nativeSpecification = self.execution_config.nativeSpecification
            if queue:
                nativeSpecification = "%s -q %s" % ( nativeSpecification , queue )
            jt.nativeSpecification = nativeSpecification
            jt.blockEmail = True
            drmJobid = drmaaSession.runJob( jt )
        except self.drmaa.errors.DrmaaException as err:
            # drmaa.errors is a module, the errors raised by drmaa must be caught through their base class
            _log.error( "cannot exit from drmaa properly try to deleting JobTemplate: " + str( err ) )
            try:
                drmaaSession.deleteJobTemplate( jt )
                drmaaSession.exit()
            except Exception as exit_err:
                # a distinct name is used to keep the error which is reported below
                _log.error( "cannot exit from drmaa properly : " + str( exit_err ) )
            msg = "System execution failed: " + str( err )
            self._logError( dirPath , serviceName , jobKey ,
                            userMsg = "Mobyle internal server error" ,
                            logMsg = None )
            _log.critical( "%s/%s : %s" %(serviceName, jobKey, msg ), exc_info= True )
            raise MobyleError( msg )
        except Exception as err:
            msg = "System execution failed: " + str( err )
            _log.critical("%s/%s : %s" %(serviceName, jobKey, msg ), exc_info= True )
            raise MobyleError( "Internal Server Error" )

        adm = Admin( dirPath )
        adm.setExecutionAlias( self.execution_config_alias )
        adm.setNumber( drmJobid )
        adm.commit()

        linkName = "%s/%s.%s" %( self._cfg.admindir() , serviceName , jobKey )
        try:
            os.symlink( os.path.join( dirPath , '.admin' ) , linkName )
        except OSError as err:
            try:
                drmaaSession.deleteJobTemplate( jt )
                drmaaSession.exit()
            except Exception as exit_err:
                # a distinct name is used to keep the OSError which is reported below
                _log.error( "cannot exit from drmaa properly : " + str( exit_err ) )

            msg = "can't create symbolic link %s in ADMINDIR: %s" %( linkName , err )

            self._logError( dirPath , serviceName , jobKey ,
                            userMsg = "Mobyle internal server error" ,
                            logMsg = None )
            _log.critical( "%s/%s : %s" %(serviceName, jobKey, msg ), exc_info= True )

            raise MobyleError( msg )
        logging_shutdown() #close all loggers
        #JobInfo =( jobId , hasExited , hasSignal , terminatedSignal, hasCoreDump, wasAborted, exitStatus, resourceUsage)
        #            0          1          2              3               4            5           6           7
        jobInfos = None # stays None if the wait failed
        try:
            jobInfos = drmaaSession.wait( drmJobid , self.drmaa.Session.TIMEOUT_WAIT_FOREVER )
        except Exception as err:
            # MLogger is called before each use of the logging system after logging_shutdown
            # (presumably it sets the loggers up again -- to be confirmed in Mobyle.MobyleLogger)
            MLogger( child = True )
            self._logError( dirPath , serviceName , jobKey ,
                            userMsg = "Mobyle internal server error" ,
                            logMsg = None )

            _log.critical( "%s/%s : %s" %( serviceName ,
                                           jobKey ,
                                           "cannot wait the completion of job : %s" % err
                                         )
                         )
        finally:
            MLogger( child = True )
            try:
                drmaaSession.deleteJobTemplate( jt )
                drmaaSession.exit()
            except Exception as err:
                MLogger( child = True )
                _log.error( "cannot exit from drmaa properly : " + str( err ) )
        try:
            os.unlink( linkName )
        except Exception as err:
            msg = "cannot remove symbolic link %s in ADMINDIR: %s" %( linkName , err )
            self._logError( dirPath , serviceName , jobKey ,
                            userMsg = "Mobyle internal server error" ,
                            logMsg = None )
            _log.critical( "%s/%s : %s" %(serviceName, jobKey, msg ), exc_info= True )
        for jobFile in ( fout , ferr ):
            try:
                jobFile.close()
            except Exception as err:
                msg = "cannot close file %s: %s" %( jobFile.name , err )
                self._logError( dirPath , serviceName , jobKey ,
                                userMsg = "Mobyle internal server error" ,
                                logMsg = None )
                _log.critical( "%s/%s : %s" %(serviceName, jobKey, msg ), exc_info= True )

        status = self.status_manager.getStatus( dirPath )
        if status.isEnded():
            # an ended status is already recorded for this job, it is kept as is
            return status
        if jobInfos is None:
            # the wait failed, so there is no job information to compute the status from
            return Status( code = 5 , message = "Mobyle internal server error" ) #error

        exitStatus = jobInfos[ 6 ]
        if jobInfos[ 5 ]: #wasAborted
            status = Status( code = 6 , message = "Your job has been cancelled" ) #killed
        elif jobInfos[ 1 ]: #hasExited
            if exitStatus == 0:
                status = Status( code = 4 ) #finished
            elif exitStatus < 0 or exitStatus > 128:
                #all the signals that we don't know where they come from
                status = Status( code = 5 , message = "Your job execution failed ( %s )" % exitStatus )
            else:
                status = Status( code = 4 , message = "Your job finished with an unusual status code ( %d ), check your results carefully." % exitStatus )
        else:
            status = Status( code = 6 , message = "Your job execution failed ( %d )" % exitStatus )
        return status
        

    def getStatus( self , key ):
        """
        Ask DRMAA for the state of a job and translate it into a Mobyle status.
        @param key: the value associate to the key "NUMBER" in Admin object (and .admin file )
        @type key: string
        @return: the status of the job corresponding to the key, Status( -1 ) (unknown) if the
                 drmaa session cannot be opened or initialized, or if the state of the job cannot be retrieved.
        @rtype: Status instance
        @call: by L{Utils.getStatus}
        """
        try:
            s = self.drmaa.Session( contactString = self.contactString )
        except Exception as err:
            _log.error( "getStatus(%s) cannot open drmma session : %s " %( key , err ) )
            return Status( -1 ) #unknown
        try:
            try:
                s.initialize()
            except self.drmaa.errors.AlreadyActiveSessionException:
                pass
            except Exception as err:
                # the error is logged before the session is closed, so it is reported even if the exit fails
                import sys
                _log.critical( "error during drmaa intitialization for getStatus job %s: %s (call by %s pid=%d)" %(key , err,sys.argv[0], os.getpid()) ,  exc_info= True )
                return Status( -1 )
            try:
                drmaaStatus = s.jobStatus( key )
            except Exception as err:
                # only Exception is caught, so KeyboardInterrupt and SystemExit are not swallowed
                _log.debug( "getStatus(%s) cannot get the status of the job : %s" %( key , err ) )
                return Status( -1 ) #unknown
            return self._drmaaStatus2mobyleStatus( drmaaStatus )
        finally:
            # the session is closed whatever happened above,
            # a failure of the exit must not replace the status which is returned
            try:
                s.exit()
            except Exception as err:
                _log.error( "cannot exit from drmaa properly : " + str( err ) )


    def kill( self , key ):
        """
        kill a job
        If the drmaa session cannot be opened or initialized, the error is logged
        and the method returns without raising.
        @param key : the value associate to the key "NUMBER" in Admin object (and .admin file )
        @type key: string
        @raise MobyleError: if the TERMINATE action on the job failed
        @call: by L{Utils.Mkill}
        """
        try:
            s = self.drmaa.Session( contactString = self.contactString )
        except Exception as err:
            _log.error( "kill( %s ) cannot open drmma session : %s " %( key , err ) )
            return
        try:
            try:
                s.initialize()
            except self.drmaa.errors.AlreadyActiveSessionException:
                pass
            except Exception as err:
                import sys
                _log.critical( "error during drmaa intitialization for kill job %s: %s (call by %s)" %(key , err,sys.argv[0]) ,  exc_info= True )
                return

            try:
                s.control( key , self.drmaa.JobControlAction.TERMINATE )
            except Exception as err:
                msg = "error when trying to kill job %s : %s" %( key , err )
                _log.error( msg )
                raise MobyleError( msg )
        finally:
            # the session is closed on every path (as in getStatus),
            # a failure of the exit must not mask the MobyleError raised above
            try:
                s.exit()
            except Exception as err:
                _log.error( "cannot exit from drmaa properly : " + str( err ) )
        return