"""
A UWS-based interface to datalink.
TODO: There's quite a bit of overlap between this and useruws. This
should probably be reformulated along the lines of useruws.
"""
#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import pickle
import datetime
from twisted.web import server
from gavo import base
from gavo import svcs
from gavo import utils
from gavo import rscdesc #noflake: cache registration
from gavo.protocols import products
from gavo.protocols import uws
from gavo.protocols import uwsactions
# TODO: We're not supposed to import from helpers in main code. Fix
# this.
from gavo.helpers import testtricks
class DLTransitions(uws.ProcessBasedUWSTransitions):
"""The transition function for datalink jobs.
"""
def __init__(self):
uws.ProcessBasedUWSTransitions.__init__(self, "DL")
def queueJob(self, newState, wjob, ignored):
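		# datalink jobs do not linger in the queue: queueing one
		# immediately kicks off execution.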
uws.ProcessBasedUWSTransitions.queueJob(self, newState, wjob, ignored)
return self.startJob(uws.EXECUTING, wjob, ignored)
def getCommandLine(self, wjob):
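		# this is the command line the UWS machinery execs for each job;
		# it ends up in main() below via the dlrun subcommand.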
return "gavo", ["gavo", "dlrun", "--", str(wjob.jobId)]
class ServiceIdParameter(uws.JobParameter):
"""A fully qualified id of the DaCHS service to execute the datalink
request.
"""
class ArgsParameter(uws.JobParameter):
"""all parameters passed to the datalink job as a request.args dict.
The serialised representation is the pickled dict. Pickle is ok as
the string never leaves our control (the network serialisation is
whatever comes in via the POST).
"""
@staticmethod
def _deserialize(pickled):
return pickle.loads(pickled)
@staticmethod
def _serialize(args):
return pickle.dumps(args)
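# Round-trip sketch: ArgsParameter._serialize(args) produces a pickled
# byte string, and ArgsParameter._deserialize gives back the original
# request.args dict; the pickled value never leaves our control.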
class DLJob(uws.UWSJobWithWD):
"""a UWS job performing some datalink data preparation.
In addition to UWS parameters, it has
* serviceid -- the fully qualified id of the service that will process
the request
* datalinkargs -- the parameters (in request.args form) of the
datalink request.
"""
_jobsTDId = "//datalink#datalinkjobs"
_transitions = DLTransitions()
_parameter_serviceid = ServiceIdParameter
_parameter_datalinkargs = ArgsParameter
def setParamsFromDict(self, args):
"""stores datalinkargs from args.
As there's only one common UWS for all dlasync services, we have
to steal the service object from upstack at the moment. Let's see
if there's a way around that later.
"""
self.setPar("datalinkargs", args)
self.setPar("serviceid", utils.stealVar("service").getFullId())
class DLUWS(uws.UWS):
"""the worker system for datalink jobs.
"""
joblistPreamble = ("<?xml-stylesheet href='/static"
"/xsl/dlasync-joblist-to-html.xsl' type='text/xsl'?>")
jobdocPreamble = ("<?xml-stylesheet href='/static/xsl/"
"dlasync-job-to-html.xsl' type='text/xsl'?>")
_baseURLCache = None
def __init__(self):
uws.UWS.__init__(self, DLJob, uwsactions.JobActions())
@property
def baseURL(self):
return base.makeAbsoluteURL("datalinkuws")
def getURLForId(self, jobId):
"""returns a fully qualified URL for the job with jobId.
"""
return "%s/%s"%(self.baseURL, jobId)
DL_WORKER = DLUWS()
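# DL_WORKER is the single worker system shared by all dlasync services
# (cf. DLJob.setParamsFromDict); main() below pulls jobs from it when
# invoked as gavo dlrun.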
####################### twisted.web simulation
# This is so we can handle t.w resources coming back from datalink.
# Factor this out? This is essentially stolen from trialhelpers,
# and we might just put that somewhere where it's useful.
import warnings
from twisted.internet import defer
from twisted.internet import reactor
def _requestDone(result, request):
"""essentially calls render on result and stops the reactor.
This is a helper for our t.w simulation.
"""
if isinstance(result, str):
if result:
request.write(result)
else:
warnings.warn("Unsupported async datalink render result: %s"%repr(result))
request.deferred.callback(request.accumulator)
reactor.stop()
return request.accumulator, request
class WritingFakeRequest(testtricks.FakeRequest):
"""a simulator for actual t.w requests.
We want this here as we're rendering to a UWS result file
with the same code that renders to web requests.
One could probably go a lot simpler than testtricks.FakeRequest,
but since the code is there anyway, it probably doesn't hurt
to use it in case we want to do fancier things in the future.
The one thing I have to change vs. testtricks is that we want
to write to files.
"""
def __init__(self, destFile):
self.destFile = destFile
testtricks.FakeRequest.__init__(self, "")
def write(self, stuff):
self.destFile.write(stuff)
def finish(self):
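		# closing destFile and stopping the reactor here is what makes
		# writeResultTo's reactor.run() return.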
self.deferred.callback(None)
self.destFile.close()
reactor.stop()
def writeResultTo(page, destFile):
"""arranges for the result of rendering the twisted.web resource
to be written to destFile.
	This uses a very simple simulation of t.w rendering, so only a few
	tricks are possible.  Also, it actually runs a reactor to do its
	magic.  Do not call this in a running DaCHS server; it runs its own
	reactor.  This is only for dachs dlrun code.

	TODO: There's probably code for this in t.w.
	"""
	def _(func, req):
		try:
			res = func(req)
		except Exception:
			req.finish()
			raise
if res==server.NOT_DONE_YET:
# resource will finish the request itself later
return
try:
if res:
req.write(res)
finally:
req.finish()
request = WritingFakeRequest(destFile)
reactor.callWhenRunning(_, page.render, request)
reactor.run()
return request
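# A minimal usage sketch (file name made up for illustration): render a
# t.w resource into a local file and pick the media type off the fake
# request afterwards:
#
#   with open("result.dat", "wb") as destF:
#       req = writeResultTo(page, destF)
#   mime = req.responseHeaders.getRawHeaders("content-type")[0]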
####################### CLI
def parseCommandLine():
import argparse
parser = argparse.ArgumentParser(description="Run an asynchronous datalink"
" job (used internally)")
parser.add_argument("jobId", type=str, help="UWS id of the job to run")
return parser.parse_args()
def main():
args = parseCommandLine()
jobId = args.jobId
try:
job = DL_WORKER.getJob(jobId)
with job.getWritable() as wjob:
wjob.change(phase=uws.EXECUTING, startTime=datetime.datetime.utcnow())
service = base.resolveCrossId(job.parameters["serviceid"])
args = job.parameters["datalinkargs"]
data = service.run("dlget", args, svcs.emptyQueryMeta)
		# Unfortunately, datalink cores can in principle return all kinds
		# of messy things that may not even be representable in plain files
		# (e.g., t.w resources returning redirects).  We hence only handle
		# (mime, payload) tuples, (certain) Product instances, and
		# renderable t.w resources here and error out otherwise.
if isinstance(data, tuple):
mime, payload = data
with job.openResult(mime, "result") as destF:
destF.write(payload)
elif isinstance(data, products.ProductBase):
# We could run render and grab the content-type from there
# (which probably would be better all around). For now, don't
# care:
with job.openResult("application/octet-stream", "result") as destF:
for chunk in data.iterData():
destF.write(chunk)
elif hasattr(data, "render"):
# these are t.w. resources. Let's run a reactor so these properly
# work.
			# the media type is only known after rendering; open the result
			# with a placeholder and fix it from the response headers below.
			with job.openResult("application/octet-stream", "result") as destF:
req = writeResultTo(data, destF)
job.fixTypeForResultName("result",
req.responseHeaders.getRawHeaders("content-type")[0])
else:
raise NotImplementedError("Cannot handle a service %s result yet."%
repr(data))
with job.getWritable() as wjob:
wjob.change(phase=uws.COMPLETED)
except SystemExit:
pass
except uws.JobNotFound:
		base.ui.notifyInfo("Giving up on non-existent datalink job %s."%jobId)
except Exception as ex:
		base.ui.notifyError("Datalink runner %s: major failure"%jobId)
		# try to push the job into the error state -- this may well fail
		# given that we're quite hosed, but it's worth a try
DL_WORKER.changeToPhase(jobId, uws.ERROR, ex)
raise
if __name__=="__main__": # pragma: no cover
# silly test code, not normally reached
from twisted.web import resource
import os
class _Foo(resource.Resource):
def __init__(self, stuff):
self.stuff = stuff
		def render(self, request):
			if self.stuff=="booga":
				return b"abc"
			# hand the request off to a second resource and finish it from
			# the resulting deferred instead of returning the deferred.
			d = defer.maybeDeferred(_Foo("booga").render, request)
			d.addBoth(self.cleanup)
			d.addCallback(request.write)
			d.addCallback(lambda ignored: request.finish())
			return server.NOT_DONE_YET
def cleanup(self, res):
print("cleaning up")
return res
with open("bla", "w") as f:
writeResultTo(_Foo("ork"), f)
with open("bla") as f:
print(f.read())
os.unlink("bla")