1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524
|
"""
Manipulating UWS jobs through a REST interface.
The result documents are defined through the UWS 1.1 schema (UWS-1.1.xsd).
Instead of returning XML, actions can also raise svcs.SeeOther exceptions.
However, these are caught in JobResource._redirectAsNecessary and appended
to the base URL of the TAP service, so you must only give URIs relative
to the TAP service's root URL.
This UWS system should adapt to concrete UWSes; the UWS in use is passed
into the top-level functions (doJobAction, getJobList).
The actions pretty much resemble twisted.web Resources; perhaps this
should be refactored to use them. For now, however, we don't pass around
resources, we pass around callables that are used within renderers
in asyncrender, which lets asyncrender factor out a bit of common
functionality. Hm.
"""
#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import os
from twisted.internet import reactor
from twisted.internet import defer
from twisted.web import server
from twisted.web import static
from gavo import base
from gavo import svcs
from gavo import utils
from gavo.protocols import dali
from gavo.protocols import uws
from gavo.utils import stanxml
UWSNamespace = 'http://www.ivoa.net/xml/UWS/v1.0'
XlinkNamespace = "http://www.w3.org/1999/xlink"

# Make the uws and xlink prefixes known to stanxml, each together with the
# URL of the schema it is validated against.  Note that the uws namespace
# URI stays at v1.0 while the schema registered is UWS-1.1.xsd.
for _prefix, _namespace, _schemaName in [
        ("uws", UWSNamespace, "UWS-1.1.xsd"),
        ("xlink", XlinkNamespace, "xlink.xsd")]:
    stanxml.registerPrefix(_prefix, _namespace,
        stanxml.schemaURL(_schemaName))
# keep the loop variables out of the module namespace
del _prefix, _namespace, _schemaName
class UWS(object):
    """the container for elements from the uws namespace.

    The nested classes are stanxml elements.  Judging from the declarations
    below (an assumption about stanxml conventions -- confirm against
    gavo.utils.stanxml), _a_<name> declares an XML attribute and
    _name_a_<name> gives the name it is serialized under (e.g. xlink:href).
    """
    class UWSElement(stanxml.Element):
        # base for all UWS elements; they are written with the "uws" prefix
        _prefix = "uws"

    @staticmethod
    def makeRoot(ob):
        """marks ob as a document root and returns it.

        This sets ob._additionalPrefixes to stanxml.xsiPrefix (replacing
        whatever value ob had there) and allows ob to be empty.
        """
        ob._additionalPrefixes = stanxml.xsiPrefix
        ob._mayBeEmpty = True
        return ob

    class job(UWSElement):
        # the version of UWS our job documents declare
        _a_version = "1.1"

    class jobs(UWSElement):
        # a job list may legitimately contain no jobs
        _mayBeEmpty = True
        _a_version = "1.1"

    class parameters(UWSElement): pass
    class destruction(UWSElement): pass
    # NillableMixin elements: presumably rendered as xsi:nil when they have
    # no content -- TODO confirm in stanxml.
    class endTime(stanxml.NillableMixin, UWSElement): pass
    class creationTime(UWSElement): pass
    class executionDuration(UWSElement): pass
    class jobId(UWSElement): pass
    class jobInfo(UWSElement): pass
    class message(UWSElement): pass
    class ownerId(stanxml.NillableMixin, UWSElement): pass
    class phase(UWSElement): pass
    class quote(stanxml.NillableMixin, UWSElement): pass
    class runId(UWSElement): pass
    class startTime(stanxml.NillableMixin, UWSElement): pass

    class detail(UWSElement):
        # a link to error details, written as xlink attributes
        _a_href = None
        _a_type = None
        _name_a_href = "xlink:href"
        _name_a_type = "xlink:type"

    class errorSummary(UWSElement):
        _a_type = None # transient | fatal
        _a_hasDetail = None

    class jobref(UWSElement):
        # the xlink prefix must be declared for the href/type attributes
        _additionalPrefixes = frozenset(["xlink"])
        _a_id = None
        _a_href = None
        _a_type = None
        _name_a_href = "xlink:href"
        _name_a_type = "xlink:type"

    class parameter(UWSElement):
        _mayBeEmpty = True
        _a_byReference = None
        _a_id = None
        _a_isPost = None

    class result(UWSElement):
        # a link to one job result, written as xlink attributes
        _additionalPrefixes = frozenset(["xlink"])
        _mayBeEmpty = True
        _a_id = None
        _a_href = None
        _a_type = None
        _name_a_href = "xlink:href"
        _name_a_type = "xlink:type"

    class results(UWSElement):
        # jobs without results still get an (empty) results element
        _mayBeEmpty = True
def getJobList(workerSystem,
        forOwner=None,
        phase=None,
        last=None,
        after=None):
    """returns the rendered UWS job list for workerSystem.

    The filter arguments (forOwner, phase, last, after) are passed unchanged
    to workerSystem.getIdsAndPhases.  Each job it yields becomes a
    UWS.jobref with id, href (from workerSystem.getURLForId), and phase.
    The UWS.jobs tree is rendered with stanxml.xmlrender using
    workerSystem.joblistPreamble.
    """
    result = UWS.jobs()
    # Use a separate loop variable: reusing "phase" would silently clobber
    # the phase filter argument.
    for jobId, jobPhase in workerSystem.getIdsAndPhases(
            forOwner, phase, last, after):
        result[
            UWS.jobref(id=jobId, href=workerSystem.getURLForId(jobId))[
                UWS.phase[jobPhase]]]
    return stanxml.xmlrender(result, workerSystem.joblistPreamble)
def getErrorSummary(job):
    """returns a UWS.errorSummary element for job, or None if it has no error.

    job.error is expected to be a dict with a "msg" key and an optional
    "hint" key.  If a hint is present, it is appended to the message.
    """
    # all our errors are fatal, and for now .../error yields the same thing
    # as we include here, so we hardcode the attributes.
    errDesc = job.error
    if not errDesc:
        return None

    msg = errDesc["msg"]
    # an error description without a hint is not an error in itself
    hint = errDesc.get("hint")
    if hint:
        msg = msg+"\n\n -- Hint: "+hint
    return UWS.errorSummary(type="fatal", hasDetail="false")[
        UWS.message[msg]]
def getParametersElement(job):
    """returns a UWS.parameters element for job.

    Parameters given by reference (uws.ParameterRef) become
    parameter elements with byReference="true" and the reference URL as
    content.  All other parameters use str() of their serialized value.
    """
    res = UWS.parameters()
    for key, value in job.iterSerializedPars():
        if isinstance(value, uws.ParameterRef):
            # Spell out the xs:boolean literal (as getErrorSummary does for
            # hasDetail) rather than relying on how the serializer renders
            # a Python True.
            res[UWS.parameter(id=key, byReference="true")[value.url]]
        else:
            res[UWS.parameter(id=key)[str(value)]]
    return res
class JobActions(object):
    """A registry of the "actions" that can be performed on UWS jobs.

    Action names are the names of the child resources of a UWS job.  Each
    instance starts with the standard actions (registered through
    addStandardAction).  JobAction subclasses passed to the constructor are
    instantiated and added under their names.  Give one the name of a UWS
    standard action to override that action, if you think that's wise.
    """
    _standardActions = {}

    def __init__(self, *additionalActions):
        self.actions = dict(self._standardActions)
        for actionClass in additionalActions:
            instance = actionClass()
            self.actions[actionClass.name] = instance

    @classmethod
    def addStandardAction(cls, actionClass):
        """instantiates actionClass and registers it as a standard action.
        """
        cls._standardActions[actionClass.name] = actionClass()

    def dispatch(self, action, job, request, segments):
        """runs the action called action on job and returns its result.

        Jobs with an owner may only be accessed by that owner (otherwise
        svcs.Authenticate is raised).  Unknown actions raise
        svcs.UnknownURI.  The response content type is set from the
        action's mime attribute.
        """
        if job.owner and request.getAuthUser()!=job.owner:
            raise svcs.Authenticate()

        try:
            handler = self.actions[action]
        except KeyError:
            # logOldExc must run while the KeyError is being handled
            raise base.ui.logOldExc(
                svcs.UnknownURI("Invalid UWS action '%s'"%action))

        request.setHeader("content-type", handler.mime)
        return handler.getResource(job, request, segments)
class JobAction(object):
    """an action done to a job.

    Subclasses define methods do<METHOD> (doGET, doPOST, ...), and
    getResource dispatches to them by the HTTP method of the request.
    They must have a name corresponding to the child resource names from
    the UWS spec; JobActions registers and looks up actions by that name.
    """
    name = None
    mime = "text/xml"

    def getResource(self, job, request, segments):
        """returns what the do<METHOD> handler for request returns.

        Raises svcs.UnknownURI if there are segments left (plain actions
        have no children) and svcs.BadMethod if this action has no handler
        for the request method.
        """
        if segments:
            raise svcs.UnknownURI("Too many segments")

        # twisted gives us bytes here; accept str, too.
        methodName = request.method
        if isinstance(methodName, bytes):
            methodName = methodName.decode("ascii")

        try:
            handler = getattr(self, "do"+methodName)
        except AttributeError:
            # report the method as text, as other BadMethod raisers do,
            # rather than as a bytes repr like b'PUT'.
            raise base.ui.logOldExc(svcs.BadMethod(methodName))
        return handler(job, request)
class ErrorAction(JobAction):
    """serves the job's error through dali.serveDALIError.

    If the job has no error, the response body is empty.  POST is handled
    exactly like GET.
    """
    name = "error"
    mime = "text/plain"

    def doGET(self, job, request):
        if job.error is not None:
            return dali.serveDALIError(request, job.error)
        return b""

    doPOST = doGET

JobActions.addStandardAction(ErrorAction)
class StartTimeAction(JobAction):
    """returns the job's start time as ISO text, or NULL if it has none.

    This is an extension over plain UWS allowing users to retrieve when
    their job started.  In DaCHS' TAP implementation, this lets you discern
    whether the taprunner is already processing an EXECUTING job
    (startTime!=NULL) or whether it's still coming up (else).  POST is
    handled exactly like GET.
    """
    name = "startTime"
    mime = "text/plain"

    def doGET(self, job, request):
        if job.startTime is not None:
            return utils.formatISODT(job.startTime).encode("ascii")
        return b"NULL"

    doPOST = doGET

JobActions.addStandardAction(StartTimeAction)
class ParameterAction(JobAction):
    """lists (GET) or changes (POST) the job's parameters.

    Parameters may only be changed while the job is PENDING.  A successful
    change redirects to the job itself.
    """
    name = "parameters"

    def doGET(self, job, request):
        request.setHeader("content-type", "text/xml")
        parameters = getParametersElement(job)
        return UWS.makeRoot(parameters)

    def doPOST(self, job, request):
        if job.phase==uws.PENDING:
            with job.getWritable() as wjob:
                wjob.setParamsFromDict(request.uwsArgs)
            raise svcs.SeeOther(job.jobId)

        raise base.ValidationError(
            "Parameters cannot be changed in phase %s"%job.phase, "phase")

JobActions.addStandardAction(ParameterAction)
class PhaseAction(JobAction):
    """reads (GET) or changes (POST) the job's phase.

    POST with PHASE=RUN requests a change to QUEUED, PHASE=ABORT a change
    to ABORTED; anything else is a validation error.  Successful changes
    redirect to the job itself.
    """
    name = "phase"
    mime = "text/plain"
    timeout = 10 # this is here for testing

    def doPOST(self, job, request):
        requested = request.uwsArgs["phase"]
        if requested=="RUN":
            targetPhase = uws.QUEUED
        elif requested=="ABORT":
            targetPhase = uws.ABORTED
        else:
            raise base.ValidationError("Bad phase: %s"%requested, "phase")

        job.uws.changeToPhase(job.jobId, targetPhase, timeout=self.timeout)
        raise svcs.SeeOther(job.jobId)

    def doGET(self, job, request):
        request.setHeader("content-type", "text/plain")
        return job.phase

JobActions.addStandardAction(PhaseAction)
class _SettableAction(JobAction):
    """Abstract base for ExecDAction and DestructionAction.

    Subclasses provide name (the child resource; its lowercased form is
    the request argument read on POST), attName (the job attribute to
    read or change), and serializeValue/deserializeValue converting
    between that attribute and its text form.
    """
    mime = "text/plain"

    def doPOST(self, job, request):
        rawValue = request.uwsArgs[self.name.lower()]
        if rawValue is None:
            # no new value was passed: behave like a GET
            return self.doGET(job, request)

        try:
            newValue = self.deserializeValue(rawValue)
        except ValueError:
            raise base.ui.logOldExc(uws.UWSError("Invalid %s value: %s."%(
                self.name.upper(), repr(rawValue)), job.jobId))

        with job.getWritable() as wjob:
            wjob.change(**{self.attName: newValue})
        raise svcs.SeeOther(job.jobId)

    def doGET(self, job, request):
        request.setHeader("content-type", "text/plain")
        currentValue = getattr(job, self.attName)
        return self.serializeValue(currentValue)
class ExecDAction(_SettableAction):
    """reads or sets the job's executionDuration.

    New values are parsed with float; current values are written with str.
    """
    name = "executionduration"
    attName = 'executionDuration'
    serializeValue = staticmethod(str)
    deserializeValue = staticmethod(float)

JobActions.addStandardAction(ExecDAction)
class DestructionAction(_SettableAction):
    """reads or sets the job's destructionTime as an ISO timestamp.
    """
    deserializeValue = staticmethod(utils.parseISODT)
    serializeValue = staticmethod(utils.formatISODT)
    attName = "destructionTime"
    name = "destruction"

JobActions.addStandardAction(DestructionAction)
class QuoteAction(JobAction):
    """returns the job's quote as an ISO timestamp, or an empty string if
    the job has no quote.
    """
    name = "quote"
    mime = "text/plain"

    def doGET(self, job, request):
        request.setHeader("content-type", "text/plain")
        return "" if job.quote is None else utils.formatISODT(job.quote)

JobActions.addStandardAction(QuoteAction)
class OwnerAction(JobAction):
    """writes the job owner as plain text, or NULL for jobs without owner.
    """
    name = "owner"
    mime = "text/plain"

    def doGET(self, job, request):
        request.setHeader("content-type", "text/plain")
        owner = job.owner
        if owner is None:
            owner = b"NULL"
        elif isinstance(owner, str):
            # request.write goes to the twisted transport, which only takes
            # bytes (StartTimeAction and ErrorAction produce bytes, too).
            owner = owner.encode("utf-8")
        request.write(owner)
        return b""

JobActions.addStandardAction(OwnerAction)
def _getResultsElement(job):
    """returns a UWS.results element linking every result of job.

    Each result's href is the job URL, then /results/, then the result name.
    """
    resultsURL = job.getURL()+"/results/"
    resultElements = [
        UWS.result(id=desc["resultName"], href=resultsURL+desc["resultName"])
        for desc in job.getResults()]
    return UWS.results[resultElements]
class ResultsAction(JobAction):
    """Access result (Extension: and other) files in job directory.

    Without further segments, GET returns the job's UWS results element.
    With a single segment naming a job result, that result is served with
    its declared type.  Otherwise, the segments are taken as a path within
    the job's working directory (this is so uploads can be delivered).
    """
    name = "results"

    def getResource(self, job, request, segments):
        if not segments:
            return JobAction.getResource(self, job, request, segments)

        res = None
        # first try a "real" UWS result from the job
        if len(segments)==1:
            try:
                fName, resultType = job.getResult(segments[0])
                res = static.File(fName, defaultType=str(resultType))
                res.encoding = None
            except base.NotFoundError:  # segments[0] does not name a result
                pass  # fall through to other files

        if res is None:
            # if that doesn't work, try to return some other file from the
            # job directory.  This is so we can deliver uploads.
            jobDir = os.path.abspath(job.getWD())
            filePath = os.path.abspath(os.path.join(jobDir, *segments))
            # Compare whole path components: a plain string prefix test
            # would let ../<jobdir>xyz/... escape into a sibling directory
            # whose name merely starts with this job's directory name.
            if os.path.commonpath([jobDir, filePath])!=jobDir:
                raise svcs.ForbiddenURI(
                    "Not serving files outside of job directory.")
            if not os.path.exists(filePath):
                raise svcs.UnknownURI("File not found")
            res = static.File(filePath, defaultType="application/octet-stream")

        # now render whatever we have found
        res.render(request)
        return server.NOT_DONE_YET

    def doGET(self, job, request):
        return _getResultsElement(job)

JobActions.addStandardAction(ResultsAction)
def _serializeTime(element, dt):
if dt is None:
return element()
return element[utils.formatISODT(dt)]
class RootAction(JobAction):
    """Actions for async/jobId.

    GET returns the job document (possibly after a UWS 1.1 slow poll),
    DELETE destroys the job, and POST with action=DELETE is a DaCHS
    extension doing the same for web browsers.
    """
    name = ""

    def doDELETE(self, job, request):
        """Implements DELETE on a job resource.

        This is the UWS-compliant way to delete a job.  After destroying
        the job, svcs.SeeOther("") is raised to redirect the client.
        """
        job.uws.destroy(job.jobId)
        raise svcs.SeeOther("")

    def doPOST(self, wjob, request):
        """Implements POST on a job resource.

        This is a DaCHS extension to UWS in order to let web browsers
        delete jobs by passing action=DELETE.  Any other POST raises
        svcs.BadMethod.
        """
        if request.uwsArgs.get("action")=="DELETE":
            self.doDELETE(wjob, request)
        else:
            raise svcs.BadMethod("POST")

    def _delayIfWAIT(self, job, request):
        """delays if request has UWS 1.1 "slow poll" arguments, returns None
        otherwise.

        When delaying, this returns a Deferred that eventually fires with
        the result of doGET (see _recheck).  WAIT=-1 requests the configured
        maximum ([async]maxslowpollwait), and larger WAIT values are clamped
        to that maximum.  WAIT=0 (or another non-positive value) answers at
        once.  There is no delay either if a PHASE argument does not match
        the job's phase, or if the job is not PENDING, QUEUED, or EXECUTING.

        This is a helper for doGET.
        """
        # This is the implementation of UWS 1.1 "slow polling".
        # We still do polling internally rather than use postgres'
        # LISTEN/NOTIFY since the overhead seems rather moderate and
        # the added complexity of setting up notifications appeared not
        # proportional to saving it.
        args = request.uwsArgs
        if args.get("wait") is None:
            return None

        try:
            waitFor = int(args["wait"])
        except (TypeError, ValueError):
            raise base.ui.logOldExc(base.ValidationError(
                "WAIT must be an integer, not %s"%repr(args["wait"]), "wait"))

        maxWait = base.getConfig("async", "maxslowpollwait")
        if waitFor==-1:
            waitFor = maxWait
        # never hold a connection open longer than the operator allows
        waitFor = min(waitFor, maxWait)
        if waitFor<=0:
            # nothing to wait for; don't sleep a second anyway
            return None

        if args.get("phase") is not None and args["phase"]!=job.phase:
            return None
        if job.phase not in ["PENDING", "QUEUED", "EXECUTING"]:
            return None

        d = defer.Deferred().addCallback(
            self._recheck, job.uws, job.jobId, waitFor-1, job.phase)
        reactor.callLater(1, d.callback, request)
        return d

    def _recheck(self,
            request, workerSystem, jobId, remainingWait, originalPhase):
        """the callback for doing slow polls.

        If the job's phase changed or the wait time is used up, this
        removes the slow poll arguments and returns doGET's result.
        Otherwise, it schedules itself again one second later.
        """
        job = workerSystem.getJob(jobId)
        if originalPhase!=job.phase or remainingWait<=0:
            request.uwsArgs.pop("wait", None)
            request.uwsArgs.pop("phase", None)
            return self.doGET(job, request)

        d = defer.Deferred().addCallback(
            self._recheck, workerSystem, jobId, remainingWait-1, originalPhase)
        reactor.callLater(1, d.callback, request)
        return d

    def doGET(self, job, request):
        """Implements GET on a job resource: return the current job metadata.

        For slow poll requests, the Deferred from _delayIfWAIT is returned
        instead.
        """
        delay = self._delayIfWAIT(job, request)
        if delay:
            return delay

        request.setHeader("content-type", "text/xml")
        tree = UWS.makeRoot(UWS.job[
            UWS.jobId[job.jobId],
            UWS.runId[job.runId],
            UWS.ownerId[job.owner],
            UWS.phase[job.phase],
            # quote is nillable just like startTime and endTime; serialize
            # it the same way so a missing quote never reaches formatISODT.
            _serializeTime(UWS.quote, job.quote),
            UWS.creationTime[utils.formatISODT(job.creationTime)],
            _serializeTime(UWS.startTime, job.startTime),
            _serializeTime(UWS.endTime, job.endTime),
            UWS.executionDuration[str(job.executionDuration)],
            UWS.destruction[utils.formatISODT(job.destructionTime)],
            getParametersElement(job),
            _getResultsElement(job),
            getErrorSummary(job)])
        return stanxml.xmlrender(tree, job.uws.jobdocPreamble)

JobActions.addStandardAction(RootAction)
def doJobAction(workerSystem, request, segments):
    """handles the async UI of UWS.

    segments must be a non-empty sequence whose first item is the job id.
    The optional second item names the action (the job resource itself,
    i.e. action "", if absent).  Anything after that is passed on to the
    action.  The result is whatever workerSystem.jobActions.dispatch
    returns.
    """
    jobId = segments[0]
    rest = segments[1:]
    action = rest[0] if rest else ""
    actionSegments = rest[1:] if rest else rest
    return workerSystem.jobActions.dispatch(action,
        workerSystem.getJob(jobId), request, actionSegments)
|