File: useruws.py

package info (click to toggle)
gavodachs 2.3%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 7,260 kB
  • sloc: python: 58,359; xml: 8,882; javascript: 3,453; ansic: 661; sh: 158; makefile: 22
file content (237 lines) | stat: -rw-r--r-- 7,279 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""
Support for UWSes defined in user RDs.

To understand this, start at makeUWSForService.
"""

#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import pickle as pickle
import datetime
import weakref

from gavo import base
from gavo import formats
from gavo import rsc
from gavo import rscdesc #noflake: for registration
from gavo import svcs
from gavo import utils
from gavo.protocols import uws
from gavo.protocols import uwsactions


class UserUWSTransitions(uws.ProcessBasedUWSTransitions):
	"""Transitions for UWSes defined in user RDs.

	Jobs are executed in a separate process running ``gavo uwsrun``;
	queueing a job additionally makes the job's UWS schedule a check
	of its process queue.
	"""
	def __init__(self):
		uws.ProcessBasedUWSTransitions.__init__(self, "User")

	def queueJob(self, newState, wjob, ignored):
		"""puts wjob on the queue and schedules a queue check on its UWS.
		"""
		uws.ProcessBasedUWSTransitions.queueJob(self, newState, wjob, ignored)
		wjob.uws.scheduleProcessQueueCheck()

	def getCommandLine(self, wjob):
		"""returns the executable and the argument vector that run wjob.

		When base.DEBUG is set, --debug and --traceback are passed to gavo
		itself (i.e., before the uwsrun subcommand).
		"""
		globalOptions = ["--debug", "--traceback"] if base.DEBUG else []
		return "gavo", ["gavo"]+globalOptions+["uwsrun", "--", str(wjob.jobId)]


def makeUWSJobParameterFor(inputKey):
	"""returns a uws.JobParameter subclass for inputKey.

	The parameter carries the input key's name and delegates
	deserialisation to inputKey._parse and serialisation to
	inputKey._unparse.
	"""
	parser, unparser = inputKey._parse, inputKey._unparse

	class SomeParameter(uws.JobParameter):
		name = inputKey.name
		_deserialize = parser
		_serialize = unparser

	return SomeParameter


class UserUWSJobBase(uws.UWSJobWithWD):
	"""The common base of the per-service job classes of user UWSes.

	makeUserUWSJobClass derives one subclass of this for each service.
	All of them use the //uws#userjobs table definition and share a
	single UserUWSTransitions instance.
	"""
	_jobsTDId = "//uws#userjobs"
	_transitions = UserUWSTransitions()


def makeUserUWSJobClass(service):
	"""returns a job class for UWS jobs processing requests for service.

	Each of the service's uws.xml input keys becomes a _parameter_<name>
	attribute of the new class.  File-typed keys get a uws.FileParameter
	(and switch on the UPLOAD parameter); all other keys get a parameter
	class made by makeUWSJobParameterFor.  The defaults of the non-file
	keys are pickled into a string returned by the class's
	_default_parameters, and _default_jobClass returns the service's
	full id.
	"""
	class UserUWSJob(UserUWSJobBase):
		pass

	defaultValues = {}
	for inputKey in service.getInputKeysFor("uws.xml"):
		attName = "_parameter_"+inputKey.name
		if inputKey.type!="file":
			setattr(UserUWSJob, attName, makeUWSJobParameterFor(inputKey))
			defaultValues[inputKey.name] = inputKey.values.default
		else:
			# file parameters are managed through UPLOAD
			setattr(UserUWSJob, "_parameter_upload", uws.UploadParameter())
			setattr(UserUWSJob, attName, uws.FileParameter())

	defaultStr = utils.getCleanBytes(
		pickle.dumps(defaultValues, protocol=2)).decode("ascii")
	del defaultValues

	fullId = service.getFullId()
	UserUWSJob._default_parameters = classmethod(lambda cls: defaultStr)
	UserUWSJob._default_jobClass = classmethod(lambda cls, v=fullId: v)
	return UserUWSJob


class UserUWS(uws.UWSWithQueueing):
	"""A UWS for "user jobs", i.e., generic jobs run on a service's core.

	The job class is created dynamically from the processing core's
	parameters (see makeUserUWSJobClass); some common UWS methods are
	overridden so that each UserUWS only deals with its own service's
	jobs.
	"""
	joblistPreamble = ("<?xml-stylesheet"
		" href='/static/xsl/useruws-joblist-to-html.xsl' type='text/xsl'?>")
	jobdocPreamble = ("<?xml-stylesheet"
		" href='/static/xsl/useruws-job-to-html.xsl' type='text/xsl'?>")

	def __init__(self, service, jobActions):
		# both attributes are set before the base constructor runs
		self.service = weakref.proxy(service)
		self.runcountGoal = base.getConfig("async", "maxUserUWSRunningDefault")
		jobClass = makeUserUWSJobClass(service)
		uws.UWSWithQueueing.__init__(self, jobClass, jobActions)

	def getURLForId(self, jobId):
		"""returns the URL of the job jobId below our service's uws.xml.
		"""
		jobListURL = self.service.getURL("uws.xml")
		return jobListURL+"/"+jobId

	def _getJob(self, jobId, conn, writable=False):
		"""returns the job jobId, like uws.UWS._getJob.

		Since all user UWSes share one jobs table, the job may belong to
		a different service; loading its parameters into our job class
		would be nonsense.  In that case, a svcs.WebRedirect to the other
		service's job URL is raised.  That does the trick on the web
		interface and at least prevents major confusion elsewhere.

		uws.JobNotFound is raised unless exactly one matching row exists.

		(This duplicates code from uws.UWS._getJob; a refactoring would
		be nice.)
		"""
		cannedName = "getByIdEx" if writable else "getById"
		rows = self.runCanned(cannedName, {"jobId": jobId}, conn)
		if len(rows)!=1:
			raise uws.JobNotFound(jobId)
		row = rows[0]

		ownerId = row["jobClass"]
		if ownerId!=self.service.getFullId():
			# another service's job: send the client to that service's UWS
			ownerUWS = base.resolveCrossId(ownerId).getUWS()
			raise svcs.WebRedirect(ownerUWS.getURLForId(jobId))

		return self.jobClass(row, self, writable)

	def getIdsAndPhases(self, *args, **kwargs):
		"""returns ids and phases as the base class does, but restricted to
		jobs whose jobClass is our service's full id.

		Only the job list query is restricted; getById and getAllIds are
		left alone since they are used internally (e.g., for queueing).
		"""
		ownId = self.service.getFullId()
		return uws.UWSWithQueueing.getIdsAndPhases(self, *args,
			initFragments=["jobClass=%(jobClass)s"],
			initPars={"jobClass": ownId},
			**kwargs)
	

def makeUWSForService(service):
	"""returns a UserUWS instance tailored to service.

	All UWSes made here share one jobs table, but each has its own job
	class with parameters made to match the service's core.

	A drawback is that every UWS created this way runs the job table
	purger again.  That is not a problem per se but may become
	cumbersome at some point; a class attribute on UserUWS could then
	keep additional UWSes from starting cron jobs of their own.
	"""
	jobActions = uwsactions.JobActions()
	return UserUWS(service, jobActions)


####################### CLI

def parseCommandLine():
	"""returns the parsed command line of the UWS job runner.

	The resulting namespace has a single attribute, jobId, the UWS id of
	the job to run.
	"""
	import argparse
	argParser = argparse.ArgumentParser(
		description="Run an asynchronous generic job (used internally)")
	argParser.add_argument("jobId",
		help="UWS id of the job to run", type=str)
	parsed = argParser.parse_args()
	return parsed


def _storeResult(job, inputTable, data):
	"""stores a core's return value data as job's "result" result.

	Cores may return:

	* a (mime type, payload) tuple -- the payload is written as-is;
	* an rsc.Data instance -- it is formatted in the format given by
	  inputTable's responseformat parameter, defaulting to VOTable;
	* None -- the core has added its results itself, nothing is done.

	Anything else raises a NotImplementedError.
	"""
	if data is None:
		return

	if isinstance(data, tuple):
		mime, payload = data
		with job.openResult(mime, "result") as destF:
			destF.write(payload)

	elif isinstance(data, rsc.Data):
		destFmt = inputTable.getParam("responseformat"
			) or "application/x-votable+xml"
		with job.openResult(destFmt, "result") as destF:
			formats.formatData(destFmt, data, destF, False)

	else:
		raise NotImplementedError("Cannot handle a service %s result yet."%
			repr(data))


def main():
	"""runs the user UWS job whose id is given on the command line.

	This is what the command line built by
	UserUWSTransitions.getCommandLine executes.  The job is put into
	EXECUTING, its service's core is run on the job's parameters, any
	result the core returns is stored, and the job is put into COMPLETED.

	If the job does not exist, this only notifies and returns.  On any
	other failure, it tries to put the job into ERROR and then re-raises
	the original exception.
	"""
	args = parseCommandLine()
	jobId = args.jobId
	with base.getTableConn() as conn:
		rows = list(
			conn.query("SELECT jobclass FROM uws.userjobs WHERE jobId=%(jobId)s",
				{"jobId": jobId}))
	if not rows:
		# Handle a missing job like uws.JobNotFound below, rather than
		# crashing with an IndexError.
		base.ui.notifyInfo("Giving up non-existing UWS job %s."%jobId)
		return
	service = base.resolveCrossId(rows[0][0])

	# we're the only ones running, so we're safe using this.
	queryMeta = svcs.emptyQueryMeta

	try:
		job = service.getUWS().getJob(jobId)
		with job.getWritable() as wjob:
			wjob.change(phase=uws.EXECUTING, startTime=datetime.datetime.utcnow())

		service = base.resolveCrossId(job.jobClass)
		inputTable = svcs.CoreArgs(service.core.inputTable,
			job.parameters, job.parameters)
		inputTable.job = job

		data = service._runWithInputTable(
			service.core, inputTable, queryMeta)
		_storeResult(job, inputTable, data)

		with job.getWritable() as wjob:
			wjob.change(phase=uws.COMPLETED)

	except SystemExit:
		pass
	except uws.JobNotFound:
		base.ui.notifyInfo("Giving up non-existing UWS job %s."%jobId)
	except Exception as ex:
		base.ui.notifyError("UWS runner %s major failure"%jobId)
		# try to push job into the error state -- this may well fail given
		# that we're quite hosed, but it's worth the try.  If it does fail,
		# don't let that failure replace the original exception.
		try:
			service.getUWS().changeToPhase(jobId, uws.ERROR, ex)
		except Exception:
			base.ui.notifyError(
				"Could not put UWS job %s into the ERROR phase"%jobId)
		# the inner handler is finished, so this re-raises ex
		raise