File: dlasync.py

package info (click to toggle)
gavodachs 2.3+dfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 7,260 kB
  • sloc: python: 58,359; xml: 8,882; javascript: 3,453; ansic: 661; sh: 158; makefile: 22
file content (298 lines) | stat: -rw-r--r-- 8,257 bytes parent folder | download
"""
A UWS-based interface to datalink.

TODO: There's quite a bit of parallel between this and useruws.  This
should probably be reformulated along the lines of useruws.
"""

#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import pickle as pickle
import datetime

from twisted.web import server

from gavo import base
from gavo import svcs
from gavo import utils
from gavo import rscdesc #noflake: cache registration
from gavo.protocols import products
from gavo.protocols import uws
from gavo.protocols import uwsactions

# TODO: We're not supposed to import from helpers in main code.  Fix
# this.
from gavo.helpers import testtricks


class DLTransitions(uws.ProcessBasedUWSTransitions):
	"""the state transition function for datalink UWS jobs.

	Datalink jobs have no real queue: a job that gets queued is
	started right away (see queueJob).
	"""
	def __init__(self):
		super().__init__("DL")

	def queueJob(self, newState, wjob, ignored):
		# queue through the base class, then immediately kick off execution
		super().queueJob(newState, wjob, ignored)
		return self.startJob(uws.EXECUTING, wjob, ignored)

	def getCommandLine(self, wjob):
		# the job runs as a child process executing gavo dlrun <jobId>
		cmdArgs = ["gavo", "dlrun", "--", str(wjob.jobId)]
		return "gavo", cmdArgs


class ServiceIdParameter(uws.JobParameter):
	"""A fully qualified id of the DaCHS service to execute the datalink
	request.

	This backs DLJob's serviceid parameter; it declares no custom
	(de)serialisation and hence uses whatever JobParameter provides
	by default.
	"""


class ArgsParameter(uws.JobParameter):
	"""the full set of parameters passed to the datalink job, kept as
	a request.args dict.

	On disk, this is the pickled dict.  Using pickle is acceptable
	here because the serialised string never leaves our control (what
	travels over the network is the original POST payload).
	"""
	@staticmethod
	def _serialize(args):
		return pickle.dumps(args)

	@staticmethod
	def _deserialize(pickled):
		return pickle.loads(pickled)


class DLJob(uws.UWSJobWithWD):
	"""a UWS job running some datalink data preparation.

	In addition to UWS parameters, it has

	* serviceid -- the fully qualified id of the service that will process
	  the request
	* datalinkargs -- the parameters (in request.args form) of the
	  datalink request.
	"""
	_jobsTDId = "//datalink#datalinkjobs"
	_transitions = DLTransitions()

	_parameter_serviceid = ServiceIdParameter
	_parameter_datalinkargs = ArgsParameter

	def setParamsFromDict(self, args):
		"""stores args as this job's datalinkargs.

		Because one common UWS serves all dlasync services, the service
		handling the request is not passed in; for now we grab it from
		a calling frame via stealVar.  Let's see if there's a way
		around that later.
		"""
		service = utils.stealVar("service")
		self.setPar("serviceid", service.getFullId())
		self.setPar("datalinkargs", args)


class DLUWS(uws.UWS):
	"""the UWS worker system managing datalink jobs.
	"""
	joblistPreamble = ("<?xml-stylesheet href='/static"
		"/xsl/dlasync-joblist-to-html.xsl' type='text/xsl'?>")
	jobdocPreamble = ("<?xml-stylesheet href='/static/xsl/"
		"dlasync-job-to-html.xsl' type='text/xsl'?>")

	_baseURLCache = None

	def __init__(self):
		super().__init__(DLJob, uwsactions.JobActions())

	@property
	def baseURL(self):
		# root URL of the datalink UWS endpoint
		return base.makeAbsoluteURL("datalinkuws")

	def getURLForId(self, jobId):
		"""returns a fully qualified URL for the job with jobId.
		"""
		return "{}/{}".format(self.baseURL, jobId)



# the single worker system instance shared by all dlasync services
DL_WORKER = DLUWS()

####################### twisted.web simulation
# This is so we can handle t.w resources coming back from datalink.
# Factor this out?  This is essentially stolen from trialhelpers,
# and we might just put that somewhere where it's useful.

import warnings
from twisted.internet import defer
from twisted.internet import reactor


def _requestDone(result, request):
	"""writes a render result to request and stops the reactor.

	This is a helper for our t.w simulation, intended as a callback
	on the (possibly deferred) result of a resource's render.

	result -- what render produced; only str/bytes payloads are
		written, anything else just triggers a warning.
	request -- the (fake) request being rendered to.
	"""
	# t.w resources return bytes under Python 3; the original str-only
	# check silently dropped those payloads.
	if isinstance(result, (str, bytes)):
		if result:
			request.write(result)
	else:
		warnings.warn("Unsupported async datalink render result: %s"%repr(result))
	request.deferred.callback(request.accumulator)
	reactor.stop()
	return request.accumulator, request


class WritingFakeRequest(testtricks.FakeRequest):
	"""a simulator for actual t.w requests.

	We want this here as we're rendering to a UWS result file
	with the same code that renders to web requests.

	One could probably go a lot simpler than testtricks.FakeRequest,
	but since the code is there anyway, it probably doesn't hurt
	to use it in case we want to do fancier things in the future.

	The one difference vs. testtricks is that payload is written
	to a file rather than accumulated.
	"""
	def __init__(self, destFile):
		# keep the destination before running the generic FakeRequest setup
		self.destFile = destFile
		testtricks.FakeRequest.__init__(self, "")

	def write(self, data):
		# divert the payload to our destination file
		self.destFile.write(data)

	def finish(self):
		self.deferred.callback(None)
		self.destFile.close()
		reactor.stop()


def writeResultTo(page, destFile):
	"""arranges for the result of rendering the twisted.web resource
	page to be written to destFile.

	This uses a very simple simulation of t.w rendering, so a few
	tricks are possible.  Also, it actually runs a reactor to do its magic.

	Do not run this in a running DaCHS server; it has its own reactor
	running.  This is only for dachs dlrun code.

	TODO: There's probably code for this in t.w.
	"""
	def _doRender(renderer, req):
		# calls renderer(req) and writes the result to req, finishing
		# req unless the resource promises to finish it itself.
		try:
			res = renderer(req)
		except Exception:
			# make sure the request (and hence the reactor) is shut
			# down even when rendering raises
			req.finish()
			raise

		if res==server.NOT_DONE_YET:
			# resource will finish the request itself later
			return

		try:
			if res:
				req.write(res)
		finally:
			req.finish()

	request = WritingFakeRequest(destFile)
	reactor.callWhenRunning(_doRender, page.render, request)
	reactor.run()
	return request



####################### CLI

def parseCommandLine():
	"""returns the parsed command line of a dlrun invocation.

	There is a single positional argument: the UWS id of the job
	to execute.
	"""
	import argparse
	parser = argparse.ArgumentParser(
		description="Run an asynchronous datalink job (used internally)")
	parser.add_argument("jobId", type=str, help="UWS id of the job to run")
	return parser.parse_args()


def main():
	"""runs the datalink job whose id is given on the command line.

	This is what a gavo dlrun child process (as started by
	DLTransitions) executes: it marks the job executing, runs the
	dlget service, stores the result, and moves the job to COMPLETED
	(or ERROR on failure).
	"""
	jobId = parseCommandLine().jobId
	try:
		job = DL_WORKER.getJob(jobId)
		with job.getWritable() as wjob:
			wjob.change(phase=uws.EXECUTING, startTime=datetime.datetime.utcnow())

		service = base.resolveCrossId(job.parameters["serviceid"])
		dlArgs = job.parameters["datalinkargs"]
		data = service.run("dlget", dlArgs, svcs.emptyQueryMeta)

		# Unfortunately, datalink cores can in principle return all kinds
		# of messy things that may not even be representable in plain files
		# (e.g., t.w resources returning redirects).  We hence only
		# handle (mime, payload) and (certain) Product instances here
		# and error out otherwise.
		if isinstance(data, tuple):
			mime, payload = data
			with job.openResult(mime, "result") as destF:
				destF.write(payload)

		elif isinstance(data, products.ProductBase):
			# We could run render and grab the content-type from there
			# (which probably would be better all around).  For now, don't
			# care:
			with job.openResult("application/octet-stream", "result") as destF:
				for chunk in data.iterData():
					destF.write(chunk)

		elif hasattr(data, "render"):
			# these are t.w. resources.  Let's run a reactor so these properly
			# work.  The media type is only known after rendering, so open
			# with a placeholder and fix it up afterwards (the previous code
			# passed the type builtin here, which was a bug).
			with job.openResult("application/octet-stream", "result") as destF:
				req = writeResultTo(data, destF)
			job.fixTypeForResultName("result", 
				req.responseHeaders.getRawHeaders("content-type")[0])

		else:
			raise NotImplementedError("Cannot handle a service %s result yet."%
				repr(data))
		
		with job.getWritable() as wjob:
			wjob.change(phase=uws.COMPLETED)

	except SystemExit:
		pass
	except uws.JobNotFound:
		base.ui.notifyInfo("Giving up non-existing datalink job %s."%jobId)
	except Exception as ex:
		base.ui.notifyError("Datalink runner %s major failure"%jobId)
		# try to push job into the error state -- this may well fail given
		# that we're quite hosed, but it's worth the try
		DL_WORKER.changeToPhase(jobId, uws.ERROR, ex)
		raise


if __name__=="__main__": # pragma: no cover
	# silly test code, not normally reached
	from twisted.web import resource
	import os
	class _Foo(resource.Resource):
		"""a minimal resource for manually exercising writeResultTo."""
		def __init__(self, stuff):
			self.stuff = stuff

		def render(self, request):
			if self.stuff=="booga":
				return b"abc"
			else:
				# NOTE(review): this fires with a _Foo instance, and the
				# simple simulation above does not resolve deferred render
				# results; this branch is exercise material only.
				return defer.maybeDeferred(_Foo, "booga").addBoth(self.cleanup)

		def cleanup(self, res):
			print("cleaning up")
			return res

	# render returns bytes, so the destination file must be opened binary
	with open("bla", "wb") as f:
		writeResultTo(_Foo("ork"), f)
	with open("bla", "rb") as f:
		print(f.read())
	os.unlink("bla")