File: SGE.py

package info (click to toggle)
mobyle 1.5.5%2Bdfsg-6
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 8,288 kB
  • sloc: python: 22,709; makefile: 35; sh: 33; ansic: 10; xml: 6
file content (265 lines) | stat: -rwxr-xr-x 11,034 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
########################################################################################
#                                                                                      #
#   Author: Bertrand Neron,                                                            #
#   Organization:'Biological Software and Databases' Group, Institut Pasteur, Paris.   #  
#   Distributed under GPLv2 Licence. Please refer to the COPYING.LIB document.         #
#                                                                                      #
########################################################################################


"""
Classes executing the command and managing the results  
"""
import os
from subprocess import Popen, PIPE

from logging import getLogger, shutdown as logging_shutdown
_log = getLogger(__name__)
from Mobyle.MobyleLogger import MLogger

from Mobyle.Admin import Admin
from Mobyle.Execution.ExecutionSystem import ExecutionSystem 
from Mobyle.MobyleError import MobyleError
from Mobyle.Status import Status

# Declares an extra docstring field for the epydoc documentation generator:
# the '@call' tag, rendered under the label 'Called by'
# (each tuple is: tag , singular label , plural label).
__extra_epydoc_fields__ = [('call', 'Called by','Called by')]

        
                
class SGE (ExecutionSystem):
    """
    Run the commandline with Sun GridEngine commands.

    The jobs are submitted with C{qrsh}, queried with C{qstat} and cancelled with C{qdel}.
    These binaries are looked up in C{<sge root>/bin/<arch>}, where C{<arch>} is the
    output of the C{<sge root>/util/arch} script.
    """
        
    def __init__( self, sge_config ):
        """
        @param sge_config: the configuration of this Execution engine. Its C{root} and C{cell}
                           attributes are read to locate the SGE binaries and to build the
                           SGE environment ( SGE_ROOT , SGE_CELL ).
        @type sge_config: presumably a L{SGEConfig} instance
        @raise MobyleError: if the C{util/arch} script cannot be executed or exits with a non zero status.
        """
        super( SGE , self ).__init__( sge_config )
        arch_path = os.path.join( sge_config.root , 'util' , 'arch' )
        try:
            arch_pipe = Popen( arch_path         ,
                               shell     = False ,
                               stdout    = PIPE  ,
                               stdin     = None  ,
                               stderr    = None  ,
                               close_fds = True  )
            # communicate() reads stdout until EOF *then* waits for the child:
            # calling wait() before reading a PIPE can deadlock and never closes the pipe.
            arch_out = arch_pipe.communicate()[0]
        except OSError as err:
            # this error is logged by the caller: jobKey , adm status ... are not available here
            msg = "SGE: I can't determined the system arch:"+str(err)
            raise MobyleError( msg )
        arch_rc = arch_pipe.returncode
        if arch_rc != 0 :
            msg = "I can't determined the system arch (return code = " + str( arch_rc ) + " )"
            raise MobyleError( msg )
        
        arch = arch_out.strip()

        self.sge_prefix = os.path.join( sge_config.root , 'bin' , arch )
        self.sge_env = {'SGE_CELL': sge_config.cell , 'SGE_ROOT': sge_config.root }   


    def _run( self , commandLine , dirPath , serviceName , jobKey , jobState , queue , xmlEnv ):
        """
        Submit the command line to SGE with qrsh, wait until it ends and translate
        the exit status of qrsh into a Mobyle status.
        
        The standard output and error of the job are written in the files
        C{<serviceName>.out} and C{<serviceName>.err} of the current directory.
        While the job is running, a symbolic link C{<admindir>/<serviceName>.<jobKey>}
        points to the C{.admin} file of the job.
        
        @param commandLine: the command to be executed
        @type commandLine: String
        @param dirPath: the absolute path to directory where the job will be executed (normaly we are already in)
        @type dirPath: String
        @param serviceName: the name of the service
        @type serviceName: string
        @param jobKey: the key of the job. It is used as SGE job name ( -N ) and stored as job number in the .admin file.
        @type jobKey: string
        @param jobState: not used by this method
        @type jobState: a L{JobState} instance
        @param queue: the name of the SGE queue ( -q ). If it is empty or None no queue is specified.
        @type queue: string
        @param xmlEnv: the environment of the job. B{It is updated in place} with SGE_CELL and SGE_ROOT.
        @type xmlEnv: dictionary
        @return: the status already recorded for the job if it is an ended status, otherwise
                 a status computed from the exit status of qrsh.
        @rtype: a L{Status} instance
        @raise MobyleError: if the output files cannot be opened, if qrsh cannot be started or
                            if the symbolic link in ADMINDIR cannot be created or removed.
        """
        
        options = { '-cwd': ''           ,  #set the working dir to current dir
                    '-now': 'n'          ,  #the job is not executed immediately
                    '-N' : jobKey        ,  #the id of the job
                    '-V' : ''               #job inherits of the whole environment
                    }
        if queue:
            options[ '-q' ] = queue
        sge_opts = ''.join( [ "%s %s " % ( opt , value ) for opt , value in options.items() ] )
        cmd = "%s %s %s" % ( os.path.join( self.sge_prefix , 'qrsh' ) , sge_opts , commandLine )

        fout = None
        ferr = None
        try:
            try:
                fout = open( serviceName + ".out" , 'w' )
                ferr = open( serviceName + ".err" , 'w' )
            except IOError as err:
                msg = "SGE: can't open file for standard job output: "+ str(err)
                self._logError( dirPath , serviceName , jobKey ,
                                admMsg = msg ,
                                userMsg = "Mobyle internal server error" ,
                                logMsg = msg )
                raise MobyleError( msg )

            xmlEnv.update( self.sge_env )
            try:
                pipe = Popen( cmd ,
                              shell  = True ,
                              stdout = fout  ,
                              stdin  = None  ,
                              stderr = ferr  ,
                              close_fds = True ,
                              env       = xmlEnv
                              )
            except OSError as err:
                msg = "SGE execution failed: "+ str(err)
                self._logError( dirPath , serviceName , jobKey ,
                                admMsg = msg ,
                                userMsg = "Mobyle internal server error" ,
                                logMsg = None )
                _log.critical( "%s/%s : %s" %( serviceName ,
                                               jobKey ,
                                               msg
                                               ) ,
                               exc_info = True
                               )
                raise MobyleError( msg )

            adm = Admin( dirPath )
            adm.setExecutionAlias( self.execution_config_alias  )
            adm.setNumber( jobKey )
            adm.commit()
            linkName = "%s/%s.%s" %( self._cfg.admindir() ,
                                     serviceName ,
                                     jobKey
                                     )
            try:
                os.symlink( os.path.join( dirPath , '.admin') , linkName )
            except OSError as err:
                msg = "can't create symbolic link %s in ADMINDIR: %s" %( linkName , err )
                self._logError( dirPath , serviceName , jobKey ,
                                admMsg = msg ,
                                userMsg = "Mobyle internal server error" ,
                                logMsg = None )
                _log.critical( "%s/%s : %s" %( serviceName ,
                                               jobKey  ,
                                               msg
                                               )
                               )
                raise MobyleError( msg )

            logging_shutdown() #close all loggers 
            pipe.wait()
            MLogger( child = True ) #the loggers were shut down before waiting, set them up again
            try:
                os.unlink( linkName )
            except OSError as err:
                msg = "can't remove symbolic link %s in ADMINDIR: %s" %( linkName , err )
                self._logError( dirPath , serviceName , jobKey ,
                                admMsg = msg ,
                                userMsg = "Mobyle internal server error" ,
                                logMsg = None )
                _log.critical( "%s/%s : %s" %( serviceName ,
                                               jobKey ,
                                               msg
                                               )
                               )
                raise MobyleError( msg )
        finally:
            # release the output files whatever happened above ( they used to leak on every error path )
            for stream in ( fout , ferr ):
                if stream is not None:
                    stream.close()

        oldStatus = self.status_manager.getStatus( dirPath )
        if oldStatus.isEnded():
            # an ended status was already recorded for this job: it takes precedence over the exit status
            return oldStatus

        returncode = pipe.returncode
        if returncode == 0 :
            return Status( code = 4 ) #finished
        if returncode in ( 137 , 143 , 153 ): #killed
            ## exit status above 128 follow the 128 + signal number convention
            ## 137 =  9 ( SIGKILL ) + 128
            ## 143 = 15 ( SIGTERM ) + 128
            ## 153 = 25 ( SIGXFSZ ) + 128 ( file size exceeded )
            ## be careful if a job exit with a 137 , 143 or 153 exit status it will be labelled as killed
            return Status( code = 6 , message = "Your job has been cancelled" ) 
        if returncode > 128 :
            ## if return code > 128 it's a signal
            ## the 9 , 15 , 25 signals are sent by the administrator
            ## the others are self aborting signals see signal(7)
            return Status( code = 6 , message = "Your job execution failed ( %s )" %returncode ) 
        return Status( code = 4 , message = "Your job finished with an unusual status code ( %s ), check your results carefully." % returncode )


    def getStatus( self ,jobNum ):
        """
        Query SGE with C{qstat -r} and look for the job in its output.
        
        Each line with at least 5 fields is taken as a job summary line: the last
        character of its 5th field is remembered as the SGE state. The job is found
        when a following line has C{Full} as first field and jobNum as 3rd field
        ( presumably the "Full jobname:" line of the qstat -r output ).
        
        @param jobNum: the identifier of the job, compared to the job name reported by qstat
        @type jobNum: string
        @return: the status of job with number jobNum, or the status with code -1 ( unknown ) 
                 if the job does not appear in the qstat output.
        @rtype: a Mobyle.Status.Status instance
        @raise MobyleError: if qstat cannot be executed, if the job is found but qstat exited with 
                            a non zero status or if the SGE state of the job is not a known one.
        @todo: for best performance, restrict the sge querying to one queue
        """
        sge_cmd = [ os.path.join( self.sge_prefix , 'qstat' ) , '-r' ]
        
        try:
            pipe = Popen( sge_cmd,
                          executable = sge_cmd[0] ,
                          shell = False ,
                          stdout = PIPE ,
                          stdin = None  ,
                          stderr= None  ,
                          close_fds = True ,
                          env= self.sge_env
                          )
        except OSError as err:
            raise MobyleError( "can't querying sge : %s :%s "%( sge_cmd , err ) )

        # Read the whole output and reap the child in every case. 
        # Closing the pipe as soon as the job is found could make qstat fail on a 
        # broken pipe, and returning without waiting left a zombie process behind.
        qstat_out = pipe.communicate()[0]

        sge2mobyleStatus = { 'r' : 3 , # running
                             't' : 3 ,
                             'R' : 3 ,
                             's' : 7 , #hold
                             'S' : 7 , 
                             'T' : 7 ,
                             'h' : 7 ,
                             'w' : 2 , #pending
                             'd' : 6 , #killed 
                             'E' : 5 , #error
                             }

        status = None #SGE state of the last job summary line seen so far
        for line in qstat_out.splitlines():
            fields = line.split()
            if len( fields ) > 4 :
                #it's the first line of a job, remember its state for the lines which follow
                status = fields[ 4 ][ -1 ]
                continue
            if len( fields ) < 3 or fields[ 0 ] != 'Full' or fields[ 2 ] != jobNum :
                continue #it's not the 2nde line or it's not the right jobNum

            if pipe.returncode != 0 :
                raise MobyleError( "cannot get status " + str( pipe.returncode ) )
            try:
                return Status( code = sge2mobyleStatus[ status ]  )
            except ( KeyError , MobyleError ):
                # KeyError: the state letter is not in the table above
                raise MobyleError( "unexpected sge status: " +str( status ) )
        return Status( code = -1 ) #unknown

        
    def kill( self, job_number ):
        """
        Cancel a job with the SGE C{qdel} command.
        
        The command is run through the shell, its standard output is redirected to 
        the standard error. The exit status of qdel is not checked.
        
        @param job_number: the identifier of the job, passed as argument to qdel
        @note: as side effect SGE_CELL and SGE_ROOT are set in os.environ of the current process.
        """
        sge_cmd = "%s %s 1>&2" %( os.path.join( self.sge_prefix , 'qdel' ) ,  job_number )
        os.environ.update( self.sge_env )
        os.system( sge_cmd )