1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
|
########################################################################################
# #
# Author: Bertrand Neron, #
# Organization:'Biological Software and Databases' Group, Institut Pasteur, Paris. #
# Distributed under GPLv2 Licence. Please refer to the COPYING.LIB document. #
# #
########################################################################################
import sys
import os.path
from logging import getLogger
p_log = getLogger('Mobyle.Policy')
def queue( queueName ):
    """
    Local hook to choose the queue on which a job is executed.
    This default implementation applies no local rule: the queue name
    received is returned unchanged.

    @param queueName: the name of the queue proposed by the caller
    @type queueName: string
    @return: the name of the queue to be used to execute the job
    @rtype: string
    """
    selected_queue = queueName
    return selected_queue
#if you want to define a local policy to check users,
# you must put your code here
def emailCheck( **args ):
    """
    Local hook to check an email according to the rules of this site.
    This default implementation defines no local rule: it ignores its
    arguments and always asks to continue the validation process.

    @param args: keyword arguments supplied by the caller (not used here)
    @return:
     - Mobyle.Net.EmailAddress.VALID if the email is valid
     - Mobyle.Net.EmailAddress.INVALID if the email is rejected
     - Mobyle.Net.EmailAddress.CONTINUE to continue further the email validation process
    """
    # imported here, at call time, rather than at the top of the module
    import Mobyle.Net
    verdict = Mobyle.Net.EmailAddress.CONTINUE
    return verdict
#to change the message displayed when a host or an email is blocked
# host : the IP address is in the black list
# syntax : the syntax of the email is invalid
# blackList : the email provided by the user is in the black list
# dns : the domain of the email provided by the user does not have any mail server
def emailUserMessage( method ):
    """
    Give the message to display to the user when a host or an email is blocked.

    @param method: the name of the check which rejected the request
                   ('host', 'syntax', 'blackList', 'LocalRules' or 'dns')
    @type method: string
    @return: the message associated with this check, or the generic message
             if the check is not in the table
    @rtype: string
    """
    # message used when method does not match any entry of the table
    fallback = "you are not allowed to run on this server for now"
    # one entry per check, so that each message can be customized separately
    text_by_method = {
        'host' : "you are not allowed to run on this server for now" ,
        'syntax' : "you are not allowed to run on this server for now" ,
        'blackList' : "you are not allowed to run on this server for now" ,
        'LocalRules' : "you are not allowed to run on this server for now" ,
        'dns' : "you are not allowed to run on this server for now" ,
    }
    return text_by_method.get( method , fallback )
def authenticate( login , passwd ):
    """
    Local hook where the Mobyle administrator can put authentication code.
    This default implementation does not examine login nor passwd and always
    asks to apply the fall back method.

    @param login: the login to authenticate (not used here)
    @param passwd: the password provided with the login (not used here)
    @return: this function must return either
     - Mobyle.AuthenticatedSession.AuthenticatedSession.CONTINUE:
       this method did not authenticate this login/passwd. The fall back method must be applied.
     - Mobyle.AuthenticatedSession.AuthenticatedSession.VALID:
       the login/password has been authenticated.
     - Mobyle.AuthenticatedSession.AuthenticatedSession.REJECT:
       the login/password is not allowed to continue.
    """
    # imported here, at call time, rather than at the top of the module
    import Mobyle.AuthenticatedSession
    session_class = Mobyle.AuthenticatedSession.AuthenticatedSession
    return session_class.CONTINUE
def allow_to_be_executed( job ):
    """
    Last control applied to a job before its submission to the DRM.
    This default implementation only delegates to over_limit.

    @param job: the job to check before to submit it to Execution
    @type job: L{Job} instance
    @return: the value returned by over_limit
    @raise UserValueError: if the job is not allowed to be executed
    """
    # Put here any local code which decides if a job can be executed,
    # for instance to limit the simultaneous jobs according to some
    # criteria such as the IP or the email of the user.
    # UserValueError is imported so that local code can raise it.
    from Mobyle.MobyleError import UserValueError
    verdict = over_limit( job )
    return verdict
def over_limit (job):
    """
    check if the user already has too many similar jobs which are not finished.

    A digest is computed from the email of the job, the remote host (or remote
    address) found in the environment and the command line of the job. This digest
    is stored in the admin file of the job, then compared with the digest of every
    job of the same service found in the admin directory.

    @param job: the job to check before to submit it to Execution
    @type job: L{Job} instance
    @return: None if job.cfg.simultaneous_jobs() is 0 (no limit is applied),
             True if the job is allowed to be executed
    @raise UserValueError: if the number of similar jobs reaches job.cfg.simultaneous_jobs()
    """
    from hashlib import md5
    import glob
    from Mobyle.Admin import Admin
    # MobyleError is caught in the loop below, so it must be imported too;
    # otherwise an invalid admin entry ends in a NameError instead of being skipped
    from Mobyle.MobyleError import MobyleError , UserValueError

    newMd5 = md5()
    newMd5.update( str( job.getEmail() ) )
    # use the remote host name; fall back on the remote address if the host name
    # is missing or empty, and on 'no web' if the address is not set either
    remote = os.environ.get( 'REMOTE_HOST' ) or os.environ.get( 'REMOTE_ADDR' , 'no web' )
    newMd5.update( remote )
    newMd5.update( job.getCommandLine() )
    newDigest = newMd5.hexdigest()

    # the digest is recorded for this job even when no limit is configured
    work_dir = job.getDir()
    thisJobAdm = Admin( work_dir )
    thisJobAdm.setMd5( newDigest )
    thisJobAdm.commit()

    max_jobs = job.cfg.simultaneous_jobs()
    if max_jobs == 0 :
        # 0 disables the limit: no need to scan the admin directory
        return

    # all the entries of the admin directory which belong to the same service
    mask = os.path.normpath( "%s/%s.*" %(
                                          job.cfg.admindir() ,
                                          job.getServiceName()
                                          )
                             )
    jobs = glob.glob( mask )
    nb_of_jobs = 0
    for one_job in jobs:
        try:
            oldAdm = Admin( one_job )
        except MobyleError as err :
            # the entry cannot be read: report it if it still exists, and skip it in any case
            if os.path.lexists( one_job ):
                p_log.critical( "%s/%s: invalid job in ADMINDIR : %s" %( job.getServiceName() , job.getKey() , err ) )
            continue
        if oldAdm.getWorkflowID() :
            #we allow a workflow to run several identical job in parallel
            continue
        oldDigest = oldAdm.getMd5()
        if newDigest != oldDigest :
            continue
        # NOTE(review): this reads the status of the job being submitted, not the
        # status of the job found in the admin directory -- confirm this is intended
        oldStatus = job.getStatus()
        if oldStatus.isEnded() :
            continue
        nb_of_jobs += 1
        if nb_of_jobs >= max_jobs:
            msg = "%d similar jobs (%s) have been already submitted (md5 = %s)" % (
                                                                                   nb_of_jobs ,
                                                                                   os.path.basename( one_job ),
                                                                                   newDigest
                                                                                   )
            userMsg = " %d similar job(s) have been already submitted, and are(is) not finished yet. Please wait for the end of these jobs before you resubmit." % ( nb_of_jobs )
            p_log.warning( msg + " : run aborted " )
            raise UserValueError( parameter = None, msg = userMsg )
    return True
|