File: Utils.py

package info (click to toggle)
mobyle 1.5.5%2Bdfsg-6
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 8,288 kB
  • sloc: python: 22,709; makefile: 35; sh: 33; ansic: 10; xml: 6
file content (485 lines) | stat: -rw-r--r-- 19,011 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
########################################################################################
#                                                                                      #
#   Author: Bertrand Neron,                                                            #
#   Organization:'Biological Software and Databases' Group, Institut Pasteur, Paris.   #  
#   Distributed under GPLv2 Licence. Please refer to the COPYING.LIB document.         #
#                                                                                      #
########################################################################################


#import os 
#from time import localtime, strftime , strptime
from time import strftime

from logging import getLogger
u_log = getLogger( __name__ )

from Mobyle.MobyleError import MobyleError , UserValueError
from Mobyle.Admin import Admin
#from Mobyle.Status import Status

def executionLoader( jobID = None , alias = None , execution_config=None):
    """
    Instantiate the Execution class in charge of a job.
    Exactly one of the three arguments must be provided.

    @param jobID: the url of the job, the execution alias is then read from the Admin of this job
    @type jobID: string
    @param alias: the alias of an execution configuration, resolved with Config.getExecutionConfigFromAlias
    @type alias: string
    @param execution_config: the execution configuration to use as is
    @type execution_config: object with an execution_class_name attribute
    @return: an instance of the class <execution_class_name> found in the module Mobyle.Execution.<execution_class_name>, built with the execution_config
    @raise NotImplementedError: if the normalized jobID uses the http protocol
    @raise MobyleError: if the alias cannot be determined or is unknown in Config, or if the Execution module or class cannot be loaded
    """
    nb_selectors = bool( jobID ) + bool( alias ) + bool( execution_config )
    assert nb_selectors == 1 , "please provide either a jobID, an alias or an execution_config"
    from Mobyle.ConfigManager import Config
    cfg = Config()
    if not execution_config:
        if jobID:
            from Mobyle.JobState import normUri
            from urlparse import urlparse
            url_parts = urlparse( normUri( jobID ) )
            # url_parts[0] is the protocol, url_parts[2] is the path
            if url_parts[0] == "http":
                raise NotImplementedError( "trying to instanciate an Execution system from a remote Job" )
            job_dir = url_parts[2]
            if job_dir.endswith( "index.xml" ):
                # remove "index.xml" and the character just before it
                job_dir = job_dir[:-10 ]
            alias = Admin( job_dir ).getExecutionAlias()
        if not alias:
            msg = "cant determine the Execution system for %s " % ( jobID )
            u_log.error( msg )
            raise MobyleError( msg )
        try:
            execution_config = cfg.getExecutionConfigFromAlias( alias )
        except KeyError:
            msg = "the ExecutionConfig alias %s doesn't match with any alias in Config" % alias
            u_log.critical( msg )
            raise MobyleError( msg )

    klass_name = execution_config.execution_class_name
    try:
        # __import__ of a dotted name returns the top level package ( Mobyle )
        module = __import__( 'Mobyle.Execution.%s' % klass_name )
    except ImportError:
        msg = "The Execution.%s module is missing" % klass_name
        u_log.critical( msg )
        raise MobyleError( msg )
    except Exception as err:
        msg = "an error occurred during the Execution.%s import: %s" % ( klass_name , err )
        u_log.critical( msg )
        raise MobyleError( msg )

    try:
        # the module and the class it defines share the same name
        execution_module = module.Execution.__dict__[ klass_name ]
        klass = execution_module.__dict__[ klass_name ]
        return klass( execution_config )
    except KeyError:
        msg = "The Execution class %s does not exist" % klass_name
        u_log.critical( msg )
        raise MobyleError( msg )
    except Exception as err:
        msg = "an error occurred during the class %s loading : %s" % ( klass_name, err )
        u_log.critical( msg )
        raise MobyleError( msg )
    
    
def getStatus( jobID ):
    """
    Get the current status of a local job. The status stored by the StatusManager
    is refreshed with the answer of the execution system when this one is usable.

    @param jobID: the url of the job
    @type jobID: string
    @return: the current status of the job
    @rtype: status object ( provides isQueryable and isKnown ), as returned by StatusManager.getStatus or by the execution system
    @raise NotImplementedError: if the normalized jobID uses the http protocol
    """
    from Mobyle.JobState import JobState , normUri
    from urlparse import urlparse
    from Mobyle.StatusManager import StatusManager

    url_parts = urlparse( normUri( jobID ) )
    # url_parts[0] is the protocol, url_parts[2] is the path
    if url_parts[0] == "http":
        raise NotImplementedError( "trying to querying a distant server" )

    job_dir = url_parts[2]
    if job_dir.endswith( "index.xml" ):
        # remove "index.xml" and the character just before it
        job_dir = job_dir[:-10 ]

    sm = StatusManager()
    oldStatus = sm.getStatus( job_dir )
    # 'killed' , 'finished' , 'error': the status cannot change anymore
    # 'building': these jobs do not have a batch number yet
    if not oldStatus.isQueryable():
        return oldStatus

    adm = Admin( job_dir )
    batch = adm.getExecutionAlias()
    jobNum = adm.getNumber()
    if batch is None or jobNum is None:
        return oldStatus

    try:
        newStatus = executionLoader( jobID = jobID ).getStatus( jobNum )
    except Exception:
        # with sge drmaa, when sge_master does not respond the status query
        # waits forever unless the drmaa session is exited explicitly;
        # in this case a drmaa error is raised, so all Exceptions are caught
        # and the old status is returned as if drmaa had answered "unknown"
        return oldStatus

    if not newStatus.isKnown():
        return oldStatus
    if newStatus != oldStatus :
        sm.setStatus( job_dir , newStatus )
    return newStatus

def isExecuting( jobID ):
    """
    Ask the execution system whether a local job is still in a queryable state.

    @param jobID: the url of the job
    @type jobID: string
    @return: True if the job is currently executing ( submitted , running , pending , hold ).
    False otherwise ( building, finished , error , killed )
    @rtype: boolean
    @raise NotImplementedError: if the normalized jobID uses the http protocol
    @raise MobyleError: if the execution alias or the job number is missing in the .admin file
    whereas the stored status is still queryable, or if the execution system raises a MobyleError
    ( logged then raised again ). Other exceptions are not caught here.
    """
    from Mobyle.JobState import normUri
    from urlparse import urlparse
    from Mobyle.StatusManager import StatusManager

    url_parts = urlparse( normUri( jobID ) )
    # url_parts[0] is the protocol, url_parts[2] is the path
    if url_parts[0] == "http":
        raise NotImplementedError( "trying to querying a distant server" )

    job_dir = url_parts[2]
    if job_dir.endswith( "index.xml" ):
        # remove "index.xml" and the character just before it
        job_dir = job_dir[:-10 ]

    adm = Admin( job_dir )
    batch = adm.getExecutionAlias()
    jobNum = adm.getNumber()

    admin_incomplete = batch is None or jobNum is None
    if admin_incomplete:
        # without alias or number the execution system cannot be asked:
        # this is legitimate only if the stored status is not queryable
        if StatusManager().getStatus( job_dir ).isQueryable():
            raise MobyleError( "inconsistency in .admin file %s" % job_dir )
        return False

    try:
        newStatus = executionLoader( jobID = jobID ).getStatus( jobNum )
    except MobyleError as err :
        u_log.error( str( err ) , exc_info = True )
        raise err
    return newStatus.isQueryable()





def killJob( jobID ):
    """
    Kill one or several local jobs. All the jobs are tried, the failures
    are gathered and reported together at the end.

    @param jobID: the url of the job or a sequence of jobID
    @type jobID: string or sequence of jobID
    @return: None
    @raise MobyleError: if jobID is neither a string nor a list or a tuple, or if at least one job
    could not be killed ( url which cannot be normalized, distant job, error raised by the kill ).
    The message contains one line per failure.
    @todo: test the sge part
    """
    from types  import StringTypes , TupleType , ListType
    from Mobyle.MobyleError import MobyleError
    from Mobyle.JobState import JobState , normUri
    from Mobyle.Job import Job
    from Mobyle.WorkflowJob import WorkflowJob

    if isinstance( jobID , StringTypes ) :
        targets = [ jobID ]
    elif isinstance( jobID , ( ListType , TupleType ) ) :
        targets = jobID
    else:
        raise MobyleError( "jobID must be a string or a Sequence of strings :%s"%type( jobID ) )

    failures = []
    for target in targets :
        try:
            uri = normUri( target )
        except MobyleError as err :
            failures.append( ( target , str( err ) ) )
            continue
        if uri[:4] == 'http' :
            # the job is not on this Mobyle server
            failures.append( ( target , "can't kill distant job" ) )
            continue
        if JobState( uri = target ).isWorkflow():
            job = WorkflowJob( id= target )
        else:
            job = Job( ID= target )
        try:
            job.kill()
        except MobyleError as err :
            failures.append( ( target , str( err ) ) )

    if failures:
        # one line per failure, each line starts with a space
        msg = ''.join( " killJob( %s ) - %s\n" % failure for failure in failures )
        raise MobyleError( msg )

def safeFileName( fileName ):
    """
    Turn a file name provided by the user into a name which is safe to use
    in a job directory and on a shell command line.

    - the characters which are not in string.printable are replaced by '_'
    - the shell special characters and the white spaces are replaced by '_'
    - everything up to the last backslash is removed
    - every run of 2 or more dots is removed
    - all the leading slashes are removed, so the result is never an absolute path

    @param fileName: the name to clean
    @type fileName: string
    @return: the cleaned name ( it can be an empty string )
    @rtype: string
    @raise UserValueError: if fileName, before or after the cleaning, is one of the names reserved by Mobyle
    """
    import string , re
    reserved_names = ( 'index.xml' , '.admin' , '.command' ,'.forChild.dump' ,'.session.xml')
    if fileName in reserved_names:
        raise UserValueError( msg = "value \"" + str( fileName ) + "\" is not allowed" )

    for car in set( fileName ) :
        if car not in string.printable : #we don't allow  non ascii char
            fileName = fileName.replace( car , '_')
    #SECURITY: substitution of shell special characters
    fileName = re.sub( r"[ ~%#\"'<>&*;$`|()\[\]{}?\s]" , '_' , fileName )
    #SECURITY: transform an absolute path in relative path
    fileName = re.sub( r"^.*\\" , "" , fileName )
    # the runs of dots must be removed BEFORE the leading slashes,
    # otherwise "../etc/passwd" would be turned into the absolute path "/etc/passwd"
    fileName = re.sub( r"\.\.+" , "" , fileName )
    fileName = re.sub( r"^/+" , "" , fileName )
    # the cleaning can reveal a reserved name ( "/index.xml" -> "index.xml" )
    if fileName in reserved_names:
        raise UserValueError( msg = "value \"" + str( fileName ) + "\" is not allowed" )
    return fileName



def makeService( programUrl ):
    """
    Parse the definition of a service.

    @param programUrl: the location of the service definition, given as is to Mobyle.Parser.parseService
    @return: the value returned by Mobyle.Parser.parseService
    @raise MobyleError: if the parsing raises an IOError ( the message of the IOError is kept )
    """
    import Mobyle.Parser
    try:
        return Mobyle.Parser.parseService( programUrl )
    except IOError as io_err:
        raise MobyleError( str( io_err ) )

      
def sizeFormat(bytes, precision=2):
    """
    Returns a humanized string for a given amount of bytes

    @param bytes: the size to format ( anything accepted by int() ).
    The name shadows the builtin but is kept because it is part of the public signature.
    @type bytes: integer or string
    @param precision: the number of digits after the decimal point
    @type precision: integer
    @return: the size expressed in the largest binary unit which keeps the value >= 1,
    for instance '1.50KiB'; '0 bytes' if the size is 0
    @rtype: string
    @raise ValueError: if bytes cannot be converted in integer or is negative
    """
    units = ( 'bytes' , 'KiB' , 'MiB' , 'GiB' , 'TiB' , 'PiB' , 'EiB' )
    size = int(bytes)
    # compare by value: "is 0" only works thanks to an implementation detail of CPython
    if size == 0:
        return '0 bytes'
    if size < 0:
        raise ValueError( "the size must not be negative: %d" % size )
    value = float( size )
    rank = 0
    # successive divisions by 1024 are exact in floating point, unlike math.log,
    # and the rank is capped on the last unit so that huge sizes do not raise an IndexError
    while value >= 1024 and rank < len( units ) - 1:
        value /= 1024
        rank += 1
    return "%.*f%s" % ( precision , value , units[ rank ] )


def zipFiles(zip_filename, files):
    """
    Create a zip archive from a list of files and folders. The folders are walked
    recursively and their files are stored under the name of the folder.
    The empty files are not archived. When an entry is archived as 'index.xml',
    the status file located in the same directory is appended to it and its
    stylesheet is set to "job.xsl" before the result is written in the archive.
    The list given by the caller is not modified.

    @param zip_filename: the absolute path to the archive to create
    @type zip_filename: string
    @param files: a list of tuple each tuple contains 2 elements the absolute path of the file to archive , and the name of this file in the archive
    @type files: [ ( string abs_path_file_to archive , string arc_name ) , ... ]
    @return: the abspath of the archive
    @rtype: string
    """
    import zipfile
    import os
    from time import localtime
    from Mobyle.StatusManager import StatusManager
    
    def compression_method(size):
        """return the zip method to use for a file of this size, None for an empty file"""
        if size > 0 and size < 10:
            method = zipfile.ZIP_STORED
        elif size >= 10:
            method = zipfile.ZIP_DEFLATED
        else:
            #the file is empty we don't add it to this archive
            method = None
        return method

    def expand_folder(folder):
        """return the ( abs_path , arc_name ) of all the files found under a folder entry"""
        files_2_add = []
        folder_path, _ = folder
        # the archive names are computed relatively to the parent of the folder,
        # so they start with the name of the folder itself even if this name
        # appears elsewhere in the path or if the path ends with a separator
        folder_path = os.path.normpath( folder_path )
        parent_path = os.path.dirname( folder_path ) or os.curdir
        for dirpath, dirnames, filenames in os.walk(folder_path):
            local_dir_path = os.path.relpath( dirpath , parent_path )
            for f in filenames:
                arc_filename = os.path.join(local_dir_path, f)
                abs_path = os.path.abspath(os.path.join(dirpath, f))
                files_2_add.append((abs_path, arc_filename))
        return files_2_add

    # work on a copy: the list of the caller must not grow behind its back
    entries = list( files )
    folder_2_expand = [ entry for entry in entries if os.path.isdir( entry[0] ) ]
    for folder in folder_2_expand:
        entries.extend(expand_folder(folder))
    
    myZipFile = zipfile.ZipFile( zip_filename, "w", allowZip64 = True )
    try:
        for filename, arc_filename in entries:
            if arc_filename == 'index.xml':
                from lxml import etree
                index_tree = etree.parse( filename )
                status_tree = etree.parse( os.path.join( os.path.dirname(filename), StatusManager.file_name ) )
                root_index_tree = index_tree.getroot()
                # the node just before the root is expected to be the stylesheet processing instruction
                pi = root_index_tree.getprevious()
                pi.set( "href" , "job.xsl")
                root_index_tree.append( status_tree.getroot() )
                indent( root_index_tree )
                index = etree.tostring( index_tree , xml_declaration=True , encoding='UTF-8' )
                myZipInfo = zipfile.ZipInfo( arc_filename , localtime()[:6] )
                myZipInfo.external_attr = 2175008768   # set perms to 644
                myZipFile.writestr(  myZipInfo  , index )
                continue
            try:
                size = os.path.getsize( filename )
            except OSError as err:
                # a file which cannot be read is logged and skipped, the other ones are still archived
                u_log.critical( "error during zipping files: %s"%(err) , exc_info = True)
                continue

            method = compression_method(size)
            if method is not None:
                myZipFile.write(filename, arc_filename, method)
    finally:
        # always release the archive, even if an entry raises an unexpected error
        myZipFile.close()
    return zip_filename

def emailHelpRequest( cfg, userEmail, registry, job_id, message, session, error_parameter, error_message ):
    """
    Send a help request to the help addresses of the portal and of the server
    which executed the job, then send a receipt to the user.

    @param cfg: the configuration of Mobyle ( mailHelp and sender are used )
    @type cfg: Config instance
    @param userEmail: the email address of the user, given as is to EmailAddress
    @param registry: the registry of deployed services ( getServerByJobId and serversByName are used )
    @param job_id: the url of the job the request is about, or None
    @type job_id: string or None
    @param message: the message typed by the user
    @type message: string
    @param session: the session of the user ( getJob, getKey, getEmail, isActivated and isAuthenticated are used )
    @param error_parameter: reported in the email as JOB_ERROR_PARAM
    @param error_message: reported in the email as JOB_ERROR_MSG
    @return: the body of the email sent to the help addresses
    @raise UserValueError: if message is None
    """
    from Mobyle.Net import Email, EmailAddress 
    if(message is None):
        raise UserValueError(msg='please provide a request message.')
    userAddress = EmailAddress(userEmail)            
    if job_id:
        jobServer = registry.getServerByJobId(job_id)
    else:
        jobServer = registry.serversByName['local']
    portal_help = cfg.mailHelp()
    if portal_help != jobServer.help:
        # generate a list of unique email addresses from the execution server and portal server help addresses:
        # the portal addresses first, then the addresses of the execution server which are not already listed
        # NOTE(review): jobServer.help is assumed to be a sequence of addresses like cfg.mailHelp();
        # a single value ( or None ) is tolerated below - to be confirmed against the Registry
        server_help = jobServer.help
        if server_help is None:
            server_help = []
        elif not isinstance( server_help , ( list , tuple , set , frozenset ) ):
            server_help = [ server_help ]
        help_addresses = list( portal_help )
        for address in server_help:
            if address not in help_addresses:
                help_addresses.append( address )
        helpAddress = EmailAddress(help_addresses)
    else:
        helpAddress = EmailAddress(portal_help)
    helpEmail = Email(helpAddress)
    job_status = "[not provided]"
    job_date = "[not provided]"
    execution_alias = "[not provided]"
    number = "[not provided]"
    queue = "[not provided]"
    if job_id is not None:
        job_info = session.getJob(job_id)
        job_status = job_info['status']
        # presumably job_info['date'] is a time tuple, as required by strftime - to be confirmed
        job_date = strftime("%a, %d %b %Y %H:%M:%S +0000", job_info['date'])
        try:
            from Mobyle.JobState import url2path
            job_path = url2path(job_id)
            adm = Admin(job_path)
            execution_alias = adm.getExecutionAlias()
            number = adm.getNumber()
            queue = adm.getQueue()
        except MobyleError:
            # the execution details are optional in the request: keep what has been collected so far
            pass
    else:
        job_id = "[not provided]"
    msgDict = {
                 'USER': userAddress,
                 'SENDER': cfg.sender(),
                 'MSG': message,
                 'SESSION_ID': session.getKey(),
                 'SESSION_EMAIL': session.getEmail(),
                 'SESSION_ACTIVATED': session.isActivated(),
                 'SESSION_AUTHENTICATED': session.isAuthenticated(),
                 'JOB_URL': job_id,
                 'JOB_DATE': job_date,
                 'JOB_STATUS': job_status,
                 'JOB_ERROR_PARAM': error_parameter,   
                 'JOB_ERROR_MSG': error_message,
                 'HELP': str(helpAddress),
                 'EXECUTION_ALIAS': execution_alias,
                 'NUMBER': number,
                 'QUEUE': queue
              } 
    helpEmail.send('HELP_REQUEST' , msgDict)
    receiptEmail = Email( userAddress )
    receiptEmail.send('HELP_REQUEST_RECEIPT' , msgDict)
    return helpEmail.getBody()

def emailResults( cfg , userEmail, registry, ID, job_path, serviceName, jobKey, FileName = None ):
    """
    Send the results of a job to the user. Nothing is sent if the configuration
    disables the emailing of the results or if there is no user email.
    - if FileName is None only a notification is sent
    - if the archive is too big for an email, a "too big" message is sent instead
    - otherwise the archive is sent as attachment
    The EmailError raised while sending a notification or a "too big" message
    is not propagated: it is stored in the Admin of the job and logged.

    @param cfg: the configuration of Mobyle    
    @type cfg: Config instance
    @param userEmail: the user email address
    @type userEmail: EmailAddress instance
    @param registry: the registry of deployed services
    @type registry: Registry.registry object
    @param ID: the ID of the job
    @type ID: string
    @param job_path: the absolute path to the job 
    @type job_path: string
    @param serviceName: the name of the service
    @type serviceName: string
    @param jobKey: the key of the job
    @type jobKey: string
    @param FileName: the absolute path of zip file to attach to the email
    @type FileName: string or None
    """
    from Mobyle.Net import Email 
    from Mobyle.MobyleError import EmailError , TooBigError
    import os
    dont_email_result , maxmailsize =  cfg.mailResults()
    if dont_email_result or not userEmail:
        return

    mail = Email( userEmail )
    jobInPortalUrl = "%s/portal.py#jobs::%s" %( cfg.cgi_url() ,
                                                registry.getJobPID( ID ),
                                               )
    # built before any branch: every kind of email needs it,
    # including the notification sent when there is no archive
    mailDict = { 'SENDER'         : cfg.sender() ,
                 'HELP'           : ", ".join( cfg.mailHelp() ) ,
                 'SERVER_NAME'    : cfg.portal_url() ,
                 'JOB_URL'        : jobInPortalUrl , 
                 'RESULTS_REMAIN' : cfg.remainResults() ,
                 'JOB_NAME'       : serviceName ,
                 'JOB_KEY'        : jobKey ,
                 }

    def send_or_record( template ):
        """send an email without attachment, on EmailError store the message in the Admin of the job and log it"""
        try:
            mail.send( template , mailDict )
        except EmailError as err :
            msg = str(err)
            adm = Admin( job_path )
            adm.setMessage( msg )
            adm.commit()
            u_log.error( "%s/%s : %s" %( serviceName ,
                                          jobKey ,
                                          msg
                                          )
            )

    if FileName is None:
        #if there is a problem on zip creation
        send_or_record( 'RESULTS_NOTIFICATION' )
        return

    zipSize = os.path.getsize( FileName )
    if zipSize > maxmailsize - 2048 :
        #2048 octet is an estimated size of email headers
        send_or_record( 'RESULTS_TOOBIG' )
        return

    try:   
        mail.send( 'RESULTS_FILES' , mailDict , files = [ FileName ]  )
    except TooBigError:
        # the size estimation was too optimistic: fall back on the "too big" message
        send_or_record( 'RESULTS_TOOBIG' )

    

def indent(elem, level=0):
    """
    Indent in place an element tree, 2 spaces per level, by rewriting the
    text and the tail of the elements which are empty or only made of white spaces.

    Due to malfunction in pretty_print argument of etree.tostring
    we use this function found here
    http://stackoverflow.com/questions/1238988/changing-the-default-indentation-of-etree-tostring-in-lxml
    to have a correct indentation 

    @param elem: the root of the ( sub )tree to indent
    @param level: the depth of elem in the whole tree
    @type level: integer
    """
    pad = "\n" + "  " * level

    def is_blank( value ):
        return not value or not value.strip()

    if not len(elem):
        # a leaf: only its tail is touched, and never the tail of the top level element
        if level and is_blank( elem.tail ):
            elem.tail = pad
        return

    if is_blank( elem.text ):
        elem.text = pad + "  "
    if is_blank( elem.tail ):
        elem.tail = pad
    last_child = None
    for last_child in elem:
        indent(last_child, level + 1)
    # the closing tag of elem must be aligned with its opening tag:
    # the tail of the last child is brought back to the level of elem
    if is_blank( last_child.tail ):
        last_child.tail = pad