1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
|
from datetime import datetime, timedelta
import logging
from sqlalchemy import and_, or_, func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound
from . import runjob, store, db, notify
from .db import Job, ActiveJob
class Scheduler(object):
    """
    Database-backed job queue.

    Jobs are recorded as ``Job`` rows; each job's request and results
    payloads are kept in ``store`` under the job id.  Workers call
    :meth:`nextjob` to claim the next PENDING job and :meth:`postjob` to
    report its results.
    """

    def __init__(self):
        db.connect()

    def jobs(self, status=None):
        """
        Return the ids of all jobs ordered by priority.

        When *status* is truthy, only jobs whose status equals *status* are
        returned.
        """
        session = db.Session()
        query = session.query(Job)
        if status:
            query = query.filter(Job.status == status)
        return [j.id for j in query.order_by(Job.priority)]

    def submit(self, request, origin):
        """
        Queue a new job and return its id.

        *request* must provide ``"name"`` and ``"notify"`` keys; it is saved in
        ``store`` under the new job id.  The job's priority is the number of
        jobs submitted during the last 30 days by the same notification target
        or from the same *origin*.  Because :meth:`nextjob` dispatches the
        lowest priority value first, heavy users queue behind light ones.
        """
        session = db.Session()
        # Number of jobs for this user (matched by notify target or origin)
        # in the last 30 days; becomes the new job's priority.
        recent = (
            session.query(Job)
            .filter(or_(Job.notify == request["notify"], Job.origin == origin))
            .filter(Job.date >= datetime.utcnow() - timedelta(days=30))
            .count()
        )
        job = Job(
            name=request["name"],
            notify=request["notify"],
            origin=origin,
            priority=recent,
        )
        session.add(job)
        session.commit()
        store.create(job.id)
        store.put(job.id, "request", request)
        return job.id

    def _getjob(self, id):
        """Return the ``Job`` row with the given id, or None if absent."""
        session = db.Session()
        return session.query(Job).filter(Job.id == id).first()

    def results(self, id):
        """
        Return the results of job *id* as reported by ``runjob.results``.

        If ``runjob.results`` raises KeyError, return ``{"status": status}``
        instead, where status is the job's DB status or "UNKNOWN" if there
        is no such job.
        """
        try:
            return runjob.results(id)
        except KeyError:
            # Only hit the database when the results are not available.
            return {"status": self.status(id)}

    def status(self, id):
        """Return the status of job *id*, or "UNKNOWN" if it does not exist."""
        job = self._getjob(id)
        return job.status if job else "UNKNOWN"

    def info(self, id):
        """Return the original request stored for job *id*."""
        return store.get(id, "request")

    def cancel(self, id):
        """Mark job *id* as CANCEL if it is currently ACTIVE or PENDING."""
        session = db.Session()
        (
            session.query(Job)
            .filter(Job.id == id)
            # in_() takes one sequence of values, not separate arguments.
            .filter(Job.status.in_(("ACTIVE", "PENDING")))
            # Skip in-Python evaluation of the criteria (older SQLAlchemy
            # cannot evaluate in_() and raises); the commit below expires
            # loaded instances under the default expire_on_commit=True.
            .update({"status": "CANCEL"}, synchronize_session=False)
        )
        session.commit()

    def delete(self, id):
        """
        Delete any external storage associated with the job id. Mark the
        job as deleted.
        """
        session = db.Session()
        session.query(Job).filter(Job.id == id).update({"status": "DELETE"})
        # Persist the status change; without a commit it is discarded.
        session.commit()
        store.destroy(id)

    def nextjob(self, queue):
        """
        Make the next PENDING job active and return it.

        Pending jobs are ordered by priority (lowest value first, see
        :meth:`submit`), ties broken by the lowest job id.  The claimed job
        is marked ACTIVE with a start time and recorded in ``ActiveJob`` for
        *queue*, and its owner is notified.

        Returns ``{"request": None}`` when no job is pending, otherwise
        ``{"id": job_id, "request": request}``.

        Raises IOError if no job could be claimed after 10 attempts.
        """
        session = db.Session()
        # Query returning the lowest job id among the pending jobs that have
        # the minimum priority.
        _priority = select([func.min(Job.priority)], Job.status == "PENDING")
        min_id = select(
            [func.min(Job.id)],
            and_(Job.priority == _priority, Job.status == "PENDING"),
        )
        for _ in range(10):  # Repeat if conflict over next job
            # Get the next job, if there is one
            try:
                job = session.query(Job).filter(Job.id == min_id).one()
            except NoResultFound:
                return {"request": None}

            # Mark the job as active and record it in the active queue
            (
                session.query(Job)
                .filter(Job.id == job.id)
                .update(
                    {
                        "status": "ACTIVE",
                        "start": datetime.utcnow(),
                    }
                )
            )
            session.add(ActiveJob(jobid=job.id, queue=queue))

            # If the job was already taken, roll back and try again.  The
            # first process to record the job in the active list wins and
            # changes the job status from PENDING to ACTIVE.  Since that job
            # is no longer pending, the next query selects a different job,
            # so retrying should make progress.  If a process is killed
            # mid-transaction we rely on the database rolling it back; the
            # loop is capped at 10 attempts in case that assumption fails.
            try:
                session.commit()
            except IntegrityError:
                session.rollback()
                continue
            break
        else:
            logging.critical("dispatch could not assign job %s" % job.id)
            raise IOError("dispatch could not assign job %s" % job.id)

        request = store.get(job.id, "request")
        # No reason to include time; email or twitter does that better than
        # we can without client locale information.
        notify.notify(user=job.notify, msg=job.name + " started", level=1)
        return {"id": job.id, "request": request}

    def postjob(self, id, results):
        """
        Record the *results* of job *id* and notify its owner.

        The job's status is set from ``results["status"]`` (default "ERROR"),
        its stop time is recorded, and it is removed from the active list.  A
        failed commit is logged and rolled back rather than raised, so the
        results are still saved to ``store``.  No notification is sent if the
        job cannot be found afterwards.
        """
        # TODO: redundancy check, confirm queue, check sig, etc.
        # Update db
        session = db.Session()
        (
            session.query(Job)
            .filter(Job.id == id)
            .update(
                {
                    "status": results.get("status", "ERROR"),
                    "stop": datetime.utcnow(),
                }
            )
        )
        session.query(ActiveJob).filter(ActiveJob.jobid == id).delete()
        try:
            session.commit()
        except Exception:
            # Best effort: keep going so the results are still stored, but
            # leave a trace instead of silently discarding the failure.
            logging.exception("postjob could not update job %s", id)
            session.rollback()

        # Save results
        store.put(id, "results", results)

        # Post notification
        job = self._getjob(id)
        if job is None:
            logging.error("postjob: job %s not found; no notification sent", id)
            return

        if job.status == "COMPLETE":
            if "value" in results:
                status_msg = " ended with %s" % results["value"]
            else:
                status_msg = " complete"
        elif job.status == "ERROR":
            status_msg = " failed"
        elif job.status == "CANCEL":
            status_msg = " cancelled"
        else:
            # %s rather than "+" so a None status cannot raise TypeError.
            status_msg = " with status %s" % job.status
        # Note: no reason to include time; twitter or email will give it
        # Plus, doing time correctly requires knowing the locale of the
        # receiving client.
        notify.notify(user=job.notify, msg=job.name + status_msg, level=2)
|