"""
Twisted trial-based tests for TAP and UWS.

Tests not requiring the TAP renderer go to taptest.py

The real big integration tap tests use the HTTP interface and are in
an external resource package taptest.
"""

#c Copyright 2008-2024, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import atexit
import datetime
import io
import re
import os
import time

from twisted.internet import reactor
from twisted.python import threadable
threadable.init()

from gavo.helpers import trialhelpers

from gavo import base
from gavo import rscdesc #noflake: for registration
from gavo import utils
from gavo.protocols import scs  #noflake: for table's q3c mixin
from gavo.protocols import tap
from gavo.protocols import uws

# Switch on gavo.base's DEBUG flag for everything run from this module.
base.DEBUG = True

# Path prefix of the TAP service under test; the request paths in the
# tests below are built by appending the endpoint ("sync", "async",...)
# to it.
_B = "__system__/tap/run/"


# Upload fixtures for SyncQueryTest.testWithUploadOldGeom and
# SyncQueryTest.testWithUploadGeom; both are passed through
# utils.getDirtyBytes before being used as the uploaded file's content.
# NOTE(review): these look like base64-encoded, zlib-compressed VOTables
# (one with old-style, one with DALI-style geometries, going by the
# names) -- confirm against utils.getDirtyBytes.
votWithOldGeom = b'eJxdUctugzAQvOcrVj72YAfUEwIkClRCigIiNNdqC26DRDCxHUj69bVDUKNcPLPa2dmH/X1eRW+b\nFEYuVSv6gDjUJSu4HLteBeSg9eAxNk0TbUeBtOeamRQzVfjVcTZadbjyy3SXf5RxaujNzuB7lm4S\nyJKAKIWfnahRG38CKCVeVfvLA/JCoEGN+jqYoD6gJNDjkT9XnOsmIINQlJ9M0Lc6IA3/MXQuVLr2\nIqWliIWQjaKFUK0tdBO6x+7MXQKXWYnNqfOKPNtWduYk3cVlVlRZvg2zuNzB0hHEN+DApT5LDjXv\nNZc+e1SvfHbbzppEVbQsPXMAvyotWJKEyzRwa+HS9doB59WCz0zaypnVm/ffw7S743JN9nBhdv+z\n8A+9NIXB\n'
votWithDALIGeom = b'eJxdkcFugzAMhu88hZUHSFq2UwVIDJiEVI0KWK9TBpmGRBOWBGj39HNK0ape8tvJ599xEhyLOn7Z\nZzAJbTolQ7KlT8SD86mXJiTf1g47xuZ5pt2kOJXCMjxiWMU/e8EmR0deUGZV8V4mGYZXO9TXPNun\nkKchMYZ/9KrhFv0JcK35xXS/IiQ+gZZbbi8DJq0a0ZGA5CfxWDM2bUgGZaj4wWThjW12sbFaJUrp\n1tCDMp2j/ZQeeT8KND8v5KA6ad0t06xKyvxQ58VblCdlBWsHUF/AB6HtqAU0QlqhA3ZPewG7zuNM\n4jpex1xigKAunbggjXy62Wxh++wkYLjhAOYIXP+rsMFN1xdjd6/Ibv8Sed4fDoV9KQ==\n'


class TAPRenderTest(trialhelpers.ArchiveTest):
	"""the common base class of the TAP renderer tests in this module.

	It adds nothing to trialhelpers.ArchiveTest; it only gives the TAP
	tests a shared ancestor.
	"""


class SyncMetaTest(TAPRenderTest):
	"""tests for non-doQuery sync queries.
	"""
	def testVersionRejected(self):
		"""requests with bad version are rejected.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"REQUEST": "queryData", "VERSION": "2.1", "LAnG": "ADQL",
				"QUeRY": "ignored"},
			['<INFO name="QUERY_STATUS" value="ERROR">', 'Version mismatch'])

	def testNoSyncPaths(self):
		"""segments below sync are 404.
		"""
		def assertNotFound(result):
			# a rendered response must actually carry the 404 status
			self.assertEqual(result[1].code, 404,
				"Segment below sync answered with status %s"%result[1].code)
			return result

		def acceptRefusal(failure):
			# An outright failure of the request is accepted as a rejection
			# as well (the previous version of this test let any failure pass).
			return None

		# addCallbacks (rather than addCallback + addErrback) makes sure
		# acceptRefusal only sees failures of the request itself and cannot
		# swallow a failed assertion from assertNotFound.
		return trialhelpers.runQuery(self.renderer, "GET", f"{_B}sync/foo", {}
			).addCallbacks(assertNotFound, acceptRefusal)

	def testCapabilities(self):
		"""simple get capabilities response looks ok.
		"""
		return self.assertGETHasStrings(f"{_B}sync",
			{"REQUEST": "getCapabilities"}, [
				'<capability standardID="ivo://ivoa.net/std/TAP',
				'ParamHTTP">'])

	def testTAPRootIsInfo(self):
		"""the service root names the service's identifier and description.
		"""
		return self.assertGETHasStrings(f"{_B}", {}, [
			">ivo://x-testing/tap<",
			"The Unittest Suite's TAP end point."])

	def testAvailability(self):
		"""the availability endpoint reports the service as available.
		"""
		return self.assertGETHasStrings(f"{_B}availability", {}, [
			"<avl:available>true</avl:available>"])

	def testCapabilitiesPath(self):
		"""the capabilities endpoint contains a TAP capability element.
		"""
		return self.assertGETHasStrings(f"{_B}capabilities", {}, [
			'<capability standardID="ivo://ivoa.net/std/TAP"'])

	def testTables(self):
		"""the tables endpoint contains the TAP service's description.
		"""
		return self.assertGETHasStrings(f"{_B}tables", {}, [
			"<description> Unittest Suite's Table Access Protocol (TAP) service"])

	def testExamples(self):
		"""the examples endpoint returns the examples document skeleton.
		"""
		return self.assertGETHasStrings(f"{_B}examples", {}, [
			'<body vocab="http://www.ivoa.net/rdf/examples#"',
			'<div id="exampleslist"></div>'])


class SyncQueryTest(TAPRenderTest):
	"""tests for querying sync queries.
	"""
	# a VOTable from the inputs directory used as upload material below
	aVOTable = os.path.join(base.getConfig("inputsdir"),
		"data/vizier_votable.vot")

	def testNoLangRejected(self):
		"""a query without LANG yields a missing-parameter message.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"QUERY": 'SELECT alpha FROM test.adql WHERE alpha<3'
			}, [
				"<INFO", "Required parameter 'lang' missing.</INFO>"])

	def testBadLangRejected(self):
		"""an unknown query language yields an error status.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"LANG": "Furz",
				"QUERY": 'SELECT alpha FROM test.adql WHERE alpha<3'
			}, [
				'<INFO name="QUERY_STATUS" value="ERROR">This service does'
				' not support the query language Furz'])

	def testSimpleQuery(self):
		"""a plain query returns a results resource with column metadata.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"FORMAT": "text/xml",
				"QUERY": 'SELECT alpha, NULL as n FROM test.adql WHERE alpha<200'
			}, [
				'<RESOURCE type="results"',
				'ucd="pos.eq.ra;meta.main"',
				'ID="alpha"',
				'unit="deg"',
				'name="alpha"',
				'<FIELD ID="n" datatype="float" name="n"/>',
				'<TD>2.0</TD>',
				'<TD>NaN</TD>'])

	def testNameError(self):
		"""selecting an unknown column yields an error status.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT doesnotexist FROM test.adql'
			}, [
				'INFO name="QUERY_STATUS" value="ERROR">Field query:'
				" No such field known: column 'doesnotexist' could not"
				' be located in table metadata'
				])

	def testMultipleRequest(self):
		"""a REQUEST parameter given twice still produces results.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"REQUEST": ["doQuery", "doQuery"],
				"LANG": "ADQL",
				"QUERY": 'SELECT alpha FROM test.adql WHERE alpha<2'
			}, [
				'<RESOURCE type="results"',
				'ucd="pos.eq.ra;meta.main"',
				'ID="alpha"',
				'unit="deg"',
				'name="alpha"'])

	def testOverflowAndCast(self):
		"""MAXREC=1 gives an OVERFLOW status; the cast column is unicodeChar.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"LANG": "ADQL",
				"MAXREC": "1",
				"QUERY": 'SELECT cast(alpha as national char(3)) as foo FROM test.adql',
				"FORMAT": "votable/td",
			}, [
				'<INFO name="QUERY_STATUS" value="OVERFLOW"',
				'datatype="unicodeChar"', 'arraysize="*"',
				'sample RA -- *TAINTED*',
				'<TD>2  </TD>'])

	def testNoOverflowWithTOP(self):
		"""a TOP 1 query with MAXREC=1 does not report an overflow.
		"""
		return self.assertGETLacksStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"MAXREC": "1",
				"QUERY": 'SELECT TOP 1 alpha FROM test.adql'
			}, [
				'value="OVERFLOW"', ])

	def testMAXREC0(self):
		"""MAXREC=0 is reported as successful and still carries metadata.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"MAXREC": "0",
				"QUERY": 'SELECT alpha FROM test.adql'
			}, [
				'Query successful',
				'<COOSYS',
				'datatype="float"'])

	def testBadFormat(self):
		"""an unknown RESPONSEFORMAT yields an error status.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT alpha FROM test.adql WHERE alpha<2',
				"RESPONSEFORMAT": 'xls'
			}, [
				'<INFO name="QUERY_STATUS" value="ERROR">Field responseformat:'
				" 'xls' is not a valid value for responseformat</INFO>"])

	def testClearVOT(self):
		"""votable/td output has TABLEDATA rows and a circle xtype.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT CIRCLE(\'ICRS\', alpha, delta+1e-10, 10)'
					' FROM test.adql WHERE alpha<3',
				"FORMAT": "votable/td"
			}, [
				'<TABLEDATA><TR><TD>2.0', '14.0', '10.0',
				'xtype="circle"'])

	def testCSV(self):
		"""FORMAT=text/csv returns comma-separated values.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT alpha,delta FROM test.adql WHERE alpha<3',
				"FORMAT": "text/csv"
			}, [
				'2.0,14.0'])

	def testTSV(self):
		"""FORMAT=TSV returns tab-separated values.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT alpha, delta FROM test.adql WHERE alpha<3',
				"FORMAT": "TSV"
			}, [
				'2.0\t14.0'])

	def testJSON(self):
		"""RESPONSEFORMAT=application/json returns JSON with an overflow status.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": "SELECT TOP 3 column_name, datatype FROM TAP_SCHEMA.columns"
					" WHERE table_name='tap_schema.tables' ORDER BY column_name",
				"RESPONSEFORMAT": "application/json",
				"MAXREC": "1"
			}, [
				'"contains": "table"',
				'"description": "ADQL datatype"',
				# DaCHS buglet: maxrec=1 returns two rows.
				'["description", "unicodeChar"], ["nrows", "long"]',
				'"queryStatus": "OVERFLOW"'])

	def testBin2Table(self):
		"""the BINARY2 serialization can be requested through FORMAT.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT alpha, delta FROM test.adql WHERE alpha<3',
				"FORMAT": "application/x-votable+xml;serialization=BINARY2"
			}, [
				'BINARY2', 'AEAAAABBYAAA'])

	def testFITSTable(self):
		"""RESPONSEFORMAT=fits returns FITS headers for the columns.
		"""
		return self.assertGETHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT alpha, delta FROM test.adql WHERE alpha<3',
				"RESPONSEFORMAT": "fits",
				"MAXREC": "0",
			}, [
				'SIMPLE  =',
				"TYPE1  = 'alpha   '",
				"TCOMM1  = 'A sample RA'"])

	def testBadUploadSyntax(self):
		"""an UPLOAD without a source is answered with a syntax complaint.
		"""
		return self.assertPOSTHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"UPLOAD": "bar",
				"LANG": "ADQL",
				"QUERY": 'SELECT * FROM test.adql'
			}, [
				"only allow regular SQL identifiers"])

	def testBadUploadSyntax2(self):
		"""an UPLOAD with a trailing semicolon gets the same complaint.
		"""
		return self.assertPOSTHasStrings(f"{_B}sync", {
			"REQUEST": "doQuery",
			"UPLOAD": "bar,http://x.y;",
			"LANG": "ADQL",
			"QUERY": 'SELECT * FROM test.adql'}, [
			"only allow regular SQL identifiers"
			])

	def testNonExistingUpload(self):
		"""an upload URL nobody listens on is reported as not retrievable.
		"""
		return self.assertPOSTHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"UPLOAD": "bar,http://127.0.0.1:65000",
				"LANG": "ADQL",
				"QUERY": 'SELECT * FROM test.adql'
			}, [
				"'http://127.0.0.1:65000' cannot be retrieved</INFO",
				"Connection refused"])

	def testUploadCannotReadLocalFile(self):
		"""file:// upload URLs are refused.
		"""
		# No errback here: swallowing failures would let this test pass
		# even if the service started to serve local files.
		return self.assertPOSTHasStrings(f"{_B}sync", {
			"REQUEST": "doQuery",
			"UPLOAD": "bar,file:///etc/passwd",
			"LANG": "ADQL",
			"QUERY": 'SELECT * FROM test.adql'}, [
			"'file:///etc/passwd' cannot be retrieved</INFO",
			"unknown url type"
			])

	def testMalformedUploadURL(self):
		"""an UPLOAD lacking the table name is a syntax error.
		"""
		return self.assertPOSTHasStrings(f"{_B}sync", {
			"REQUEST": "doQuery",
			"UPLOAD": "http://fit://file://x.ab",
			"LANG": "ADQL",
			"QUERY": 'SELECT * FROM test.adql'}, [
			'<INFO name="QUERY_STATUS" value="ERROR">Field UPLOAD:'
			' Syntax error in UPLOAD parameter'
			])

	def testInlineUploadFromArgsWorks(self):
		"""an inline (param:) upload can be joined against a database table.
		"""
		with open(self.aVOTable, "rb") as f:
			uploadFile = trialhelpers.FakeFile("../Hägar/klause", f.read())

		return self.assertPOSTHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"UPLOAD": "bar,param:HoNk",
				"LANG": "ADQL",
				"QUERY": 'SELECT * FROM tap_upload.bar join test.adql on'
					' (alpha-"_RAJ2000"<0)',
				"HoNk": uploadFile,
			}, [
				'xmlns="http://www.ivoa.net/xml/VOTable/',
				'ucd="pos.eq.ra;meta.main"',
				'encoding="base64"'])

	def testSensibleErrorOnMalformedUpload(self):
		"""a bad literal in an uploaded VOTable is reported with field name.
		"""
		return self.assertPOSTHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"UPLOAD": "bar,param:HoNk",
				"LANG": "ADQL",
				"QUERY": 'SELECT * FROM tap_upload.bar join test.adql on'
					' (alpha-"_RAJ2000"<0)',
				"HoNk": trialhelpers.FakeFile("../Hägar/klause",
					b"""<VOTABLE><RESOURCE><TABLE>
					<FIELD name="x" datatype="double"/><DATA><TABLEDATA>
					<TR><TD>13</TD></TR><TR><TD>break here</TD></TR>
					</TABLEDATA></DATA></TABLE></RESOURCE></VOTABLE>"""),
			}, [
				"While ingesting upload bar: Invalid literal for double (field x):"
				" 'break here'</INFO>"])

	def testMissingInlineParameter(self):
		"""a param: upload referencing a missing parameter is an error.
		"""
		# Read the file eagerly so no file handle is left open; the upload
		# deliberately arrives under a name ("MoNk") that UPLOAD does not
		# reference.
		with open(self.aVOTable, "rb") as f:
			strayUpload = trialhelpers.FakeFile("upl", f.read())

		return self.assertPOSTHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"UPLOAD": "bar,param:HoNk",
				"LANG": "ADQL",
				"QUERY": 'SELECT top 1 * FROM tap_upload.bar',
				"MoNk": strayUpload,
			}, [
				'<INFO name="QUERY_STATUS" value="ERROR">'
					"Field UPLOAD: No inline upload 'HoNk' found"])

	def testWithUploadOldGeom(self):
		"""the votWithOldGeom upload can be used in a contains() join.
		"""
		return self.assertPOSTHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"UPLOAD": "t1,param:HoNk",
				"LANG": "ADQL",
				"FORMAT": "votable/td",
				"QUERY": "SELECT ssa_location,"
					" POINT('ICRS', COORD1(ssa_location), COORD2(ssa_location)) AS np"
					" FROM test.adql join tap_upload.t1"
					" ON 1=contains(ssa_location, circle('', alpha, delta, 0.002))",
				"HoNk": trialhelpers.FakeFile(
					"upl", utils.getDirtyBytes(votWithOldGeom))
			}, [
				"<TD>Position UNKNOWNFrame 2.0009",
				"14.00100",
				"<TD>2.00", " 14.00"])

	def testWithUploadGeom(self):
		"""the votWithDALIGeom upload can be used in a contains() join.
		"""
		return self.assertPOSTHasStrings(f"{_B}sync", {
				"REQUEST": "doQuery",
				"UPLOAD": "t1,param:HoNk",
				"LANG": "ADQL",
				"FORMAT": "votable/td",
				"QUERY": 'SELECT * FROM test.adql join tap_upload.t1'
					" on 1=contains(ssa_location, circle('', alpha, delta, 0.002))",
				"HoNk": trialhelpers.FakeFile("upl",
					utils.getDirtyBytes(votWithDALIGeom)),
			}, ["<TD>2.00", " 14.00"])

	def testArraySerialisation(self):
		"""an array-valued result is declared as a variable-length int array.
		"""
		return self.assertPOSTHasStrings(f"{_B}sync", {
			"REQUEST": "doQuery",
			"LANG": "ADQL",
			"QUERY": 'SELECT gavo_histogram(alpha, 0, 360, 4) as h FROM test.adql'}, [
			'AAAABgAAAAAAAAACAAAAAAAAAAAAAAABAAAAAA==',
			'arraysize="*" datatype="int" name="h"'])


class SimpleAsyncTest(TAPRenderTest):
	"""tests for some non-ADQL async queries.
	"""
	def testAsyncDirRedirects(self):
		"""async/ (with trailing slash) redirects to async.
		"""
		def assertLocation(result):
			self.assertTrue(result[1].getLocationValue().endswith(f"{_B}async"))
			return result

		return self.assertStatus(f"{_B}async/", 302
			).addCallback(assertLocation)

	def testVersionRejected(self):
		"""job creation with a bad VERSION yields an error status.
		"""
		return self.assertPOSTHasStrings(f"{_B}async", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"VERSION": "0.1"},
			['<INFO name="QUERY_STATUS" value="ERROR">', 'Version mismatch; this'])

	def testJobList(self):
		"""the job list is a UWS 1.1 jobs document.
		"""
		return self.assertGETHasStrings(f"{_B}async", {}, [
			'<uws:jobs ',
			' xmlns:uws="http://www.ivoa.net/xml/UWS/v1.0" ',
			' version="1.1" ',
			])

	def testNonExistingPhase(self):
		"""asking for the phase of an unknown job yields a VOTable error.
		"""
		return self.assertGETHasStrings(f"{_B}async/23/phase", {},
			['<VOTABLE ', 'ERROR">UWS job \'23\' could not'])

	def testLifecycle(self):
		"""tests job creation, redirection, phase, and deletion.
		"""
		# This one is too huge and much too slow for a unit test.  Still
		# I want at least one integration-type test in here since the
		# big test probably won't be run at every commit.
		#
		# The steps are defined last-to-first below; they run in the order
		# checkPosted, checkJobInfo, checkPhase, checkQuote, checkPlan,
		# promote, assertStarted, waitForCompletion, checkResult, delete,
		# assertDeleted.
		def assertDeleted(result, jobId):
			self.assertEqual(result[1].code, 303)
			target = result[1].getLocationValue()
			self.assertTrue(target.endswith(f"{_B}async"),
				"Deletion redirect doesn't point to job list but to %s"%target)
			return self.assertGETLacksStrings(target, {}, ['jobref id="%s"'%jobId]
			).addCallback(lambda res: reactor.disconnectAll())

		def delete(ignored, jobId):
			return trialhelpers.runQuery(self.renderer, "DELETE",
				f"{_B}async/"+jobId, {}
			).addCallback(assertDeleted, jobId)

		def checkResult(ignored, jobId):
			return self.assertGETHasStrings(
				f"{_B}async/%s/results/result"%jobId, {}, [
				'name="sql_query"',
				'QAAAAA==']
			).addCallback(delete, jobId)

		def waitForCompletion(result, jobId):
			if b"EXECUTING" in result[0]:
				# wait for completion once more
				return self.assertGETHasStrings(f"{_B}async/%s"%jobId, {"WAIT": "10"},
					["COMPLETED"]
				).addCallback(checkResult, jobId)
			else:
				self.assertTrue(b"COMPLETED" in result[0],
					"'%s' doesn't have COMPLETED in it"%result[0])
				return checkResult(result, jobId)

		def assertStarted(lastRes, jobId):
			# lastRes must be a redirect to the job info page
			req = lastRes[1]
			self.assertEqual(req.code, 303)
			self.assertEqual(req.getLocationValue(),
				"http://localhost:8080/__system__/tap/run/async/"+jobId)
			# the empty string is in any response; this request is only made
			# to obtain the job document for waitForCompletion
			return self.assertGETHasStrings(f"{_B}async/%s"%jobId, {"WAIT": "10"},
				[""]).addCallback(waitForCompletion, jobId)

		def promote(ignored, jobId):
			return trialhelpers.runQuery(self.renderer, "POST",
				f"{_B}async/%s/phase"%jobId, {"PHASE": "RUN"}
			).addCallback(assertStarted, jobId)

		def checkPlan(ignored, jobId):
			return self.assertGETHasStrings(f"{_B}async/%s/plan"%jobId, {},
				["Seq Scan on adql", "alpha < '3'::double"]
			).addCallback(promote, jobId)

		def checkQuote(ignored, jobId):
			return self.assertGETHasStrings(f"{_B}async/%s/quote"%jobId, {},
				['-']
				).addCallback(checkPlan, jobId)

		def checkPhase(ignored, jobId):
			return self.assertGETHasStrings(f"{_B}async/%s/phase"%jobId, {},
				['PENDING']
				).addCallback(checkQuote, jobId)

		def checkJobInfo(jobId):
			return self.assertGETHasStrings(f"{_B}async/%s"%jobId, {}, [
				'<uws:job ',
				'<uws:ownerId xsi:nil="true"',
				'<uws:startTime xsi:nil="true"',
				'<uws:parameter id="maxrec"']
				).addCallback(checkPhase, jobId)

		def checkPosted(result):
			# jobId is in location of result[1]
			request = result[1]
			self.assertEqual(request.code, 303)
			jobURL = request.getLocationValue()
			self.assertIn(f"{_B}async", jobURL)
			jobId = jobURL.split("/")[-1]
			return checkJobInfo(jobId)

		def fail(failure):
			raise failure.value

		return trialhelpers.runQuery(self.renderer, "POST", f"{_B}async", {
			"LANG": "ADQL",
			"maxrec": "33",
			"QUERY": "SELECT alpha FROM test.adql WHERE alpha<3"}
		).addCallback(checkPosted
		).addErrback(fail)

	def testBadConstructionArgument(self):
		"""a bad MAXREC is rejected with 400 and creates no job.
		"""
		def checkPosted(result):
			request = result[1]
			self.assertEqual(request.code, 400)
			with base.getTableConn() as conn:
				nJobsPost = list(
					conn.query("SELECT count(*) from tap_schema.tapjobs"))[0][0]
			self.assertEqual(nJobsPost, nJobsPre,
				"Abortive job creation created a job?")

		# number of jobs before the attempt, compared against in checkPosted
		with base.getTableConn() as conn:
			nJobsPre = list(
				conn.query("SELECT count(*) from tap_schema.tapjobs"))[0][0]

		return self.assertPOSTHasStrings(f"{_B}async", {
			"LANG": "ADQL",
			"MAXREC": "kaputt",
			"QUERY": "SELECT ra FROM test.adql WHERE ra<3"},
			["<VOTABLE", 'value="ERROR"', "'kaputt' is not a valid literal"]
		).addCallback(checkPosted)

	def testJoblistLAST(self):
		"""LAST=1 returns a job list with at most one jobref.
		"""
		def assertMaxOneResult(result):
			# the response body is bytes, so the needle must be bytes, too
			self.assertIn(b'xmlns:uws="http://www.ivoa.net/xml/UWS/v1.0',
				result[0])
			nJobs = len(re.findall(b'<uws.jobref ', result[0]))
			self.assertTrue(nJobs<2, "LAST=1 returns %s jobrefs"%nJobs)
			self.assertEqual(result[1].code, 200)

		return trialhelpers.runQuery(self.renderer, "GET", f"{_B}async", {
			"LAST": "1"}
			).addCallback(assertMaxOneResult)

	def testJoblistAFTER(self):
		"""AFTER with a time in the future returns an empty job list.
		"""
		def assertNoResult(result):
			self.assertEqual(result[1].code, 200)
			self.assertIn(b'xmlns:uws="http://www.ivoa.net/xml/UWS/v1.0',
				result[0])
			nJobs = len(re.findall(b'<uws.jobref ', result[0]))
			self.assertEqual(nJobs, 0)

		return trialhelpers.runQuery(self.renderer, "GET", f"{_B}async", {
			"AFTER": (datetime.datetime.utcnow()
				+datetime.timedelta(seconds=1)).isoformat()}
			).addCallback(assertNoResult)

	def testJoblistPHASE(self):
		"""PHASE=ABORTED returns a job list without jobrefs.
		"""
		def assertNoResult(result):
			self.assertEqual(result[1].code, 200)
			self.assertIn(b'xmlns:uws="http://www.ivoa.net/xml/UWS/v1.0',
				result[0])
			nJobs = len(re.findall(b'<uws.jobref ', result[0]))
			self.assertTrue(nJobs==0, "PHASE=ABORTED returns a jobref?")

		return trialhelpers.runQuery(self.renderer, "GET", f"{_B}async", {
			"PHASE": 'ABORTED'}
			).addCallback(assertNoResult)

	def testDestroyRunning(self):
		"""deleting an executing job removes it from the worker system.
		"""
		def assertDeleted(result, jobId, pid, wd):
			# getJob takes the job id itself (cf. destroy below), not a tuple
			self.assertRaises(uws.JobNotFound,
				tap.WORKER_SYSTEM.getJob,
				jobId)

			# NOTE(review): os.waitpid(pid, WNOHANG) returns (0, 0) while the
			# child has not changed state, so this loop ends successfully both
			# for a child that is still running and (through ChildProcessError)
			# for one that is gone -- confirm this is the intended check.
			retries = 0
			while True:
				try:
					self.assertEqual(os.waitpid(pid, os.WNOHANG), (0,0))
				except AssertionError:
					retries += 1
					if retries>5:
						raise
					time.sleep(0.05)
				except ChildProcessError:
					# I don't think these should come with WNOHANG, but... well
					# there they are, and since we assert the child isn't there,
					# I'm happy with them, too.
					break
				else:
					break
			return result

		def destroy(result, jobId):
			job = tap.WORKER_SYSTEM.getJob(jobId)
			self.assertEqual(job.phase, "EXECUTING")
			wd = job.getWD()
			pid = job.pid

			return trialhelpers.runQuery(self.renderer, "DELETE",
				f"/tap/async/{jobId}", {},
			).addCallback(assertDeleted, jobId, pid, wd)

		def waitUntilRuns(result, jobId):
			return self.assertGETHasStrings(f"tap/async/{jobId}", {"WAIT": "10"},
				['"query">JUST HANG']).addCallback(destroy, jobId)

		def startJob(result):
			jobURL = result[1].getLocationValue()
			self.assertTrue(re.search("tap/run/async", jobURL))
			jobId = jobURL.split("/")[-1]
			return self.assertPOSTHasStrings(f"/tap/async/{jobId}/phase",
				{"PHASE": "RUN"},
				["Unittest Suite -- See Other"]
				).addCallback(waitUntilRuns, jobId)

		return self.assertPOSTHasStrings("tap/async", {
			"LANG": "ADQL",
			"QUERY": "JUST HANG around"},
			["Unittest Suite -- See Other"]
		).addCallback(startJob)


class PersistentUploadTest(TAPRenderTest):
	"""tests for the user_tables endpoint and tables in tap_user.
	"""
	# The VOTable used as PUT payload in the tests below.
	# NOTE(review): this one BytesIO is shared by all tests of the class;
	# if request.setPayload consumes the stream without rewinding it, later
	# tests would send an empty payload -- confirm against trialhelpers.
	with open(
			base.getConfig("inputsDir") / "data" / "vizier_votable.vot",
			"rb") as f:
		someVOT = io.BytesIO(f.read())

	def assert404Status(self, res):
		"""asserts that the request in res has status 404 and returns res.
		"""
		self.assertEqual(res[1].code, 404)
		return res

	def testNoTable(self):
		"""the table listing is refused for anonymous users.
		"""
		return self.assertGETHasStrings("tap/user_tables", {}, [
			'name="QUERY_STATUS"',
			"No persistent tables listing for anonymous users.<",])

	def testTablesListing(self):
		"""with trialhelpers.addCreds applied, the listing is a tableset.
		"""
		return self.assertGETHasStrings("tap/user_tables", {}, [
			"<vtm:tableset",
			'xmlns:vtm="http://www.ivoa.net/xml/VOSITables/v1.0"',
			"<name>tap_user</name>",
			"A schema containing users' uploads"], rm=trialhelpers.addCreds)

	def testBadTableName(self):
		"""an invalid upload table name is answered with a 404.
		"""
		return self.assertGETHasStrings("tap/user_tables/666", {}, [
			'name="QUERY_STATUS"',
			'>You must give a valid upload table name here<']
			).addCallback(self.assert404Status)

	def testNoPosting(self):
		"""POSTing to a user table is not supported.
		"""
		return self.assertPOSTHasStrings("tap/user_tables/test_upload", {}, [
			'name="QUERY_STATUS"',
			"This resource cannot respond to the HTTP 'POST' method</INFO>"])

	def testNoMediaType(self):
		"""a PUT with an unsupported media type is rejected.
		"""
		def makePUT(request):
			request.setPayload(self.someVOT, "application/octet-stream")
			request.method = "PUT"

		return self.assertPOSTHasStrings(
			f"{_B}user_tables/test_upload", {}, [
				"Do not know how to ingest application/octet-stream tables<",
				"if in doubt, use application/x-votable+xml"
			], rm=makePUT)

	def testSuccessfulUpload(self):
		"""a VOTable can be PUT, inspected with GET, and DELETEd again.
		"""
		# Steps are defined last-to-first; they run in the order
		# PUT, tryGET, tryDELETE, assertCleanedUp.
		def makePUT(request):
			request.setPayload(self.someVOT, base.votableType)
			request.method = "PUT"

		def makeDELETE(request):
			request.method = "DELETE"

		def assertCleanedUp(result):
			self.assertEqual(result[0], b'Dropped user table test_upload\n')
			self.assertEqual(result[1].code, 200)

			with base.getTableConn() as conn:
				self.assertEqual(list(conn.query(
					"select user_name, table_name"
					" from tap_user.tables where user_name='anonymous' and"
					" table_name='test_upload'")), [])
				self.assertEqual(base.UnmanagedQuerier(conn).getTableType(
					"tap_user._anonymous_test_upload"), None)
			return result

		def tryDELETE(result):
			return self.assertGETHasStrings(
				f"{_B}user_tables/test_upload", {}, [
				"Dropped user table test_upload"], rm=makeDELETE
			).addCallback(assertCleanedUp)

		def tryGET(result):
			self.assertEqual(result[1].code, 200)
			with base.getTableConn() as conn:
				self.assertEqual(
					list(conn.query("select color from tap_user._anonymous_test_upload"
						" where tyc2='1265-01176-1'")),
					[(0.25,)])
			return self.assertGETHasStrings(
				f"{_B}user_tables/test_upload", {}, [
				"<name>tap_user.test_upload</name>",
				"<column><name>tyc2</name>"]
				).addCallback(tryDELETE)

		return self.assertPOSTHasStrings(
			f"{_B}user_tables/test_upload", {}, [
				"Query this table as tap_user.test_upload",
			], rm=makePUT
			).addCallback(tryGET)

	def testCreateAs(self):
		"""CREATE TABLE tap_user... AS makes a table that can be queried.
		"""
		from twisted.python import failure

		def makeDELETE(request):
			request.method = "DELETE"

		def cleanup(res):
			# Always drop the table.  When an earlier step failed, res is a
			# Failure; hand it back after the drop so that a working cleanup
			# does not turn a failed test into a passing one.
			dropped = self.assertGETHasStrings(f"{_B}user_tables/fromquery", {}, [
				"Dropped user table fromquery"], rm=makeDELETE)
			if isinstance(res, failure.Failure):
				dropped.addBoth(lambda ignored: res)
			return dropped

		def tryQuery(res):
			return self.assertGETHasStrings(
				f"{_B}sync", {"LANG": "ADQL",
					"QUERY": "select * from tap_user.fromquery"}, [
					'<STREAM encoding="base64">QAAAAA==</STREAM>'])

		return self.assertGETHasStrings(
			f"{_B}sync", {"LANG": "ADQL",
				"QUERY": "CREATE TABLE tap_user.fromquery AS"
					" SELECT alpha FROM test.adql WHERE alpha<3"}, [
				'<DESCRIPTION>A sample RA</DESCRIPTION>', # metadata present
				'<STREAM encoding="base64"></STREAM>',  # empty VOTable
				]).addCallback(tryQuery
				).addBoth(cleanup)

	def testNoReservedWords(self):
		"""ADQL reserved words are refused as upload table names.
		"""
		def makePUT(request):
			request.setPayload(self.someVOT, base.votableType)
			request.method = "PUT"

		return self.assertPOSTHasStrings(
			f"{_B}user_tables/table", {}, [
				'value="ERROR"',
				"'table' cannot be used as an upload table name (which must be"
				" regular ADQL identifiers, in particular not ADQL reserved words)."
			], rm=makePUT)

			

# Whatever trialhelpers.provideRDData returns here is registered to be
# called at interpreter exit.
# NOTE(review): presumably the call imports the data "ADQLTest" from the
# RD "test" (the test.adql table queried above) and returns the matching
# cleanup function -- confirm against trialhelpers.
atexit.register(trialhelpers.provideRDData("test", "ADQLTest"))
