File: Policy.py

package info (click to toggle)
mobyle 1.0.6~dfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 8,036 kB
  • sloc: python: 19,496; sh: 54; makefile: 30; xml: 6; ansic: 5
file content (141 lines) | stat: -rw-r--r-- 5,028 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import sys
import os.path


from logging  import getLogger
p_log = getLogger('Mobyle.Policy')



def queue( queueName ):
    """
    Local policy hook selecting the execution queue of a job.
    This default implementation applies no local rule: the name received
    is handed back unchanged.

    @param queueName: the name of the queue proposed for the job
    @return: the name of the queue to be used to execute the job
    @rtype: string
    """
    selected_queue = queueName
    return selected_queue



# If you want to define a local policy to check users,
# you must put your code here.
def emailCheck( **args ):
    """
    Local policy hook used to check an email according to the local rules.
    This default implementation applies no local rule and always asks for
    the validation process to go on.

    @return: one of the following values
      - Mobyle.Net.EmailAddress.VALID    if the email is valid
      - Mobyle.Net.EmailAddress.INVALID  if the email is rejected
      - Mobyle.Net.EmailAddress.CONTINUE to continue further the email validation process
    """
    # imported at call time, as the module has no top level Mobyle import
    from Mobyle.Net import EmailAddress

    verdict = EmailAddress.CONTINUE
    return verdict
 


def authenticate( login , passwd ):
    """
    Local policy hook where the Mobyle administrator can put authentication code.
    This default implementation does not authenticate anything and always
    returns CONTINUE.

    The function must return one of:
     - Mobyle.AuthenticatedSession.AuthenticatedSession.CONTINUE:
                this method did not authenticate this login/passwd. The fall back method must be applied.
     - Mobyle.AuthenticatedSession.AuthenticatedSession.VALID:
                the login/password has been authenticated.
     - Mobyle.AuthenticatedSession.AuthenticatedSession.REJECT:
                the login/password is not allowed to continue.

    @param login: the login to authenticate (unused by this default implementation)
    @param passwd: the password to authenticate (unused by this default implementation)
    """
    # imported at call time, as the module has no top level Mobyle import
    from Mobyle.AuthenticatedSession import AuthenticatedSession

    outcome = AuthenticatedSession.CONTINUE
    return outcome


def allow_to_be_executed( job ):
    """
    Check if the job is allowed to be executed.
    This default implementation only delegates to L{over_limit}.

    @param job: the job to check before to submit it to Execution
    @type job: L{Job} instance
    @return: the value returned by L{over_limit}
    @raise UserValueError: if the job is not allowed to be executed
    """
    # Place here the code you want to be executed to allow a job to be executed.
    # This is the last control before DRM submission.
    # If you want to limit simultaneous jobs according to some criteria
    # (IP, user email, ...), this is the right place to put your code.
    # UserValueError is raised by over_limit itself, which imports it on its
    # own, so no import is needed here.
    return over_limit( job )
    
    
    
def over_limit (job):
    """
    Check if too many similar jobs have already been submitted and are not
    finished yet. Two jobs are considered similar when the md5 digest built
    from the user email, the remote host (or address) and the command line
    is the same.

    The digest of this job is always stored in the Admin object of its
    working directory, even when the limit is disabled.

    @param job: the job to check before to submit it to Execution
    @type job: L{Job} instance
    @return: None if job.cfg.simultaneous_jobs() is 0 (no limit),
             True if the job is under the limit
    @raise UserValueError: if the number of similar jobs reaches job.cfg.simultaneous_jobs()
    """
    from hashlib import md5
    import glob
    from Mobyle.Admin import Admin
    # MobyleError must be imported too: it is caught below when an entry of
    # the admin directory cannot be loaded.
    from Mobyle.MobyleError import MobyleError , UserValueError

    # REMOTE_HOST when it is set and not empty, otherwise REMOTE_ADDR when it
    # is set, otherwise the job has not been submitted through the web.
    remote = os.environ.get( 'REMOTE_HOST' ) or os.environ.get( 'REMOTE_ADDR' , 'no web' )

    newMd5 = md5()
    newMd5.update( str( job.getEmail() ) )
    newMd5.update( remote )
    newMd5.update( job.getCommandLine() )
    newDigest = newMd5.hexdigest()

    # record the digest of this job so that later submissions can compare to it
    thisJobAdm = Admin( job.getDir() )
    thisJobAdm.setMd5( newDigest )
    thisJobAdm.commit()

    max_jobs = job.cfg.simultaneous_jobs()
    if max_jobs == 0 :
        # 0 means no limit: there is no need to scan the admin directory
        return

    mask = os.path.normpath( "%s/%s.*" %( job.cfg.admindir() ,
                                          job.getServiceName()
                                          )
                             )
    nb_of_jobs = 0
    for one_job in glob.glob( mask ):
        try:
            oldAdm = Admin( one_job )
        except MobyleError as err :
            # the entry may have disappeared since the glob,
            # it is reported only if it still exists
            if os.path.lexists( one_job ):
                p_log.critical( "%s/%s: invalid job in ADMINDIR : %s" %( job.getServiceName() ,  job.getKey() , err ) )
            continue
        if oldAdm.getWorkflowID():
            #we allow a workflow to run several identical job in parallel
            continue
        if newDigest != oldAdm.getMd5():
            continue

        # NOTE(review): the status is read from the job being submitted, not
        # from the similar job found in the admin directory -- confirm that
        # this is the intended behavior.
        oldStatus = job.getStatus()
        if oldStatus.isEnded():
            continue

        nb_of_jobs += 1
        if nb_of_jobs >= max_jobs:
            msg = "%d similar jobs (%s) have been already submitted (md5 = %s)" % (
                                                                            nb_of_jobs ,
                                                                            os.path.basename( one_job ),
                                                                            newDigest
                                                                            )
            userMsg = " %d similar job(s) have been already submitted, and are(is) not finished yet. Please wait for the end of these jobs before you resubmit." % ( nb_of_jobs )
            p_log.warning( msg + " : run aborted " )
            raise UserValueError( parameter = None, msg = userMsg )
    return True