File: uwsactions.py

package info (click to toggle)
gavodachs 2.3%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 7,260 kB
  • sloc: python: 58,359; xml: 8,882; javascript: 3,453; ansic: 661; sh: 158; makefile: 22
file content (524 lines) | stat: -rw-r--r-- 14,513 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
"""
Manipulating UWS jobs through a REST interface.

The result documents are defined through the schema UWS-1.1.xsd.

Instead of returning XML, they can also raise svcs.SeeOther exceptions.
However, these are caught in JobResource._redirectAsNecessary and appended
to the base URL of the TAP service, so you must only give URIs relative
to the TAP service's root URL.

This UWS system should adapt to concrete UWSes; the UWS in use is passed
into the top-level functions (doJobAction, getJobList).

The actions pretty much resemble twisted.web Resources; perhaps this
should be refactored to use them.  For now, however, we don't pass around
resources; instead, we pass around callables that are used within renderers
in asyncrender, which lets asyncrender factor out a bit of common
functionality.  Hm.
"""

#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import os

from twisted.internet import reactor
from twisted.internet import defer
from twisted.web import server
from twisted.web import static

from gavo import base
from gavo import svcs
from gavo import utils
from gavo.protocols import dali
from gavo.protocols import uws
from gavo.utils import stanxml


# UWS 1.1 documents keep the UWS 1.0 namespace URI; only the schema
# location points to the 1.1 schema.
UWSNamespace = "http://www.ivoa.net/xml/UWS/v1.0"
XlinkNamespace = "http://www.w3.org/1999/xlink"

stanxml.registerPrefix(
	"uws",
	UWSNamespace,
	stanxml.schemaURL("UWS-1.1.xsd"))
stanxml.registerPrefix(
	"xlink",
	XlinkNamespace,
	stanxml.schemaURL("xlink.xsd"))


class UWS(object):
	"""the container for elements from the uws namespace.

	Each nested class models one element of the UWS job/job list
	documents.  Where an element carries xlink attributes, the
	_name_a_* attributes give the prefixed names they are rendered
	under, and the element declares the xlink prefix through
	_additionalPrefixes.
	"""
	class UWSElement(stanxml.Element):
		_prefix = "uws"

	@staticmethod
	def makeRoot(ob):
		"""prepares ob for use as a document root and returns it.

		This sets the xsi prefix on ob and allows it to be empty.
		"""
		ob._additionalPrefixes = stanxml.xsiPrefix
		ob._mayBeEmpty = True
		return ob

	class job(UWSElement):
		_a_version = "1.1"

	class jobs(UWSElement):
		_mayBeEmpty = True
		_a_version = "1.1"

	class parameters(UWSElement): pass

	class destruction(UWSElement): pass
	class endTime(stanxml.NillableMixin, UWSElement): pass
	class creationTime(UWSElement): pass
	class executionDuration(UWSElement): pass
	class jobId(UWSElement): pass
	class jobInfo(UWSElement): pass
	class message(UWSElement): pass
	class ownerId(stanxml.NillableMixin, UWSElement): pass
	class phase(UWSElement): pass
	class quote(stanxml.NillableMixin, UWSElement): pass
	class runId(UWSElement): pass
	class startTime(stanxml.NillableMixin, UWSElement): pass

	class detail(UWSElement):
		# like jobref and result, this element renders xlink:-prefixed
		# attributes, so it must make sure the xlink prefix is declared.
		_additionalPrefixes = frozenset(["xlink"])
		_a_href = None
		_a_type = None
		_name_a_href = "xlink:href"
		_name_a_type = "xlink:type"

	class errorSummary(UWSElement):
		_a_type = None  # transient | fatal
		_a_hasDetail = None

	class jobref(UWSElement):
		_additionalPrefixes = frozenset(["xlink"])
		_a_id = None
		_a_href = None
		_a_type = None
		_name_a_href = "xlink:href"
		_name_a_type = "xlink:type"

	class parameter(UWSElement):
		_mayBeEmpty = True
		_a_byReference = None
		_a_id = None
		_a_isPost = None

	class result(UWSElement):
		_additionalPrefixes = frozenset(["xlink"])
		_mayBeEmpty = True
		_a_id = None
		_a_href = None
		_a_type = None
		_name_a_href = "xlink:href"
		_name_a_type = "xlink:type"

	class results(UWSElement):
		_mayBeEmpty = True


def getJobList(workerSystem,
		forOwner=None,
		phase=None,
		last=None,
		after=None):
	"""returns a rendered UWS job list document for workerSystem.

	forOwner, phase, last, and after are passed unchanged to
	workerSystem.getIdsAndPhases, which selects the jobs to list.  Each
	job is represented by a jobref carrying its id, URL, and phase; the
	document is rendered with workerSystem.joblistPreamble.
	"""
	result = UWS.jobs()
	# the loop variable must not be called phase: that would shadow the
	# phase filter parameter.
	for jobId, jobPhase in workerSystem.getIdsAndPhases(
			forOwner, phase, last, after):
		result[
			UWS.jobref(id=jobId, href=workerSystem.getURLForId(jobId))[
				UWS.phase[jobPhase]]]
	return stanxml.xmlrender(result, workerSystem.joblistPreamble)


def getErrorSummary(job):
	"""returns a UWS errorSummary element for job's error, or None.

	None is returned if job.error is empty.  Otherwise, the error's msg
	(with its hint appended if there is one) becomes the message.

	All our errors are fatal, and for now .../error yields the same thing
	as we include here, so the attributes are hardcoded.
	"""
	errDesc = job.error
	if not errDesc:
		return None

	message, hint = errDesc["msg"], errDesc["hint"]
	if hint:
		message = message+"\n\n -- Hint: "+hint

	summary = UWS.errorSummary(type="fatal", hasDetail="false")
	return summary[UWS.message[message]]


def getParametersElement(job):
	"""returns a UWS.parameters element for job.

	Parameters given by reference (uws.ParameterRef instances) are
	emitted with byReference set and their URL as content; all other
	values are emitted as their str() representation.
	"""
	paramsEl = UWS.parameters()
	for parName, parValue in job.iterSerializedPars():
		if not isinstance(parValue, uws.ParameterRef):
			paramsEl[UWS.parameter(id=parName)[str(parValue)]]
			continue
		paramsEl[UWS.parameter(id=parName, byReference=True)[parValue.url]]
	return paramsEl


class JobActions(object):
	"""A collection of "actions" performed on UWS jobs.

	Their names are the names of the child resources of UWS jobs.  The basic
	UWS actions are built in.  When constructing instances, you can pass in
	as many additional JobAction subclasses as you want.  Set their names to
	one of the UWS standard actions to override UWS behaviour if you think
	that's wise.
	"""
	# class-level registry shared by all instances; filled through
	# addStandardAction.
	_standardActions = {}

	def __init__(self, *additionalActions):
		# start from a copy of the standard actions so that per-instance
		# additions (which win on name clashes) don't touch the registry.
		self.actions = dict(self._standardActions)
		self.actions.update(
			(actionClass.name, actionClass())
			for actionClass in additionalActions)

	@classmethod
	def addStandardAction(cls, actionClass):
		"""registers an instance of actionClass under its name for all
		JobActions created afterwards.
		"""
		cls._standardActions[actionClass.name] = actionClass()

	def dispatch(self, action, job, request, segments):
		"""returns what the action named action produces for job.

		Jobs with an owner may only be accessed by that owner (otherwise,
		svcs.Authenticate is raised); unknown actions raise
		svcs.UnknownURI.  The response content type is set from the
		action's mime attribute.
		"""
		if job.owner and request.getAuthUser()!=job.owner:
			raise svcs.Authenticate()

		try:
			handler = self.actions[action]
		except KeyError:
			raise base.ui.logOldExc(
				svcs.UnknownURI("Invalid UWS action '%s'"%action))

		request.setHeader("content-type", handler.mime)
		return handler.getResource(job, request, segments)
		

class JobAction(object):
	"""an action done to a job.

	Concrete actions define methods do<METHOD> (doGET, doPOST, ...);
	getResource picks the one matching the HTTP method of the request.

	It must have a name corresponding to the child resource names from
	the UWS spec.
	"""
	name = None
	mime = "text/xml"

	def getResource(self, job, request, segments):
		"""returns the result of the do<METHOD> handler for request.

		Plain actions take no further path segments; if there are any,
		svcs.UnknownURI is raised.  A missing handler for the request
		method results in svcs.BadMethod.
		"""
		if segments:
			raise svcs.UnknownURI("Too many segments")

		try:
			handler = getattr(self, "do%s"%request.method.decode("ascii"))
		except AttributeError:
			raise base.ui.logOldExc(svcs.BadMethod(request.method))
		return handler(job, request)


class ErrorAction(JobAction):
	"""serves the job's error through DALI error serialisation.

	If the job has no error, an empty body is returned.  POST is
	handled exactly like GET.
	"""
	name = "error"
	mime = "text/plain"

	def doGET(self, job, request):
		if job.error is not None:
			return dali.serveDALIError(request, job.error)
		return b""

	doPOST = doGET
JobActions.addStandardAction(ErrorAction)


class StartTimeAction(JobAction):
	"""returns the job's start time as an ISO timestamp, or NULL if unset.

	This is an extension over plain UWS allowing users to retrieve when
	their job started.  In DaCHS' TAP implementation, this lets you
	discern whether the taprunner is already processing an EXECUTING job
	(startTime!=NULL) or whether it's still coming up (else).  POST is
	handled exactly like GET.
	"""
	name = "startTime"
	mime = "text/plain"

	def doGET(self, job, request):
		if job.startTime is not None:
			return utils.formatISODT(job.startTime).encode("ascii")
		return b"NULL"

	doPOST = doGET
JobActions.addStandardAction(StartTimeAction)


class ParameterAction(JobAction):
	"""lists (GET) or updates (POST) the parameters of a job.
	"""
	name = "parameters"

	def doGET(self, job, request):
		request.setHeader("content-type", "text/xml")
		return UWS.makeRoot(getParametersElement(job))

	def doPOST(self, job, request):
		# parameters may only be changed while the job is still PENDING.
		if job.phase==uws.PENDING:
			with job.getWritable() as wjob:
				wjob.setParamsFromDict(request.uwsArgs)
			raise svcs.SeeOther(job.jobId)

		raise base.ValidationError(
			"Parameters cannot be changed in phase %s"%job.phase, "phase")

JobActions.addStandardAction(ParameterAction)

class PhaseAction(JobAction):
	"""reads (GET) or changes (POST) the phase of a job.

	POSTing phase=RUN moves the job to QUEUED, phase=ABORT moves it to
	ABORTED; any other value is a validation error.  Successful POSTs
	redirect to the job.
	"""
	name = "phase"
	mime = "text/plain"
	timeout = 10  # this is here for testing

	def doPOST(self, job, request):
		requested = request.uwsArgs["phase"]
		if requested=="RUN":
			targetPhase = uws.QUEUED
		elif requested=="ABORT":
			targetPhase = uws.ABORTED
		else:
			raise base.ValidationError("Bad phase: %s"%requested, "phase")

		job.uws.changeToPhase(job.jobId, targetPhase, timeout=self.timeout)
		raise svcs.SeeOther(job.jobId)

	def doGET(self, job, request):
		request.setHeader("content-type", "text/plain")
		return job.phase
JobActions.addStandardAction(PhaseAction)


class _SettableAction(JobAction):
	"""Abstract base for ExecDAction and DestructionAction.

	Subclasses provide name (the child resource; its lowercased form is
	also the request parameter read on POST), attName (the job attribute
	to read and change), and the serializeValue/deserializeValue callables.
	"""
	mime = "text/plain"

	def doPOST(self, job, request):
		rawValue = request.uwsArgs[self.name.lower()]
		if rawValue is None:
			# no new value given: behave as for GET.
			return self.doGET(job, request)

		try:
			newValue = self.deserializeValue(rawValue)
		except ValueError:
			raise base.ui.logOldExc(uws.UWSError("Invalid %s value: %s."%(
				self.name.upper(), repr(rawValue)), job.jobId))

		with job.getWritable() as wjob:
			wjob.change(**{self.attName: newValue})
		raise svcs.SeeOther(job.jobId)

	def doGET(self, job, request):
		request.setHeader("content-type", "text/plain")
		currentValue = getattr(job, self.attName)
		return self.serializeValue(currentValue)


class ExecDAction(_SettableAction):
	"""reads or sets the job's executionDuration.

	Incoming values are parsed with float, outgoing ones rendered with str.
	"""
	attName = 'executionDuration'
	name = "executionduration"
	deserializeValue = staticmethod(float)
	serializeValue = staticmethod(str)
JobActions.addStandardAction(ExecDAction)


class DestructionAction(_SettableAction):
	"""reads or sets the job's destructionTime.

	Incoming values are parsed with utils.parseISODT, outgoing ones are
	rendered with utils.formatISODT.
	"""
	attName = "destructionTime"
	name = "destruction"
	deserializeValue = staticmethod(utils.parseISODT)
	serializeValue = staticmethod(utils.formatISODT)
JobActions.addStandardAction(DestructionAction)


class QuoteAction(JobAction):
	"""returns the job's quote as an ISO timestamp, or an empty string if
	the job has no quote.
	"""
	name = "quote"
	mime = "text/plain"

	def doGET(self, job, request):
		request.setHeader("content-type", "text/plain")
		if job.quote is not None:
			return utils.formatISODT(job.quote)
		return ""

JobActions.addStandardAction(QuoteAction)


class OwnerAction(JobAction):
	"""returns the job's owner as plain text, or NULL if it has none.
	"""
	name = "owner"
	mime = "text/plain"

	def doGET(self, job, request):
		request.setHeader("content-type", "text/plain")
		owner = job.owner
		if owner is None:
			return b"NULL"
		# Under Python 3, twisted's request.write only accepts bytes, so
		# writing the str owner directly fails.  Action results may be
		# bytes (cf. ErrorAction, StartTimeAction), so return the encoded
		# owner instead of writing it ourselves.
		if isinstance(owner, bytes):
			return owner
		return owner.encode("utf-8")

JobActions.addStandardAction(OwnerAction)


def _getResultsElement(job):
	"""returns a UWS.results element with one result per job result.

	Each result's href is the job URL followed by /results/<resultName>.
	"""
	resultsURL = job.getURL()+"/results/"
	children = []
	for resDesc in job.getResults():
		resName = resDesc["resultName"]
		children.append(UWS.result(id=resName, href=resultsURL+resName))
	return UWS.results[children]


class ResultsAction(JobAction):
	"""Access result (Extension: and other) files in job directory.

	Without further segments, GET returns the UWS results list.  A single
	segment naming a job result serves that result with its declared media
	type.  Anything else is interpreted as a path within the job's working
	directory (this is so we can deliver uploads).
	"""
	name = "results"

	def getResource(self, job, request, segments):
		if not segments:
			return JobAction.getResource(self, job, request, segments)

		res = None
		# first try a "real" UWS result from the job
		if len(segments)==1:
			try:
				fName, resultType = job.getResult(segments[0])
				res = static.File(fName, defaultType=str(resultType))
				res.encoding = None
			except base.NotFoundError: # segments[0] does not name a result
				pass                     # fall through to other files

		if res is None:
			# if that doesn't work, try to return some other file from the
			# job directory.
			res = self._getJobDirFile(job, segments)

		# now render whatever we have found
		res.render(request)
		return server.NOT_DONE_YET

	def _getJobDirFile(self, job, segments):
		"""returns a static.File for the path segments within job's
		working directory.

		This raises svcs.ForbiddenURI if the path leaves the job directory
		and svcs.UnknownURI if it does not exist.
		"""
		jobDir = os.path.abspath(job.getWD())
		filePath = os.path.abspath(os.path.join(jobDir, *segments))
		# A plain string prefix test would let ../<jobId>xyz/... through to
		# sibling directories whose names start with this job's id;
		# commonpath compares whole path components.
		if os.path.commonpath([jobDir, filePath])!=jobDir:
			raise svcs.ForbiddenURI("Not serving files outside of job directory.")

		if not os.path.exists(filePath):
			raise svcs.UnknownURI("File not found")

		return static.File(filePath, defaultType="application/octet-stream")

	def doGET(self, job, request):
		return _getResultsElement(job)

JobActions.addStandardAction(ResultsAction)


def _serializeTime(element, dt):
	if dt is None:
		return element()
	return element[utils.formatISODT(dt)]


class RootAction(JobAction):
	"""Actions for async/jobId.

	GET returns the UWS job document (possibly after a UWS 1.1 slow poll),
	DELETE destroys the job, and POST with action=DELETE does the same for
	the benefit of web browsers.
	"""
	name = ""

	def doDELETE(self, job, request):
		"""Implements DELETE on a job resource.

		This is the UWS-compliant way to delete a job.  After destroying the
		job, svcs.SeeOther is raised with an empty relative URI.
		"""
		job.uws.destroy(job.jobId)
		raise svcs.SeeOther("")

	def doPOST(self, wjob, request):
		"""Implements POST on a job resource.

		This is a DaCHS extension to UWS in order to let web browsers
		delete jobs by passing action=DELETE.  Any other POST raises
		svcs.BadMethod.
		"""
		if request.uwsArgs.get("action")=="DELETE":
			self.doDELETE(wjob, request)
		else:
			raise svcs.BadMethod("POST")

	def _delayIfWAIT(self, job, request):
		"""returns a Deferred delaying the response if the request has UWS 1.1
		"slow poll" arguments and waiting makes sense, None otherwise.

		This is a helper for doGET.  A WAIT value that is not an integer
		raises a base.ValidationError; a (resolved) WAIT of zero or less
		means no waiting at all.
		"""
		# This is the implementation of UWS 1.1 "slow polling".
		# We still do polling internally rather than use postgres'
		# LISTEN/NOTIFY since the overhead seems rather moderate and
		# the added complexity of setting up notifications appeared not
		# proportional to saving it.
		args = request.uwsArgs

		if args.get("wait") is None:
			return None

		try:
			waitFor = int(args["wait"])
		except (TypeError, ValueError):
			raise base.ui.logOldExc(base.ValidationError(
				"Invalid WAIT value: %r"%(args["wait"],), "wait"))

		if waitFor==-1:
			waitFor = base.getConfig("async", "maxslowpollwait")

		if waitFor<=0:
			# the client does not want to wait (other negative values have no
			# meaning); answer right away rather than after a one-second tick.
			return None

		if args.get("phase") is not None and args["phase"]!=job.phase:
			# only wait while the job is still in the phase the client saw.
			return None

		if job.phase not in ["PENDING", "QUEUED", "EXECUTING"]:
			# waiting only makes sense while the job is active.
			return None

		# re-check once a second; _recheck counts down the remaining budget.
		d = defer.Deferred().addCallback(
			self._recheck, job.uws, job.jobId, waitFor-1, job.phase)
		reactor.callLater(1, d.callback, request)
		return d

	def _recheck(self,
			request, workerSystem, jobId, remainingWait, originalPhase):
		"""the callback for doing slow polls.

		This re-fetches the job and renders it if its phase has changed or
		the wait budget is used up; otherwise, it schedules another check
		one second later.
		"""
		job = workerSystem.getJob(jobId)
		if originalPhase!=job.phase or remainingWait<=0:
			# drop the slow-poll arguments so doGET renders immediately.
			request.uwsArgs.pop("wait", None)
			request.uwsArgs.pop("phase", None)
			return self.doGET(job, request)

		d = defer.Deferred().addCallback(
			self._recheck, workerSystem, jobId, remainingWait-1, originalPhase)
		reactor.callLater(1, d.callback, request)
		return d

	def doGET(self, job, request):
		"""Implements GET on a job resource: return the current job metadata.

		For slow-poll requests, a Deferred eventually producing the job
		document is returned instead.
		"""
		delay = self._delayIfWAIT(job, request)
		if delay is not None:
			return delay
		request.setHeader("content-type", "text/xml")

		tree = UWS.makeRoot(UWS.job[
			UWS.jobId[job.jobId],
			UWS.runId[job.runId],
			UWS.ownerId[job.owner],
			UWS.phase[job.phase],
			# the quote may be None (QuoteAction checks for that), so treat it
			# like the other nillable timestamps rather than formatting it
			# unconditionally.
			_serializeTime(UWS.quote, job.quote),
			UWS.creationTime[utils.formatISODT(job.creationTime)],
			_serializeTime(UWS.startTime, job.startTime),
			_serializeTime(UWS.endTime, job.endTime),
			UWS.executionDuration[str(job.executionDuration)],
			UWS.destruction[utils.formatISODT(job.destructionTime)],
			getParametersElement(job),
			_getResultsElement(job),
			getErrorSummary(job)])
		return stanxml.xmlrender(tree, job.uws.jobdocPreamble)


JobActions.addStandardAction(RootAction)


def doJobAction(workerSystem, request, segments):
	"""handles the async UI of UWS.

	Depending on method and segments, it will return various XML strings
	and may cause certain actions.

	segments[0] is the job id; segments[1], if present, names the action
	(the empty string, used when it is absent, selects the job resource
	itself); all further segments are handed on to the action.
	"""
	jobId = segments[0]
	if len(segments)>1:
		action = segments[1]
	else:
		action = ""
	remaining = segments[2:]

	dispatcher = workerSystem.jobActions
	return dispatcher.dispatch(
		action, workerSystem.getJob(jobId), request, remaining)