1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464
|
#! /usr/bin/env python
#############################################################
# #
# Author: Sandrine Larroude ,Bertrand Neron #
# Organization:'Biological Software and Databases' Group, #
# Institut Pasteur, Paris. #
# Distributed under GPLv2 Licence. Please refer to the #
# COPYING.LIB document. #
# #
#############################################################
import os , sys
# Locate the Mobyle installation: MOBYLEHOME must be set in the environment
# and its Src/ directory must be importable before any Mobyle.* import below.
MOBYLEHOME = None
if os.environ.has_key('MOBYLEHOME'):
    MOBYLEHOME = os.environ['MOBYLEHOME']
if not MOBYLEHOME:
    sys.exit( 'MOBYLEHOME must be defined in your environment' )
if ( os.path.join( MOBYLEHOME , 'Src' ) ) not in sys.path:
    sys.path.append( os.path.join( MOBYLEHOME , 'Src' ) )
from time import time
from shutil import rmtree
from Mobyle.ConfigManager import Config
# NOTE: the Config instance is created *before* the remaining Mobyle imports;
# keep this ordering — later modules may rely on an initialized configuration.
config = Config()
from Mobyle.Session import Session
from Mobyle.MobyleError import MobyleError, SessionError ,URLError, HTTPError, JobError
from Mobyle.Admin import Admin
from Mobyle.Utils import isExecuting
from Mobyle.JobState import JobState
from Mobyle.StatusManager import StatusManager
def day2second( day ):
    """
    Convert a duration expressed in days into whole seconds.
    @param day: the number of days to convert
    @type day: integer or float
    @return: the duration in seconds, truncated to an integer
    @rtype: integer
    """
    # multiply step by step (days -> hours -> minutes -> seconds) so the
    # floating-point evaluation order matches the historical behaviour
    hours = day * 24
    minutes = hours * 60
    return int( minutes * 60 )
def clean_jobs( config , start_time , delay , logger , dry_run = False ):
"""
remove job directory if job is finished and older than the delay.
@param config: the Mobyle Configuration
@type config: L{Config} instance
@param start_time: a time in seconds which represent the date of the beginig of cleaning since Epoch.
@type start_time: foat
@param delay: the delay in days to remove sessons older than start_time + delay
@type delay: float
@param logger: the logger to log informations
@type logger: logging.logger instance
@param dry_run:
@type dry_run: boolean
"""
jobs_repository = config.results_path()
if not os.path.isdir( jobs_repository ):
logger.critical( "Check your Mobyle configuration file, the jobs repository :'%s' does not exist")
sys.exit(1)
try:
service_names = os.listdir( jobs_repository )
service_names.remove( 'ADMINDIR' )
except Exception, err:
logger.critical( "The jobs directory is not accessible: %s" %err , exc_info = True )
sys.exit(1)
for service_name in service_names:
logger.info( "-------- cleanning %s --------" % service_name)
service_path = os.path.join( jobs_repository , service_name )
if not os.path.isdir( service_path ):
logger.info( "ignore the file %s" %service_path )
continue
try:
jobs_keys = os.listdir( service_path )
except Exception, err:
logger.error( "The jobs directory for service %s is not accessible: %s" %( service_name , err ) )
continue
for job_key in jobs_keys:
logger.info( "cleaning job %s" %job_key )
job_path = os.path.join( service_path , job_key )
if not os.path.isdir( job_path ):
logger.debug( "%s is not a directory." %job_path )
continue
try:
admin = Admin( job_path )
job_state = JobState( job_path )
if job_state.isWorkflow():
clean_workflow_job( jobs_repository , service_name , job_key , job_path , start_time , delay , admin , job_state , logger , dry_run = dry_run )
else:
clean_program_job( jobs_repository , service_name , job_key , job_path , start_time , delay , admin , job_state , logger , dry_run = dry_run )
del job_state._refs[ job_path ]
except MobyleError , err:
logger.error( "cannot remove the job : %s : %s" %( job_path , err ) )
def clean_program_job( jobs_repository , service_name , job_key , job_path ,
                       start_time , delay , admin , job_state , logger , dry_run = False):
    """
    Remove one program-job directory when its job is ended and older than the
    delay, unless it still belongs to a live parent workflow.
    @param jobs_repository: the root directory that contains all job results
    @type jobs_repository: string
    @param service_name: the name of the service the job belongs to
    @type service_name: string
    @param job_key: the unique key (directory name) of the job
    @type job_key: string
    @param job_path: the absolute path of the job directory
    @type job_path: string
    @param start_time: the time of the beginning of cleaning, in seconds since Epoch
    @type start_time: float
    @param delay: the delay in days under which a job is kept
    @type delay: float
    @param admin: the Admin bookkeeping object of this job
    @type admin: L{Admin} instance
    @param job_state: the state of this job
    @type job_state: L{JobState} instance
    @param logger: the logger to log informations
    @type logger: logging.logger instance
    @param dry_run: if True, only log what would be removed
    @type dry_run: boolean
    """
    delay_sec = day2second( delay )
    sm = StatusManager()
    job_status = sm.getStatus( job_path )
    if not job_status.isKnown():
        logger.warning( "Unkown status for job %s"% job_path )
        return
    # a job still listed in ADMINDIR is supposed to be still tracked as running
    is_in_admindir = os.access( os.path.join( jobs_repository, 'ADMINDIR', "%s.%s" %( service_name , job_key) )
                                , os.F_OK)
    last_modification_time = os.path.getmtime( job_path )
    # "old" means: untouched for longer than the configured delay
    old_job = int( start_time - last_modification_time ) > delay_sec
    if job_status.isEnded() and old_job :
        logger.debug( "the job is ended and old ( %d > %d )"%( int( start_time - last_modification_time ) , delay_sec) )
        if is_in_admindir:
            # inconsistent state: ended but still registered as running
            logger.error("the job %s has the status %s and is still in ADMINDIR." %( job_path , job_status ) )
        else:
            workflowID = admin.getWorkflowID()
            logger.debug( "workflowID = %s "%workflowID )
            if workflowID:
                # the job is a sub-task of a workflow: only remove it if the
                # parent workflow itself is gone
                logger.debug( "the job %s belongs to the workflow %s" %(job_path , workflowID ))
                try:
                    wf_state = JobState( uri = workflowID )
                except URLError , err :
                    #the portal is down?
                    logger.warning( "The job %s belongs to the workflow %s and the portal does respond (%s). The job is not removed."% (job_path ,
                                                                                                                                       workflowID ,
                                                                                                                                       err))
                except HTTPError , err:
                    #the job does not exists anymore
                    if err.code == 404:
                        # parent workflow vanished: the sub-job can be removed
                        logger.info( "The job %s belongs to the workflow %s which is not exist any more (%s). The job is removed." % (job_path ,
                                                                                                                                     workflowID ,
                                                                                                                                     err))
                        if not dry_run:
                            try:
                                rmtree( job_path )
                            except Exception, err:
                                logger.error( "cannot remove job %s : %s ." %(job_path , err ) )
                    else:
                        # any other HTTP error: be conservative, keep the job
                        logger.info( "The job %s belongs to the workflow %s which is not reachable (%s). The job is not removed." % (job_path ,
                                                                                                                                    workflowID ,
                                                                                                                                    err))
                except JobError , err:
                    from errno import ENOENT
                    if err.errno == ENOENT:
                        # local workflow directory is missing: parent is gone
                        logger.debug( "The job %s belongs to the workflow %s. which is not exist any more. The job is removed."% (job_path ,
                                                                                                                                 workflowID
                                                                                                                                 ))
                        if not dry_run:
                            try:
                                rmtree( job_path )
                            except Exception, err:
                                logger.error( "cannot remove job %s : %s ." %(job_path , err ) )
                    else:
                        logger.error( "the workflow job %s cannot be loaded: %s : the job %s is not removed."%( workflowID ,
                                                                                                                err ,
                                                                                                                job_path
                                                                                                                ) )
                except Exception ,err:
                    # unexpected failure while loading the parent: keep the job
                    logger.error( "an error occured during %s workflow job loading: %s : the job %s is not removed."%( workflowID ,
                                                                                                                       err ,
                                                                                                                       job_path
                                                                                                                       ) )
                else:
                    # parent workflow still exists: never remove its sub-jobs here
                    workflow_status = sm.getStatus( wf_state.getDir() )
                    logger.debug( "the job %s belongs to the workflow %s which has %s status. The job is not removed."%( job_path ,
                                                                                                                         workflowID ,
                                                                                                                         workflow_status))
            else:
                # plain top-level job, ended and old: remove it
                logger.info( "remove job %s" % job_path )
                if not dry_run:
                    try:
                        rmtree( job_path )
                    except Exception, err:
                        logger.error( "cannot remove job %s : %s ." %(job_path , err ) )
    elif job_status.isQueryable() and old_job:
        # the job claims to be still running although it is older than the delay
        if not is_in_admindir:
            logger.error("The job %s has the status %s since more than %d days and is not anymore in ADMINDIR." % ( job_path, job_status, delay))
        else:
            try:
                if not isExecuting( job_path ):
                    logger.error( "The job %s has the status %s even if it is not executing." % ( job_path, job_status ) )
            except MobyleError ,err:
                logger.error( "Probblem during quering the satus of the job %s: %s" %( job_path ,err ) )
    elif old_job :#not ended not queryable, building??
        logger.error( "The job %s has the %s status since more than the delay" % (job_path, job_status) )
    else:
        # recent job: nothing to do
        logger.info( "The job %s is too young to be cleaned" %job_path )
def clean_workflow_job(jobs_repository , service_name , job_key , job_path ,
                       start_time , delay , admin , job_state , logger , dry_run = False):
    """
    Remove one workflow-job directory when it is ended and older than the
    delay.  A workflow that is itself a sub-task of another workflow is only
    removed if its parent workflow no longer exists.
    @param jobs_repository: the root directory that contains all job results
    @type jobs_repository: string
    @param service_name: the name of the service the workflow belongs to
    @type service_name: string
    @param job_key: the unique key (directory name) of the workflow job
    @type job_key: string
    @param job_path: the absolute path of the workflow job directory
    @type job_path: string
    @param start_time: the time of the beginning of cleaning, in seconds since Epoch
    @type start_time: float
    @param delay: the delay in days under which a workflow is kept
    @type delay: float
    @param admin: the Admin bookkeeping object of this workflow job
    @type admin: L{Admin} instance
    @param job_state: the state of this workflow job
    @type job_state: L{JobState} instance
    @param logger: the logger to log informations
    @type logger: logging.logger instance
    @param dry_run: if True, only log what would be removed
    @type dry_run: boolean
    """
    delay_sec = day2second( delay )
    sm = StatusManager()
    job_status = sm.getStatus( job_path )
    if not job_status.isKnown():
        logger.warning( "Unkown status for job %s"% job_path )
        return
    last_modification_time = os.path.getmtime( job_path )
    # "old" means: untouched for longer than the configured delay
    old_job = int( start_time - last_modification_time ) > delay_sec
    if job_status.isEnded() and old_job :
        logger.debug( "the workflow is ended and old ( %d > %d )"%( int( start_time - last_modification_time ) , delay_sec) )
        workflowID = admin.getWorkflowID()
        logger.debug( "workflowID = %s "%workflowID )
        if workflowID:#this workflow is a subtask of an other workflow
            logger.debug( "the workflow %s belongs to the workflow %s" %(job_path , workflowID ))
            try:
                wf_state = JobState( uri = workflowID )
            except URLError , err :
                #the portal is down?
                logger.warning( "The workflow %s belongs to the workflow %s and the portal does respond (%s). The workflow is not removed."% (job_path ,
                                                                                                                                             workflowID ,
                                                                                                                                             err))
            except HTTPError , err:
                #the workflow does not exists anymore
                if err.code == 404:
                    # parent workflow vanished: this sub-workflow can be removed
                    logger.info( "The workflow %s belongs to the workflow %s which is not exist any more (%s). The workflow is removed." % (job_path ,
                                                                                                                                           workflowID ,
                                                                                                                                           err))
                    if not dry_run:
                        try:
                            rmtree( job_path )
                        except Exception, err:
                            logger.error( "cannot remove workflow %s : %s ." %(job_path , err ) )
                else:
                    # any other HTTP error: be conservative, keep the workflow
                    logger.info( "The workflow %s belongs to the workflow %s which is not reachable (%s). The workflow is not removed." % (job_path ,
                                                                                                                                          workflowID ,
                                                                                                                                          err))
            except JobError , err:
                from errno import ENOENT
                if err.errno == ENOENT:
                    # local parent workflow directory is missing: parent is gone
                    logger.debug( "The job %s belongs to the workflow %s. which is not exist any more. The job is removed."% (job_path ,
                                                                                                                             workflowID
                                                                                                                             ))
                    if not dry_run:
                        try:
                            rmtree( job_path )
                        except Exception, err:
                            logger.error( "cannot remove job %s : %s ." %( job_path , err ) )
                else:
                    logger.error( "the workflow job %s cannot be loaded: %s : the workflow %s is not removed."%( workflowID ,
                                                                                                                 err ,
                                                                                                                 job_path
                                                                                                                 ) )
            except Exception ,err:
                # unexpected failure while loading the parent: keep the workflow
                logger.error( "an error occured during %s workflow job loading: %s : the job %s is not removed."%( workflowID ,
                                                                                                                   err ,
                                                                                                                   job_path
                                                                                                                   ) )
            else:
                # parent workflow still exists: never remove its sub-workflows here
                workflow_status = sm.getStatus( wf_state.getDir() )
                logger.debug( "the workflow %s belongs to the workflow %s which has %s status. The workflow is not removed."%( job_path ,
                                                                                                                               workflowID ,
                                                                                                                               workflow_status))
        else: #this is a "top level" workflow
            logger.info( "remove workflow %s" % job_path )
            if not dry_run:
                try:
                    rmtree( job_path )
                except Exception, err:
                    logger.error( "cannot remove workflow %s : %s ." %( job_path , err ) )
    elif old_job :
        # old but not ended: just report, never remove a possibly-live workflow
        logger.info( "The workflow %s has the %s status and is older than %d days" % (job_path, job_status , delay) )
    else:
        # recent workflow: nothing to do
        logger.info( "The workflow %s is too young to be cleaned" %job_path )
def clean_sessions( config , start_time , delay , logger , dry_run = False ):
    """
    remove anonymous sessions if the session does not point toward any jobs
    @param config: the Mobyle Configuration
    @type config: L{Config} instance
    @param start_time: a time in seconds which represents the date of the beginning of cleaning since Epoch.
    @type start_time: float
    @param delay: the delay in days to remove sessions older than start_time + delay
    @type delay: float
    @param logger: the logger to log informations
    @type logger: logging.logger instance
    @param dry_run: if True, only log what would be removed
    @type dry_run: boolean
    """
    # from here on `delay` is expressed in seconds, no longer in days
    delay = day2second( delay )
    sessions_repository = os.path.join( config.user_sessions_path() , 'anonymous' )
    if not os.path.isdir( sessions_repository ):
        logger.critical( "Check your Mobyle configuration file, the annonymous sessions directory:'%s' does not exist" %sessions_repository , exc_info= True)
        sys.exit(1)
    try:
        sessions_keys = os.listdir( sessions_repository )
    except Exception, err:
        logger.critical( "The anonymous sessions directory is not accessible: %s" %err , exc_info= True )
        sys.exit(1)
    for session_key in sessions_keys :
        try:
            session_path = os.path.join( sessions_repository , session_key )
            if not os.path.isdir( session_path ):
                logger.debug( "%s is not a directory." %session_path)
                continue
            last_modification_time = os.path.getmtime( session_path )
            session = Session( session_path , session_key , config )
            try:
                jobs = session.getAllJobs()
                # remove only empty sessions that are older than the delay
                if not jobs and int( start_time - last_modification_time ) > delay :
                    logger.info("removing session %s" %session_path)
                    if not dry_run:
                        try:
                            rmtree( session_path )
                        except Exception , err:
                            logger.error( "cannot remove the session %s : %s" %( session_path , err ))
                continue
            except SessionError, err:
                if not os.access( os.path.join( session_path,'.session.xml'), os.F_OK):
                    # the session is unusable without its .session.xml: drop it.
                    # NOTE(review): this branch removes the directory even when
                    # dry_run is True — confirm whether that is intended.
                    logger.warning( "no .session.xml in the session %s , remove it anyway" % session_path )
                    try:
                        rmtree( session_path )
                    except Exception , err:
                        logger.error( "cannot remove the session %s : %s" %( session_path , err ))
                    continue
                else:
                    logger.error( "Error during session %s loading: %s" % ( session_path , err ) )
                    continue
        except MobyleError, me:
            # any other Mobyle failure: report and go on with the next session
            logger.error( "Error during session %s loading: %s" % ( session_path , me ) )
            continue
if __name__ == "__main__":
import atexit
now = time()
from optparse import OptionParser
parser = OptionParser( )
parser.add_option( "-j" , "--jobs",
action = "store_true",
dest = "jobs",
default = False ,
help = "Clean jobs (programs and workflows).")
parser.add_option( "-s" , "--sessions",
action = "store_true",
dest = "sessions",
default = False ,
help = "Clean anonymous sessions.")
parser.add_option( "-d" , "--delay",
action = "store",
type = 'int',
dest = "delay",
default = config.remainResults() ,
help = "Delete jobs/sessions older than <DELAY> days ( positive integer value ).")
parser.add_option( "-l" , "--log",
action = "store",
type = 'string',
dest = "log_file",
help = "Path to the Logfile where put the logs.")
parser.add_option( "-n" , "--dry-run",
action = "store_true",
dest = "dry_run",
help = "don't actually do anything.")
parser.add_option( "-v" , "--verbose",
action= "count",
dest = "verbosity",
default = 0 ,
help = "increase the verbosity level. There is 4 levels: Error messages (default), Warning (-v), Info (-vv) and Debug.(-vvv)")
parser.add_option( "-q" , "--quiet" ,
action= "store_true" ,
dest = "quiet" ,
default = False ,
help = "disable messages on standard error")
options, args = parser.parse_args()
if not options.jobs and not options.sessions :
options.jobs = True
options.sessions =True
import logging
cleaner_handlers = []
if options.quiet :
cleaner_handlers.append( logging.FileHandler( '/dev/null' , 'a' ) )
if options.log_file :
try:
cleaner_handlers.append( logging.FileHandler( options.log_file , 'a' ) )
except(IOError , OSError) , err:
print >> sys.stderr , "cannot log messages in %s: %s" % ( options.log_file , err )
sys.exit(1)
else:
cleaner_handlers.append( logging.StreamHandler( sys.stderr ) )
if options.verbosity == 0:
for h in cleaner_handlers: h.setLevel( logging.ERROR )
elif options.verbosity == 1:
for h in cleaner_handlers: h.setLevel( logging.WARNING )
elif options.verbosity == 2:
for h in cleaner_handlers: h.setLevel( logging.INFO )
elif options.verbosity == 3:
for h in cleaner_handlers: h.setLevel( logging.DEBUG )
if options.verbosity < 3:
cleaner_formatter = logging.Formatter( '%(filename)-10s : %(levelname)-8s : %(asctime)s : %(message)s', '%a, %d %b %Y %H:%M:%S' )
else:
cleaner_formatter = logging.Formatter( '%(filename)-10s : %(levelname)-8s : L %(lineno)d : %(asctime)s : %(message)s', '%a, %d %b %Y %H:%M:%S' )
for h in cleaner_handlers:
h.setFormatter( cleaner_formatter )
logger = logging.getLogger( 'cleaner' )
for h in cleaner_handlers:
logger.addHandler( h )
mail_handler = logging.handlers.SMTPHandler( config.mailhost(),
config.sender() ,
config.maintainer() ,
'[ %s ] Mobyle cleaner problem' % config.root_url()
)
mail_handler.setLevel( logging.CRITICAL )
mail_handler.setFormatter( logging.Formatter( '%(filename)-10s : %(levelname)-8s : L %(lineno)d : %(asctime)s : %(message)s', '%a, %d %b %Y %H:%M:%S' ) )
logger.addHandler( mail_handler )
lock_file = os.path.join( config.results_path() , '.mobclean_lock' )
def remove_lock():
if os.path.exists( lock_file ):
try:
os.unlink( lock_file )
except Exception, err:
logger.critical( "cannot remove lock %s" %lock_file )
if os.path.exists( lock_file ):
try:
f= file( lock_file , 'r')
pid = f.readline()
f.close()
except Exception, err:
pid = "UNKNOWN"
logger.critical( "a mobclean is already running (pid = %s), abort this one "%pid )
sys.exit(1)
else:
try:
f= file( lock_file , 'w')
f.write( str( os.getpid() ) )
f.close()
except Exception , err:
logger.critical( "cannot put a lock file, abort mobclean: %s" %err )
sys.exit(1)
atexit.register( remove_lock )
if options.jobs :
clean_jobs( config , now , options.delay , logger, dry_run = options.dry_run )
if options.sessions :
clean_sessions( config , now , options.delay , logger, dry_run = options.dry_run )
|