1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393
|
"""
Execution of UWS (right now, TAP only) requests.
This is mainly intended to be exec'd (through some wrapper) by the queue
runner in the main server thread. The jobs executed have to be in
the database and have to have a job directory.
Primarily for testing, an alternative interface rabRun exists that
takes jobid and parameters.
The tap runner takes the job to EXECUTING shortly before sending the
query to the DB server. When done, the job's state is one of COMPLETED,
ABORTED or ERROR.
"""
#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import datetime
import sys
import time
from gavo import base
from gavo import formats
from gavo import rsc
from gavo import rscdesc #noflake: cache registration
from gavo import svcs
from gavo import utils
from gavo import votable
from gavo.base import valuemappers
from gavo.formats import texttable #noflake: format registration
from gavo.formats import csvtable #noflake: format registration
from gavo.formats import jsontable #noflake: format registration
from gavo.formats import geojson #noflake: format registration
from gavo.formats import votableread
from gavo.formats import votablewrite
from gavo.protocols import adqlglue
from gavo.protocols import simbadinterface #noflake: UDF registration
from gavo.protocols import tap
from gavo.protocols import uws
# Set to True by the SIGINT handler installed by setINTHandler;
# joinInterruptibly polls this flag while waiting for the worker thread
# and aborts the job when it is set.
EXIT_PLEASE = False
# The pid of the worker db backend, as recorded by _noteWorkerPID (None
# until then).  _killWorker uses it when it tries to cancel the running
# query.
_WORKER_PID = None
def normalizeTAPFormat(rawFmt):
    """returns the first item tap.FORMAT_CODES holds for the format
    code rawFmt.

    rawFmt is lowercased before the lookup.  Codes not in
    tap.FORMAT_CODES raise a ValidationError on RESPONSEFORMAT.
    """
    fmtKey = rawFmt.lower()
    try:
        codeInfo = tap.FORMAT_CODES[fmtKey]
        return codeInfo[0]
    except KeyError:
        knownCodes = ", ".join(tap.FORMAT_CODES)
        raise base.ValidationError(
            "Unsupported format '%s'."%fmtKey,
            colName="RESPONSEFORMAT",
            hint="Legal format codes include %s"%knownCodes)
def _assertSupportedLanguage(jobId, langSpec):
    """raises a UWSError if this service cannot process the query
    language given by langSpec.

    langSpec is either a bare language name or name-version (as in
    "ADQL-3.1"); everything behind the first dash is the version.
    Without a version, only the language name is checked against
    tap.SUPPORTED_LANGUAGES.
    """
    name, version = langSpec, None
    if "-" in langSpec:
        name, version = langSpec.split("-", 1)

    if name not in tap.SUPPORTED_LANGUAGES:
        raise uws.UWSError("This service does not support"
            " the query language %s"%name, jobId)

    if version is None:
        return

    knownVersions = tap.SUPPORTED_LANGUAGES[name]["versions"]
    if version not in knownVersions:
        raise uws.UWSError("This service does not support"
            " version %s of the query language %s (but some other version;"
            " see capabilities)."%(version, name), jobId)
def _parseTAPParameters(jobId, parameters):
    """gets and checks TAP parameters like request, lang, and such.

    The function returns a tuple of query and maxrec.

    maxrec is the job's maxrec parameter, limited to the configured
    [async]hardMAXREC; when the job has no maxrec parameter, the
    configured [async]defaultMAXREC is used.

    This raises a UWSError for requests other than doQuery, for
    unsupported languages, and for maxrec values that are not integer
    literals.  A ValidationError is raised if lang or query are missing.
    """
    try:
        if "request" in parameters and parameters["request"]!="doQuery":
            raise uws.UWSError(
                "This service only supports REQUEST=doQuery", jobId)
        _assertSupportedLanguage(jobId, parameters["lang"])
        query = parameters["query"]
    except KeyError as key:
        raise base.ui.logOldExc(base.ValidationError(
            "Required parameter %s missing."%key, key))

    try:
        maxrec = min(base.getConfig("async", "hardMAXREC"),
            int(parameters["maxrec"]))
    except ValueError:
        # Use logOldExc and pass the job id, like every other error
        # raised from this module.
        raise base.ui.logOldExc(
            uws.UWSError(
                "Invalid MAXREC literal '%s'."%parameters["maxrec"],
                jobId))
    except KeyError:
        maxrec = base.getConfig("async", "defaultMAXREC")
    return query, maxrec
def _makeDataFor(resultTable):
    """returns resultTable wrapped through rsc.wrapTable, with result
    metadata attached.

    The wrapper receives an info meta announcing QUERY_STATUS=OK, a
    _type meta of "results", and a copy of the overflowLimit found on
    resultTable's table definition.
    """
    wrapped = rsc.wrapTable(resultTable)
    wrapped.addMeta(
        "info",
        "Query successful",
        infoName="QUERY_STATUS",
        infoValue="OK")
    wrapped.addMeta("_type", "results")
    wrapped.overflowLimit = resultTable.tableDef.overflowLimit
    return wrapped
def writeResultTo(format, res, outF):
    """writes res to the file outF in the given format.

    Formats starting with "votable" are written through votablewrite
    directly, so that an overflow element (an INFO with
    QUERY_STATUS=OVERFLOW) can be handed in.  All other formats are
    left to formats.formatData.
    """
    if not format.startswith("votable"):
        formats.formatData(format, res, outF, acquireSamples=False)
        return

    # The following duplicates a mapping from votablewrite; that's
    # bad, and formats.format should be taught whatever is needed to
    # let it do what is done here.  Meanwhile:
    tableCodings = {
        "votable": "binary",
        "votableb2": "binary2",
        "votabletd": "td",
    }
    overflow = votable.OverflowElement(
        res.getPrimaryTable().tableDef.overflowLimit,
        votable.V.INFO(name="QUERY_STATUS", value="OVERFLOW"))
    context = votablewrite.VOTableContext(
        tablecoding=tableCodings.get(format, "td"),
        acquireSamples=False,
        overflowElement=overflow)
    votablewrite.writeAsVOTable(res, outF, context)
def _ingestUploads(uploads, connection):
    """ingests the uploaded VOTables into the database through
    connection and returns a list of the table definitions created.

    uploads is a sequence of (destName, src) pairs.  src is either a
    tap.LocalFile (which is opened from its fullPath) or something
    utils.urlopenRemote can open.

    A ValidationError is raised when a remote source cannot be
    retrieved or when destName would need quoting.
    """
    tds = []
    for destName, src in uploads:
        if isinstance(src, tap.LocalFile):
            srcF = open(src.fullPath, "rb")
        else:
            try:
                srcF = utils.urlopenRemote(src)
            except IOError as ex:
                raise base.ui.logOldExc(
                    base.ValidationError("Upload '%s' cannot be retrieved"%(
                    src), "UPLOAD",
                    hint="The I/O operation failed with the message: "+
                    str(ex)))

        # From here on, the source is open; make sure it is closed again
        # even if the name is rejected or the ingestion fails.
        try:
            if valuemappers.needsQuoting(destName):
                raise base.ValidationError("'%s' is not a valid table name on"
                    " this site"%destName, "UPLOAD", hint="It either contains"
                    " non-alphanumeric characters or conflicts with an ADQL"
                    " reserved word. Quoted table names are not supported"
                    " at this site.")

            uploadedTable = votableread.uploadVOTable(
                destName, srcF, connection,
                nameMaker=votableread.AutoQuotedNameMaker())
            # uploadVOTable may return None; such uploads contribute
            # no table definition.
            if uploadedTable is not None:
                tds.append(uploadedTable.tableDef)
        finally:
            srcF.close()
    return tds
def _noteWorkerPID(conn):
    """records the pid of the database backend serving conn in the
    module-level _WORKER_PID.
    """
    global _WORKER_PID
    cursor = conn.cursor()
    cursor.execute("SELECT pg_backend_pid()")
    rows = cursor.fetchall()
    _WORKER_PID = rows[0][0]
    cursor.close()
def _hangIfMagic(jobId, parameters, timeout):
    """Test instrumentation: when the job's query is the literal
    "JUST HANG around", this sleeps for timeout seconds, puts the job
    into COMPLETED and then calls sys.exit().

    For any other query, this does nothing.
    """
    # There are more effective ways to DoS me.
    if parameters.get("query")!="JUST HANG around":
        return

    time.sleep(timeout)
    with tap.WORKER_SYSTEM.changeableJob(jobId) as job:
        finishedAt = datetime.datetime.utcnow()
        job.change(phase=uws.COMPLETED, endTime=finishedAt)
    sys.exit()
def getQTableFromJob(parameters, jobId, queryProfile, timeout):
    """returns a QueryTable for a TAP job.

    This parses the TAP parameters, opens a database connection for
    queryProfile, ingests any uploads over that connection and then
    hands the query to adqlglue.runTAPQuery, the result of which is
    returned.

    Errors from parameter parsing and upload ingestion are propagated.
    """
    query, maxrec = _parseTAPParameters(jobId, parameters)
    connectionForQuery = base.getDBConnection(queryProfile)

    try:
        _noteWorkerPID(connectionForQuery)
    except Exception: # Don't fail just because we can't kill workers
        # (but do let SystemExit and friends through)
        base.ui.notifyError(
            "Could not obtain PID for the worker, job %s"%jobId)

    try:
        tdsForUploads = _ingestUploads(parameters.get("upload", ""),
            connectionForQuery)
    except BaseException:
        # No query table has taken over the connection yet, so nobody
        # else would close it.
        connectionForQuery.close()
        raise

    return adqlglue.runTAPQuery(query, timeout, connectionForQuery,
        tdsForUploads, maxrec)
def runTAPJobNoState(parameters, jobId, queryProfile, timeout):
    """executes a TAP job defined by parameters and writes the
    result to the job's working directory.

    This does not do state management.  Use runTAPJob if you need it.

    Exceptions raised while opening or writing the result are passed
    to svcs.mapDBErrors.
    """
    _hangIfMagic(jobId, parameters, timeout)
    # The following makes us bail out if a bad format was passed -- no
    # sense spending the CPU on executing the query then, so we get the
    # format here.
    defaultFormat = "votable"
    if base.getConfig("ivoa", "votDefaultEncoding")=="td":
        defaultFormat = "votable/td"
    resultFormat = normalizeTAPFormat(
        parameters.get("responseformat", defaultFormat))

    res = _makeDataFor(getQTableFromJob(
        parameters, jobId, queryProfile, timeout))

    try:
        job = tap.WORKER_SYSTEM.getJob(jobId)
        destF = job.openResult(
            formats.getMIMEFor(
                resultFormat,
                job.parameters.get("responseformat")),
            "result")
        # close the result file even when writing fails half-way
        try:
            writeResultTo(resultFormat, res, destF)
        finally:
            destF.close()
    except Exception:
        # DB errors can occur here since we're streaming directly from
        # the database.
        svcs.mapDBErrors(*sys.exc_info())
    # connectionForQuery closed by QueryTable
def runTAPJob(jobId, queryProfile="untrustedquery"):
    """executes the TAP job with the id jobId.

    This assumes the job has already been put into executing, and the
    appropriate pid has been entered.  To indicate that actual
    processing has started and the job is killable, the start time is
    recorded, though.

    When the job has run, it is moved to COMPLETED, or to ERROR if an
    exception occurred.  Exceptions not derived from base.Error are
    additionally reported through base.ui.notifyError.
    """
    with tap.WORKER_SYSTEM.changeableJob(jobId) as wjob:
        wjob.change(startTime=datetime.datetime.utcnow())
        timeout = wjob.executionDuration
        parameters = wjob.parameters

    try:
        runTAPJobNoState(parameters, jobId, queryProfile, timeout)
    except Exception as ex:
        isExpected = isinstance(ex, base.Error)
        if not isExpected:
            base.ui.notifyError(
                "While executing TAP job %s: %s"%(jobId, ex))
        tap.WORKER_SYSTEM.changeToPhase(jobId, uws.ERROR, ex)
        return

    tap.WORKER_SYSTEM.changeToPhase(jobId, uws.COMPLETED, None)
def runSyncTAPJob(jobId, queryMeta=None):
    """executes a TAP job "synchronously".

    When this is done, the job will be in an end state, i.e., ERROR,
    COMPLETED or perhaps ABORTED.

    You must call tap.WORKER_SYSTEM.destroy(jobId) when done yourself.

    The execution time is the smaller of the configured
    [async]defaultExecTimeSync and queryMeta["timeout"] (if queryMeta
    is passed in).

    Essentially, this puts the job into EXECUTING and declares the
    pid as -1.  The UWS machinery will happily kill a job with a pid
    when asked to abort such a job, and since sync jobs run with the
    server's pid, that's really not what we want (conversely: sync
    jobs can't really be aborted).  Anyway: Do *not* put anything
    getpid returns into a sync job's pid field.
    """
    requestedTimeout = queryMeta["timeout"] if queryMeta else 100000
    configuredLimit = base.getConfig("async", "defaultExecTimeSync")
    execTime = min(configuredLimit, requestedTimeout)

    with tap.WORKER_SYSTEM.changeableJob(jobId) as wjob:
        wjob.change(
            executionDuration=execTime,
            phase=uws.EXECUTING,
            pid=-1)

    runTAPJob(jobId)
############### CLI
def setINTHandler(jobId):
    """installs a SIGINT handler that sets the module-level EXIT_PLEASE
    flag.

    The handler itself does nothing else; the job is only aborted by
    whatever polls the flag.  jobId is not used here.
    """
    import signal

    def requestExit(signo, frame):
        global EXIT_PLEASE
        EXIT_PLEASE = True

    signal.signal(signal.SIGINT, requestExit)
def _killWorker(jobId):
    """puts the job jobId into ABORTED and tries to cancel the query
    running in the postgres worker for this job.

    Cancelling is only attempted if a worker pid has been recorded in
    _WORKER_PID.
    """
    with tap.WORKER_SYSTEM.changeableJob(jobId) as job:
        job.change(phase=uws.ABORTED)

    if not _WORKER_PID:
        return

    base.ui.notifyInfo("Trying to abort %s, wpid %s"%(
        jobId, _WORKER_PID))
    with base.getUntrustedConn() as conn:
        cursor = conn.cursor()
        cancelQuery = "SELECT pg_cancel_backend(%d)"%_WORKER_PID
        cursor.execute(cancelQuery)
        cursor.close()
def joinInterruptibly(t, jobId):
    """waits for the thread t to finish, checking EXIT_PLEASE every
    half second.

    If the flag is set while t is still alive, the worker for jobId is
    killed through _killWorker and the program exits with status 2.
    """
    t.join(timeout=0.5)
    while t.is_alive():
        if EXIT_PLEASE:
            _killWorker(jobId)
            sys.exit(2)
        t.join(timeout=0.5)
def _runInThread(target, jobId):
    """runs target in a daemon thread and waits for it through
    joinInterruptibly.

    The standalone tap runner must run the query in a thread since
    it must be able to react to a SIGINT.

    When waiting ends with SystemExit or another exception, the thread
    is given one more second to finish before the exception is
    re-raised.
    """
    import threading
    t = threading.Thread(target=target)
    # Thread.setDaemon() is deprecated; set the attribute instead.
    t.daemon = True
    t.start()
    try:
        joinInterruptibly(t, jobId)
    except (SystemExit, Exception):
        # give the thread a chance to quit cleanly
        t.join(1)
        raise
def parseCommandLine():
    """parses the command line and returns a pair of the parsed options
    and the job id.

    Unless exactly one positional argument is given, the help text is
    written to stderr and the program exits with status 1.
    """
    from optparse import OptionParser

    parser = OptionParser(
        usage="%prog <jobid>",
        description="runs the TAP job with <jobid> from the UWS table.")
    opts, args = parser.parse_args()
    if len(args)==1:
        return opts, args[0]

    parser.print_help(file=sys.stderr)
    sys.exit(1)
def main():
    """causes the execution of the job the id of which is given as the
    first command line argument.
    """
    # There's a problem in CLI behaviour in that if anything goes wrong
    # in main, a job that may have been created will remain QUEUED
    # forever.  There's little we can do about that, though, since we
    # cannot put a job into ERROR when we don't know its id or cannot
    # get it from the DB.
    base.DEBUG = False
    _, jobId = parseCommandLine()
    setINTHandler(jobId)

    def runJob():
        return runTAPJob(jobId)

    try:
        _runInThread(runJob, jobId)
        base.ui.notifyInfo("taprunner for %s finished"%jobId)
    except SystemExit:
        pass
    except uws.JobNotFound:
        # someone destroyed the job before I was done
        errmsg = "Giving up non-existing TAP job %s."%jobId
        sys.stderr.write(errmsg+"\n")
        base.ui.notifyInfo(errmsg)
    except Exception as ex:
        base.ui.notifyError("taprunner %s major failure"%jobId)
        # try to push job into the error state -- this may well fail
        # given that we're quite hosed, but it's worth the try
        tap.WORKER_SYSTEM.changeToPhase(jobId, uws.ERROR, ex)
        raise
|