File: taprunner.py

package info (click to toggle)
gavodachs 2.3%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 7,260 kB
  • sloc: python: 58,359; xml: 8,882; javascript: 3,453; ansic: 661; sh: 158; makefile: 22
file content (393 lines) | stat: -rw-r--r-- 12,057 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
"""
Execution of UWS (right now, TAP only) requests.

This is mainly intended to be exec'd (through some wrapper) by the queue
runner in the main server thread.  The jobs executed have to be in
the database and have to have a job directory.

Primarily for testing, an alternative interface rabRun exists that
takes a jobid and parameters.

The tap runner takes the job to EXECUTING shortly before sending the
query to the DB server.  When done, the job's state is one of COMPLETED, 
ABORTED or ERROR.
"""

#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import datetime
import sys
import time

from gavo import base
from gavo import formats
from gavo import rsc
from gavo import rscdesc #noflake: cache registration
from gavo import svcs
from gavo import utils
from gavo import votable
from gavo.base import valuemappers
from gavo.formats import texttable #noflake: format registration
from gavo.formats import csvtable #noflake: format registration
from gavo.formats import jsontable #noflake: format registration
from gavo.formats import geojson #noflake: format registration
from gavo.formats import votableread
from gavo.formats import votablewrite
from gavo.protocols import adqlglue
from gavo.protocols import simbadinterface #noflake: UDF registration
from gavo.protocols import tap
from gavo.protocols import uws


# Set to True by the SIGINT handler installed by setINTHandler and polled
# by joinInterruptibly.
EXIT_PLEASE = False


# The pid of the worker db backend.  _noteWorkerPID stores it here, and
# _killWorker uses it when it tries to cancel the running query.
_WORKER_PID = None


def normalizeTAPFormat(rawFmt):
	"""returns the normalized format for the RESPONSEFORMAT value rawFmt.

	rawFmt is lowercased and looked up in tap.FORMAT_CODES; the first
	item of the matching entry is what is returned.  For unknown formats, 
	a ValidationError on RESPONSEFORMAT listing the known codes is raised.
	"""
	fmtCode = rawFmt.lower()
	try:
		normalized = tap.FORMAT_CODES[fmtCode][0]
	except KeyError:
		knownCodes = ", ".join(tap.FORMAT_CODES)
		raise base.ValidationError(
			"Unsupported format '%s'."%fmtCode,
			colName="RESPONSEFORMAT",
			hint="Legal format codes include %s"%knownCodes)
	return normalized


def _assertSupportedLanguage(jobId, langSpec):
	"""raises a UWSError unless this service can process langSpec.

	langSpec is a language name, optionally followed by a dash and
	a version (as in "ADQL-3.1").  The name must be a key of
	tap.SUPPORTED_LANGUAGES; a version, when given, must be among that
	language's "versions".
	"""
	name, version = langSpec, None
	if "-" in langSpec:
		name, version = langSpec.split("-", 1)

	if name not in tap.SUPPORTED_LANGUAGES:
		raise uws.UWSError("This service does not support"
			" the query language %s"%name, jobId)

	if version is None:
		# no version requested, so there is nothing more to check
		return

	if version in tap.SUPPORTED_LANGUAGES[name]["versions"]:
		return

	raise uws.UWSError("This service does not support"
		" version %s of the query language %s (but some other version;"
		" see capabilities)."%(version, name), jobId)


def _parseTAPParameters(jobId, parameters):
	"""gets and checks TAP parameters like version, request, and such.

	parameters is the job's parameter dictionary; the keys evaluated
	here are request, lang, query, and maxrec.

	The function returns a tuple of query and maxrec.  maxrec is the
	requested value capped by the [async]hardMAXREC config item, or
	[async]defaultMAXREC if the job has no maxrec parameter.

	A ValidationError is raised when lang or query are missing, a UWSError
	for unsupported requests or languages and for maxrec values that are
	not integer literals.
	"""
	try:
		if "request" in parameters and parameters["request"]!="doQuery":
			raise uws.UWSError("This service only supports REQUEST=doQuery", jobId)
		_assertSupportedLanguage(jobId, parameters["lang"])
		query = parameters["query"]
	except KeyError as key:
		# report the plain parameter name as the offending column rather
		# than the exception object
		if key.args:
			missingName = key.args[0]
		else:
			missingName = str(key)
		raise base.ui.logOldExc(base.ValidationError(
			"Required parameter %s missing."%key, missingName))

	try:
		maxrec = min(base.getConfig("async", "hardMAXREC"),
			int(parameters["maxrec"]))
	except ValueError:
		# logOldExc and the jobId argument are what all other error
		# paths in this module use.
		raise base.ui.logOldExc(
			uws.UWSError("Invalid MAXREC literal '%s'."%parameters["maxrec"],
				jobId))
	except KeyError:
		maxrec = base.getConfig("async", "defaultMAXREC")
	return query, maxrec


def _makeDataFor(resultTable):
	"""wraps resultTable into an rsc.Data instance.

	The data returned carries a QUERY_STATUS=OK info meta, a _type meta of
	"results", and resultTable's overflow limit in its overflowLimit
	attribute.
	"""
	wrapped = rsc.wrapTable(resultTable)

	wrapped.addMeta(
		"info",
		"Query successful",
		infoName="QUERY_STATUS",
		infoValue="OK")
	wrapped.addMeta("_type", "results")

	limit = resultTable.tableDef.overflowLimit
	wrapped.overflowLimit = limit
	return wrapped


def writeResultTo(format, res, outF):
	"""writes the data res to the file outF in format.

	Formats starting with "votable" are special-cased so overflow 
	conditions and such can be handled: they are written through 
	votablewrite with an overflow element built from the primary 
	table's overflowLimit.  Everything else is left to formats.formatData.
	"""
	if not format.startswith("votable"):
		formats.formatData(format, res, outF, acquireSamples=False)
		return

	# the following duplicates a mapping from votablewrite; that's
	# bad, and I should figure out a way to teach formats.format
	# whatever is needed to let it do what we're doing here.  Meanwhile:
	tablecodings = {
		"votable": "binary",
		"votableb2": "binary2",
		"votabletd": "td",
	}

	overflowLimit = res.getPrimaryTable().tableDef.overflowLimit
	overflowInfo = votable.V.INFO(name="QUERY_STATUS", value="OVERFLOW")
	context = votablewrite.VOTableContext(
		tablecoding=tablecodings.get(format, "td"),
		acquireSamples=False,
		overflowElement=votable.OverflowElement(overflowLimit, overflowInfo))
	votablewrite.writeAsVOTable(res, outF, context)


def _ingestUploads(uploads, connection):
	tds = []
	for destName, src in uploads:
		if isinstance(src, tap.LocalFile):
			srcF = open(src.fullPath, "rb")
		else:
			try:
				srcF = utils.urlopenRemote(src)
			except IOError as ex:
				raise base.ui.logOldExc(
					base.ValidationError("Upload '%s' cannot be retrieved"%(
					src), "UPLOAD", hint="The I/O operation failed with the message: "+
					str(ex)))
		if valuemappers.needsQuoting(destName):
			raise base.ValidationError("'%s' is not a valid table name on"
				" this site"%destName, "UPLOAD", hint="It either contains"
				" non-alphanumeric characters or conflicts with an ADQL"
				" reserved word.  Quoted table names are not supported"
				" at this site.")
		uploadedTable = votableread.uploadVOTable(destName, srcF, connection,
				nameMaker=votableread.AutoQuotedNameMaker())
		if uploadedTable is not None:
			tds.append(uploadedTable.tableDef)
		srcF.close()
	return tds


def _noteWorkerPID(conn):
	"""stores conn's worker PID in _WORKER_PID.
	"""
	global _WORKER_PID
	curs = conn.cursor()
	curs.execute("SELECT pg_backend_pid()")
	_WORKER_PID = curs.fetchall()[0][0]
	curs.close()


def _hangIfMagic(jobId, parameters, timeout):
# Test intrumentation. There are more effective ways to DoS me.
	if parameters.get("query")=="JUST HANG around":
		time.sleep(timeout)
		with tap.WORKER_SYSTEM.changeableJob(jobId) as job:
			job.change(phase=uws.COMPLETED,
				endTime=datetime.datetime.utcnow())
		sys.exit()


def getQTableFromJob(parameters, jobId, queryProfile, timeout):
	"""returns a QueryTable for a TAP job.

	parameters is the job's parameter dictionary, queryProfile the name of
	the database profile the query connection is opened with, and timeout
	is passed on to adqlglue.runTAPQuery.

	Parameter errors are raised as described for _parseTAPParameters,
	upload problems as described for _ingestUploads.
	"""
	query, maxrec = _parseTAPParameters(jobId, parameters)
	connectionForQuery = base.getDBConnection(queryProfile)

	try:
		_noteWorkerPID(connectionForQuery)
	except Exception: 
		# Don't fail just because we can't kill workers.  Exception rather 
		# than a bare except lets SystemExit and KeyboardInterrupt through.
		base.ui.notifyError(
			"Could not obtain PID for the worker, job %s"%jobId)
	tdsForUploads = _ingestUploads(parameters.get("upload", ""), 
		connectionForQuery)

	return adqlglue.runTAPQuery(query, timeout, connectionForQuery,
		tdsForUploads, maxrec)


def runTAPJobNoState(parameters, jobId, queryProfile, timeout):
	"""executes a TAP job defined by parameters and writes the
	result to the job's working directory.
	
	This does not do state management.  Use runTAPJob if you need it.

	The output format is taken from the responseformat parameter; if
	that is missing, it is votable, or votable/td if the
	[ivoa]votDefaultEncoding config item is td.  Exceptions occurring
	while the result is written are handed to svcs.mapDBErrors.
	"""
	_hangIfMagic(jobId, parameters, timeout)
	# The following makes us bail out if a bad format was passed -- no
	# sense spending the CPU on executing the query then, so we get the
	# format here.
	defaultFormat = "votable"
	if base.getConfig("ivoa", "votDefaultEncoding")=="td":
		defaultFormat = "votable/td"

	resultFormat = normalizeTAPFormat(
		parameters.get("responseformat", defaultFormat))

	res = _makeDataFor(getQTableFromJob(
		parameters, jobId, queryProfile, timeout))

	try:
		job = tap.WORKER_SYSTEM.getJob(jobId)
		mime = formats.getMIMEFor(
			resultFormat, 
			job.parameters.get("responseformat"))
		destF = job.openResult(mime, "result")
		# close the result file even if serialisation fails half-way
		try:
			writeResultTo(resultFormat, res, destF)
		finally:
			destF.close()
	except Exception:
		# DB errors can occur here since we're streaming directly from
		# the database.
		svcs.mapDBErrors(*sys.exc_info())
	# connectionForQuery closed by QueryTable


def runTAPJob(jobId, queryProfile="untrustedquery"):
	"""runs the TAP job jobId and moves it to COMPLETED or ERROR.

	This assumes the job has already been put into executing, and the
	appropriate pid has been entered.  To indicate that actual processing
	has started and the job is killable, the start time is recorded, though.

	Exceptions raised during execution end up as the job's error; those
	that are not base.Errors are additionally reported through 
	base.ui.notifyError.
	"""
	with tap.WORKER_SYSTEM.changeableJob(jobId) as job:
		job.change(startTime=datetime.datetime.utcnow())
		timeout, parameters = job.executionDuration, job.parameters

	try:
		runTAPJobNoState(parameters, jobId, queryProfile, timeout)
	except Exception as ex:
		# only unexpected (non-DaCHS) errors get an extra log entry
		unexpected = not isinstance(ex, base.Error)
		if unexpected:
			base.ui.notifyError("While executing TAP job %s: %s"%(jobId, ex))
		tap.WORKER_SYSTEM.changeToPhase(jobId, uws.ERROR, ex)
		return

	tap.WORKER_SYSTEM.changeToPhase(jobId, uws.COMPLETED, None)


def runSyncTAPJob(jobId, queryMeta=None):
	"""executes a TAP job "synchronously".

	When this returns, the job will be in an end state, i.e., ERROR, 
	COMPLETED or perhaps ABORTED.

	You must call tap.WORKER_SYSTEM.destroy(jobId) when done yourself.

	The execution time is the smaller of the [async]defaultExecTimeSync
	config item and queryMeta["timeout"] (or 100000 if no queryMeta is
	passed).

	Essentially, this puts the job into EXECUTING and declares the
	pid as -1. The UWS machinery will happily kill a job with a pid
	when asked to abort such a job, and since sync jobs run with the
	server's pid, that's really not what we want (conversely: sync
	jobs can't really be aborted).  Anyway: Do *not* put anything
	getpid returns into a sync job's pid field.
	"""
	requestedTimeout = queryMeta["timeout"] if queryMeta else 100000
	configuredLimit = base.getConfig("async", "defaultExecTimeSync")
	execTime = min(configuredLimit, requestedTimeout)

	newState = {
		"executionDuration": execTime,
		"phase": uws.EXECUTING,
		"pid": -1,
	}
	with tap.WORKER_SYSTEM.changeableJob(jobId) as job:
		job.change(**newState)

	runTAPJob(jobId)


############### CLI


def setINTHandler(jobId):
	"""installs a SIGINT handler that sets the module's EXIT_PLEASE flag.

	The handler itself does nothing but set the flag; jobId is not 
	used here.
	"""
	import signal

	def requestExit(signalNumber, stackFrame):
		# only flag the request here
		global EXIT_PLEASE
		EXIT_PLEASE = True

	signal.signal(signal.SIGINT, requestExit)


def _killWorker(jobId):
	"""marks the job as ABORTED and tries to cancel its postgres worker.

	The cancel request (pg_cancel_backend) is only sent if a worker PID
	has been recorded in _WORKER_PID; it goes through a separate 
	connection obtained from base.getUntrustedConn.
	"""
	with tap.WORKER_SYSTEM.changeableJob(jobId) as wjob:
		wjob.change(phase=uws.ABORTED)

	if not _WORKER_PID:
		# we never learned the backend's PID, so there's nothing to cancel
		return

	base.ui.notifyInfo("Trying to abort %s, wpid %s"%(
		jobId, _WORKER_PID))
	with base.getUntrustedConn() as conn:
		cancelCursor = conn.cursor()
		cancelQuery = "SELECT pg_cancel_backend(%d)"%_WORKER_PID
		cancelCursor.execute(cancelQuery)
		cancelCursor.close()


def joinInterruptibly(t, jobId):
	"""waits for the thread t to finish, reacting to EXIT_PLEASE.

	t is joined in half-second slices.  When EXIT_PLEASE is found set
	while t is still alive, the worker for jobId is killed through
	_killWorker, and the process exits with status 2.
	"""
	t.join(timeout=0.5)
	while t.is_alive():
		if EXIT_PLEASE:
			_killWorker(jobId)
			sys.exit(2)
		t.join(timeout=0.5)



def _runInThread(target, jobId):
	"""runs the callable target in a daemon thread and waits for it.

	The standalone tap runner must run the query in a thread since
	it must be able to react to a SIGINT.  Waiting is done by 
	joinInterruptibly; when that raises (including SystemExit), the 
	thread gets one more second to finish before the exception is 
	re-raised.
	"""
	import threading
	# daemon is passed to the constructor since Thread.setDaemon is 
	# deprecated.
	t = threading.Thread(target=target, daemon=True)
	t.start()
	try:
		joinInterruptibly(t, jobId)
	except (SystemExit, Exception):
		# give the thread a chance to quit cleanly
		t.join(1)
		raise


def parseCommandLine():
	"""returns a pair of parsed options and the job id from the command line.

	Exactly one positional argument (the job id) is expected; with
	any other number of arguments, the help is printed to stderr and
	the process exits with status 1.
	"""
	import optparse

	parser = optparse.OptionParser(
		usage="%prog <jobid>",
		description="runs the TAP job with <jobid> from the UWS table.")
	opts, args = parser.parse_args()

	if len(args)==1:
		return opts, args[0]

	parser.print_help(file=sys.stderr)
	sys.exit(1)


def main():
	"""causes the execution of the job with the id given as the (only)
	command line argument.

	The job is run in a thread so a SIGINT can abort it.  SystemExit 
	from the runner is swallowed, a vanished job is reported on stderr
	and in the log, and on any other exception an attempt is made
	to push the job to ERROR before the exception is re-raised.
	"""
	# there's a problem in CLI behaviour in that if anything goes wrong in 
	# main, a job that may have been created will remain QUEUED forever.
	# There's little we can do about that, though, since we cannot put
	# a job into ERROR when we don't know its id or cannot get it from the DB.
	base.DEBUG = False
	_, jobId = parseCommandLine()
	setINTHandler(jobId)

	def runJob():
		runTAPJob(jobId)

	try:
		_runInThread(runJob, jobId)
		base.ui.notifyInfo("taprunner for %s finished"%jobId)
	except SystemExit:
		pass
	except uws.JobNotFound: # someone destroyed the job before I was done
		errmsg = "Giving up non-existing TAP job %s."%jobId
		sys.stderr.write(errmsg+"\n")
		base.ui.notifyInfo(errmsg)
	except Exception as ex:
		base.ui.notifyError("taprunner %s major failure"%jobId)
		# try to push job into the error state -- this may well fail given
		# that we're quite hosed, but it's worth the try
		tap.WORKER_SYSTEM.changeToPhase(jobId, uws.ERROR, ex)
		raise