"""
Support for UWSes defined in user RDs.
To understand this, start at makeUWSForService.
"""
#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import pickle
import datetime
import weakref
from gavo import base
from gavo import formats
from gavo import rsc
from gavo import rscdesc #noflake: for registration
from gavo import svcs
from gavo import utils
from gavo.protocols import uws
from gavo.protocols import uwsactions
class UserUWSTransitions(uws.ProcessBasedUWSTransitions):
"""The transition function for user-defined UWSes.
"""
def __init__(self):
uws.ProcessBasedUWSTransitions.__init__(self, "User")
def queueJob(self, newState, wjob, ignored):
"""puts a job on the queue.
"""
uws.ProcessBasedUWSTransitions.queueJob(self, newState, wjob, ignored)
wjob.uws.scheduleProcessQueueCheck()
def getCommandLine(self, wjob):
args = ["gavo", "uwsrun", "--", str(wjob.jobId)]
if base.DEBUG:
args[1:1] = ["--debug", "--traceback"]
return "gavo", args
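# Illustrative sketch, not used by the module itself: for a job with the
# made-up id "1234-abcd", getCommandLine above returns something like
#
#   ("gavo", ["gavo", "uwsrun", "--", "1234-abcd"])
#
# and with base.DEBUG set, "--debug --traceback" is spliced in right after
# the program name.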
def makeUWSJobParameterFor(inputKey):
"""returns a uws.JobParameter subclass for inputKey.
"""
class SomeParameter(uws.JobParameter):
name = inputKey.name
_deserialize = inputKey._parse
_serialize = inputKey._unparse
return SomeParameter
class UserUWSJobBase(uws.UWSJobWithWD):
"""The base class for the service-specific user UWS jobs.
(i.e., the things that makeUserUWSJobClass spits out)
"""
_transitions = UserUWSTransitions()
_jobsTDId = "//uws#userjobs"
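# Note: every job class generated by makeUserUWSJobClass below derives from
# this base and hence shares the jobs table defined by //uws#userjobs; only
# the parameter definitions differ from service to service.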
def makeUserUWSJobClass(service):
"""returns a class object for representing UWS jobs processing requests
for service
"""
class UserUWSJob(UserUWSJobBase):
pass
defaults = {}
for ik in service.getInputKeysFor("uws.xml"):
if ik.type=="file":
# these are handled by UPLOAD
setattr(UserUWSJob, "_parameter_upload", uws.UploadParameter())
setattr(UserUWSJob, "_parameter_"+ik.name, uws.FileParameter())
continue
setattr(UserUWSJob, "_parameter_"+ik.name,
makeUWSJobParameterFor(ik))
defaults[ik.name] = ik.values.default
defaultStr = utils.getCleanBytes(pickle.dumps(defaults, protocol=2)
).decode("ascii")
del defaults
def _(cls):
return defaultStr
UserUWSJob._default_parameters = classmethod(_)
UserUWSJob._default_jobClass = classmethod(
lambda _, v=service.getFullId(): v)
return UserUWSJob
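# A hedged sketch of what makeUserUWSJobClass produces: for a service whose
# core declares an input key "ra" and a file-typed key "stuff" (both names
# made up for illustration), the generated class roughly amounts to
#
#   class UserUWSJob(UserUWSJobBase):
#       _parameter_upload = uws.UploadParameter()
#       _parameter_stuff = uws.FileParameter()
#       _parameter_ra = makeUWSJobParameterFor(<the "ra" input key>)
#
# plus _default_parameters (a classmethod returning the pickled defaults of
# the non-file keys as an ASCII string) and _default_jobClass (returning the
# service's full id).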
class UserUWS(uws.UWSWithQueueing):
"""A UWS for "user jobs", i.e., generic asynchronous jobs running a core.
These dynamically create job classes based on the processing core's
parameters. To make this happen, we'll need to override some of the
common UWS functions.
"""
joblistPreamble = ("<?xml-stylesheet href='/static"
"/xsl/useruws-joblist-to-html.xsl' type='text/xsl'?>")
jobdocPreamble = ("<?xml-stylesheet href='/static/xsl/"
"useruws-job-to-html.xsl' type='text/xsl'?>")
def __init__(self, service, jobActions):
self.runcountGoal = base.getConfig("async", "maxUserUWSRunningDefault")
self.service = weakref.proxy(service)
uws.UWSWithQueueing.__init__(self,
makeUserUWSJobClass(service), jobActions)
def getURLForId(self, jobId):
return self.service.getURL("uws.xml")+"/"+jobId
def _getJob(self, jobId, conn, writable=False):
"""returns the named job, much like uws.UWS._getJob.
However, in a user UWS, there can be jobs from multiple services.
It would be nonsense to load another UWS's job's parameters into our
job class. To prevent this, we redirect if we find the new job's
class isn't ours. On the web interface, that should do the trick.
Everywhere else, this may not be entirely clear, but it still prevents
major confusion.
This is repeating code from uws.UWS._getJob; some refactoring at
some point would be nice.
"""
statementId = 'getById'
if writable:
statementId = 'getByIdEx'
res = self.runCanned(statementId, {"jobId": jobId}, conn)
if len(res)!=1:
raise uws.JobNotFound(jobId)
if res[0]["jobClass"]!=self.service.getFullId():
raise svcs.WebRedirect(
base.resolveCrossId(res[0]["jobClass"]).getUWS().getURLForId(jobId))
return self.jobClass(res[0], self, writable)
def getIdsAndPhases(self, *args, **kwargs):
# For user UWSes, we only want jobs from our own service in the job
# list resource, so we insert extra conditions into the basic queries.
# getById and getAllIds don't change, though, as they're used internally,
# and filtering them could interfere with, e.g., queueing.
return uws.UWSWithQueueing.getIdsAndPhases(self, *args,
initFragments=["jobClass=%(jobClass)s"],
initPars={"jobClass": self.service.getFullId()},
**kwargs)
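# Hedged illustration of the redirect in _getJob above: if the (made-up) job
# "1234-abcd" was created through service res1/q#svc1 but is looked up via
# another service's user UWS, the lookup raises svcs.WebRedirect towards
#
#   <res1/q#svc1's uws.xml URL>/1234-abcd
#
# (the concrete URL is whatever that service's getURLForId yields).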
def makeUWSForService(service):
"""returns a UserUWS instance tailored to service.
All these share a jobs table, but they all have different job
classes with the parameters custom-made for the service's core.
A drawback of this is that each UWS created in this way runs the
job table purger again. That shouldn't be a problem per se but
may become cumbersome at some point. We can always introduce a
class attribute on UserUWS to keep additional UWSes from starting
cron jobs of their own.
"""
return UserUWS(service, uwsactions.JobActions())
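# Minimal usage sketch; the resource descriptor id here is made up:
#
#   svc = base.resolveCrossId("myres/q#mysvc")
#   userUWS = makeUWSForService(svc)
#   print(userUWS.getURLForId("1234-abcd"))
#
# Within DaCHS, this presumably happens behind the scenes for services
# allowing the uws.xml renderer (cf. the getURL("uws.xml") call above).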
####################### CLI
def parseCommandLine():
import argparse
parser = argparse.ArgumentParser(description="Run an asynchronous"
" generic job (used internally)")
parser.add_argument("jobId", type=str, help="UWS id of the job to run")
return parser.parse_args()
def main():
args = parseCommandLine()
jobId = args.jobId
with base.getTableConn() as conn:
svcId = list(
conn.query("SELECT jobclass FROM uws.userjobs WHERE jobId=%(jobId)s",
{"jobId": jobId}))[0][0]
service = base.resolveCrossId(svcId)
# we're the only ones running, so we're safe using this.
queryMeta = svcs.emptyQueryMeta
try:
job = service.getUWS().getJob(jobId)
with job.getWritable() as wjob:
wjob.change(phase=uws.EXECUTING, startTime=datetime.datetime.utcnow())
service = base.resolveCrossId(job.jobClass)
inputTable = svcs.CoreArgs(service.core.inputTable,
job.parameters, job.parameters)
inputTable.job = job
data = service._runWithInputTable(
service.core, inputTable, queryMeta)
# Our cores either return a table, a pair of (mime, payload),
# or None (in which case they have added the results themselves).
if isinstance(data, tuple):
mime, payload = data
with job.openResult(mime, "result") as destF:
destF.write(payload)
elif isinstance(data, rsc.Data):
destFmt = inputTable.getParam("responseformat"
) or "application/x-votable+xml"
with job.openResult(destFmt, "result") as destF:
formats.formatData(destFmt, data, destF, False)
elif data is None:
pass
else:
raise NotImplementedError("Cannot handle a service %s result yet."%
repr(data))
with job.getWritable() as wjob:
wjob.change(phase=uws.COMPLETED)
except SystemExit:
pass
except uws.JobNotFound:
base.ui.notifyInfo("Giving up on non-existing UWS job %s."%jobId)
except Exception as ex:
base.ui.notifyError("UWS runner %s major failure"%jobId)
# try to push job into the error state -- this may well fail given
# that we're quite hosed, but it's worth the try
service.getUWS().changeToPhase(jobId, uws.ERROR, ex)
raise
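# Hedged note on how main() is reached: the UWS machinery executes the
# command line built by UserUWSTransitions.getCommandLine above, i.e.
# roughly
#
#   gavo uwsrun -- <jobId>
#
# which ends up in main() with that job id as the single CLI argument.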