"""
Tests for the datalink subsystem and (potentially) special data tricks.
"""

#c Copyright 2008-2024, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import atexit
import glob
import os
import time

from twisted.internet import reactor
from twisted.python import threadable
threadable.init()

from gavo.helpers import trialhelpers

from gavo import api


class SyncTest(trialhelpers.ArchiveTest):
	"""Tests for the synchronous datalink renderers (dlmeta, dlget) and
	the auxiliary endpoints (info, availability, capabilities) of the
	test services.
	"""

	def testInfoWorks(self):
		# the info page must list the input fields including the ID
		# description.
		expected = [
			'<h2 id="svci-inputs" class="section">Input Fields</h2>',
			"<td>The publisher DID of the dataset of interest</td>"]
		return self.assertGETHasStrings("/data/cores/dl/info", {}, expected)

	def testErrorDocumentMetaGeneral(self):
		# the id "broken" must show up as a generic Fault cell in dlmeta
		# (the parameter name is spelled "iD" here, presumably to exercise
		# case-insensitive parameter matching).
		params = {"iD": "broken"}
		expected = ["<TD>Fault: name 'ddt' is not defined</TD>"]
		return self.assertGETHasStrings(
			"/data/cores/dl/dlmeta", params, expected)

	def testErrorDocumentMetaNotFound(self):
		# an id with a foreign authority yields a NotFoundFault cell.
		params = {"ID": "ivo://not.here"}
		expected = [
			"<TD>NotFoundFault: The authority in the dataset identifier"
			" 'ivo://not.here' could not be located in the authorities"
			" managed here</TD>"]
		return self.assertGETHasStrings(
			"/data/cores/dl/dlmeta", params, expected)

	def testErrorDocumentAccess(self):
		# same failure as in testErrorDocumentMetaGeneral, but through dlget.
		params = {"id": "broken"}
		expected = ["name 'ddt' is not defined"]
		return self.assertGETHasStrings(
			"/data/cores/dl/dlget", params, expected)

	def testErrorStatus(self):
		# dlget without any parameters must be answered with a 422.
		return self.assertStatus("/data/cores/dl/dlget", 422)

	def testWithoutId(self):
		# dlmeta without an ID returns an empty table that still carries
		# field metadata and the service_def column.
		expected = [
			"<TABLEDATA></TABLEDATA>",
			"<FIELD",
			'name="service_def"']
		return self.assertGETHasStrings("/data/cores/dl/dlmeta", {}, expected)

	def testMetadata(self):
		# for a known cube, dlmeta delivers limits and an OK status.
		params = {"ID": "ivo://x-testing/~?data/excube.fits"}
		expected = [
			'latitude coordinate</DESCRIPTION><VALUES><MIN value="30.98318158',
			'</MIN><MAX value="30.98484850',
			'xtype="interval"',
			'name="QUERY_STATUS"',
			'value="OK"']
		return self.assertGETHasStrings(
			"/data/cores/dl/dlmeta", params, expected)

	def testRespformat1(self):
		# RESPONSEFORMAT given as the short name "votable".
		params = {
			"ID": "ivo://x-testing/~?data/excube.fits",
			"RESPONSEFORMAT": "votable",
		}
		expected = [
			'<DESCRIPTION>The latitude coordinate</DESCRIPTION>'
			'<VALUES><MIN value="30.983181587']
		return self.assertGETHasStrings(
			"/data/cores/dl/dlmeta", params, expected)

	def testRespformat2(self):
		# RESPONSEFORMAT given as a full media type.
		params = {
			"ID": "ivo://x-testing/~?data/excube.fits",
			"RESPONSEFORMAT": "application/x-votable+xml",
		}
		expected = [
			'<DESCRIPTION>The latitude coordinate</DESCRIPTION>'
			'<VALUES><MIN value="30.983181587']
		return self.assertGETHasStrings(
			"/data/cores/dl/dlmeta", params, expected)

	def testServerSideRendering(self):
		# with a Mozilla user agent, the response is expected to be XHTML.
		def pretendToBeMozilla(request):
			request.requestHeaders.setRawHeaders("user-agent", ["Mozilla"])

		params = {
			"ID": "ivo://x-testing/~?data/excube.fits",
		}
		expected = [
			'xmlns="http://www.w3.org/1999/xhtml"',
			'<span class="low-limit">3.749e-07</span>',
			'Broadcast dataset via SAMP</button>']
		return self.assertGETHasStrings(
			"/data/cores/dl/dlmeta", params, expected,
			rm=pretendToBeMozilla)

	def testInvalidRespformat(self):
		# an unknown RESPONSEFORMAT gives a validation message and a 422.
		def checkCode(res):
			self.assertEqual(res[1].code, 422)

		params = {
			"ID": "ivo://x-testing/~?data/excube.fits",
			"RESPONSEFORMAT": "vnd-microsoft/xls"
		}
		expected = [
			"Field responseformat: 'vnd-microsoft/xls'"
			" is not a valid value for responseformat"]
		pending = self.assertGETHasStrings(
			"/data/cores/dl/dlmeta", params, expected)
		return pending.addCallback(checkCode)

	def testRedirection(self):
		# the id "somewhereelse" is answered with a 301 to a remote URL.
		def checkCode(res):
			self.assertEqual(res[1].code, 301)

		params = {
			"ID": "somewhereelse",
		}
		expected = ['<a href="http://some.whereel.se/there">different URL']
		pending = self.assertGETHasStrings(
			"/data/cores/dl/dlget", params, expected)
		return pending.addCallback(checkCode)

	def testMetadataError(self):
		# a mistyped accref (.fit rather than .fits) gives an error row
		# with empty link columns.
		params = {"ID": "ivo://x-testing/~?data/excube.fit"}
		expected = [
			"TR><TD>ivo://x-testing/~?data/excube.fit</TD><TD></TD><TD></TD><TD>"
			"NotFoundFault: accref 'data/excube.fit' could not be located"
			" in product table</TD>"]
		return self.assertGETHasStrings(
			"/data/cores/dl/dlmeta", params, expected)

	def testCubeCutout(self):
		# a BAND cutout returns FITS headers for the reduced cube and a
		# file name marking the result as processed.
		def checkFilename(res):
			disposition = res[1].responseHeaders.getRawHeaders(
				"content-disposition")[0]
			self.assertEqual(disposition,
				"attachment; filename=data_excube_proc.fits")

		params = {
			"ID": "ivo://x-testing/~?data/excube.fits",
			"BAND": "375.4e-9 +Inf"}
		expected = [
			"NAXIS3  =                    2",
			"CRPIX3  =                 -1.0"]
		pending = self.assertGETHasStrings(
			"/data/cores/dl/dlget", params, expected)
		return pending.addCallback(checkFilename)

	def testVODMLSpectrum(self):
		# requesting the version 1.6 VOTable media type yields MIVOT
		# annotation, and the media type is echoed in the response headers.
		mediaType = (
			"application/x-votable+xml;serialization=TABLEDATA;version=1.6")

		def checkHeaders(res):
			contentType = res[1].responseHeaders.getRawHeaders(
				"content-type")[0]
			self.assertEqual(contentType, mediaType)
			disposition = res[1].responseHeaders.getRawHeaders(
				"content-disposition")[0]
			self.assertEqual(disposition,
				"attachment; filename=test2_proc.vot6")

		params = {
			"FORMAT": mediaType,
			"ID": "ivo://test.inv/test2"}
		expected = [
			'dmtype="dachstoy:Location"',
			'<mivot:ATTRIBUTE dmrole="y" ref="flux"/>',
			"1755.0"]
		pending = self.assertGETHasStrings(
			"/data/ssatest/dl/dlget", params, expected)
		return pending.addCallback(checkHeaders)

	def testNoMultiArguments(self):
		# passing CIRCLE twice must be rejected with a 422.
		def checkCode(res):
			self.assertEqual(res[1].code, 422)

		params = {
			"CIRCLE": ["10 10 5", "14 13 2"],
			"ID": "ivo://x-testing/~?data/excube.fits"}
		expected = ["MultiValuedParamNotSupported: Field CIRCLE"]
		pending = self.assertGETHasStrings(
			"/data/cores/dl/dlget", params, expected)
		return pending.addCallback(checkCode)

	def testAvailability(self):
		expected = ["<avl:available>true</avl:available>"]
		return self.assertGETHasStrings(
			"/data/cores/dl/availability", {}, expected)

	def testCapabilities(self):
		expected = [
			'standardID="ivo://ivoa.net/std/DataLink#links-1.1"',
			'/data/cores/dl/dlmeta</accessURL>',
			'<ucd>meta.id;meta.main</ucd>']
		return self.assertGETHasStrings(
			"/data/cores/dl/capabilities", {}, expected)

	def testNoExtraSegments(self):
		# path segments after dlget must result in a 404 page.
		params = {"ID": "ivo://test.inv/test2"}
		expected = [
			"Not Found (404)",
			"dlget has no child resources</"]
		return self.assertGETHasStrings(
			"/data/ssatest/dl/dlget/inv.test2", params, expected)

	def testCleanedup(self):
		# this doesn't do any queries, it just makes sure that
		# the datalink services above cleaned up after themselves
		# (of course, we might see crap from the last run rather than
		# from this, but statistically it should catch trouble).
		tempDir = api.getConfig("tempDir")
		leftovers = glob.glob(os.path.join(tempDir, "fitstable*"))
		self.assertFalse(leftovers,
			"Something left fitstable temporaries in tempDir %s"%tempDir)

	def testDECandPOS(self):
		# DEC and POS both constrain axis 2; that must be a usage error.
		params = {
			"ID": "ivo://x-testing/~?data/excube.fits",
			"DEC": "30.9832 30.9834",
			"POS": "CIRCLE 359.36 30.985 0.0004"}
		expected = [
			"UsageError: Field DEC: Attempt to cut out along axis 2"
			" that has been modified before."]
		return self.assertGETHasStrings(
			"/data/cores/dl/dlget", params, expected)

	def testPOSandPIXEL(self):
		# PIXEL_1 and POS both constrain axis 1; that must be a usage error.
		params = {
			"ID": "ivo://x-testing/~?data/excube.fits",
			"PIXEL_1": "1 3",
			"POS": "CIRCLE 359.36 30.985 0.0004"}
		expected = [
			"UsageError: Field PIXEL_1: Attempt to cut out along axis 1"
			" that has been modified before."]
		return self.assertGETHasStrings(
			"/data/cores/dl/dlget", params, expected)

	def testEmptyResponse(self):
		# a cutout far off the image gives an empty body and a 204.
		def checkEmpty(res):
			payload = res[0]
			self.assertEqual(payload, b"")
			self.assertEqual(res[1].code, 204)

		params = {
			"ID": "ivo://x-testing/~?data/excube.fits",
			"POS": "CIRCLE 10 10 0.0001"}
		pending = self.assertGETLacksStrings(
			"/data/cores/dl/dlget", params, [" "])
		return pending.addCallback(checkEmpty)

	def testMaxrecHonored(self):
		# with MAXREC=15 and ten ids, the response overflows, but the
		# ids that are in there come with all of their rows.
		def checkCompleteRows(res):
			data, metadata = api.votable.loads(res[0])
			rows = list(metadata.iterDicts(data))
			# 10+10 rows from our meta maker, plus the #this links for them
			seenIds = {row["ID"] for row in rows}
			self.assertEqual(seenIds, {'p0', 'p1'})
			self.assertEqual(len(rows), 22)
			return res

		params = {
			"ID": [f"p{i}" for i in range(10)],
			"MAXREC": "15"}
		expected = [
			'name="QUERY_STATUS"',
			'value="OVERFLOW"']
		pending = self.assertGETHasStrings(
			"/data/cores/maxrecdl/dlmeta", params, expected)
		return pending.addCallback(checkCompleteRows)


def killLocalhost(url):
	"""returns url with scheme and host (including port) removed.

	What is left is the path plus, where present, the query string and
	the fragment, so "http://localhost:8080/a/b?c=d" becomes "/a/b?c=d".
	A url that already is server-relative is returned unchanged.

	This used to blindly cut off the first 21 characters, which only
	worked for URLs starting with exactly "http://localhost:8080".
	"""
	# imported locally so this function does not depend on edits to the
	# module's import block.
	from urllib.parse import urlsplit

	parts = urlsplit(url)
	relative = parts.path
	if parts.query:
		relative += "?"+parts.query
	if parts.fragment:
		relative += "#"+parts.fragment
	return relative


# Workaround for a trial failure (first seen in 2019) in which the
# reactor was not running in time.  We set a class-level flag on the UWS
# transitions class before any test runs.
# NOTE(review): presumably the flag makes job processes get started
# through twisted -- confirm in gavo.protocols.uws.
from gavo.protocols.uws import ProcessBasedUWSTransitions
ProcessBasedUWSTransitions.trial_forceTwisted = True

class AsyncTest(trialhelpers.ArchiveTest):
	"""Tests for the asynchronous datalink endpoint (dlasync).
	"""

	def testNonExistingJobMessage(self):
		# asking for an unknown job id yields an error document naming it.
		expected = [
			'name="QUERY_STATUS"',
			'value="ERROR"',
			"UWS job 'thisjobidisbad' could not be located in jobs table"]
		return self.assertGETHasStrings(
			"/data/cores/dl/dlasync/thisjobidisbad", {}, expected)

	def testNonExistingJobStatus(self):
		# ...and the status code for an unknown job id is 404.
		return self.assertStatus(
			"/data/cores/dl/dlasync/thisjobidisbad", 404)

	def testJoblist(self):
		# the job list is a UWS jobs document with a stylesheet attached.
		expected = [
			"/static/xsl/dlasync-joblist-to-html.xsl",
			"<uws:jobs"]
		return self.assertGETHasStrings(
			"/data/cores/dl/dlasync", {}, expected)

	def testBasicCutout(self):
		# this is a pretty close clone of testLifeCycle in test_tap, and
		# whatever's said there applies here, too.
		#
		# The callbacks are defined in the order in which they fire:
		# job creation, start, phase polling, result retrieval, deletion.

		def checkPosted(result):
			# creating the job must redirect to the new job's URL
			request = result[1]
			self.assertEqual(request.code, 303)
			jobURL = request.getLocationValue()
			self.assertTrue(
				jobURL.startswith("http://localhost:8080/data/cores/dl/dlasync/"),
				"Bad service URL on redirect")
			return startJob(killLocalhost(jobURL))

		def startJob(jobURL):
			started = trialhelpers.runQuery(self.renderer, "POST",
				jobURL+"/phase", {"PHASE": "RUN"})
			return started.addCallback(assertStarted, jobURL)

		def assertStarted(result, jobURL):
			# setting the phase must redirect back to the job itself
			request = result[1]
			self.assertEqual(request.code, 303)
			self.assertEqual(
				killLocalhost(request.getLocationValue()), jobURL)
			# kick off polling with a fake, empty phase response
			return waitForResult((b"", None), jobURL, 0)

		def waitForResult(result, jobURL, retry):
			# result[0] is the body of the last phase query; we re-query
			# until the job completes, fails, or we run out of retries.
			if retry>300:
				raise AssertionError("Datalink job at jobURL %s didn't finish."
					"  Leaving it for inspection."%jobURL)

			phase = result[0]
			if phase.startswith(b"COMPLETED"):
				fetched = trialhelpers.runQuery(self.renderer, "GET",
					jobURL+"/results/result", {})
				return fetched.addCallback(checkResult, jobURL)
			if phase.startswith(b"ERROR"):
				raise AssertionError(f"Datalink job at {jobURL} errored out.")

			time.sleep(0.1)
			polled = trialhelpers.runQuery(self.renderer, "GET",
				jobURL+"/phase", {})
			return polled.addCallback(waitForResult, jobURL, retry+1)

		def checkResult(result, jobURL):
			# the job's result must be the cut-out FITS
			contentType = result[1].responseHeaders.getRawHeaders(
				"content-type")[0]
			self.assertEqual(contentType, "application/fits")
			self.assertTrue(b"NAXIS1  =                   11" in result[0])
			return deleteJob(jobURL)

		def deleteJob(jobURL):
			deleted = trialhelpers.runQuery(self.renderer, "DELETE",
				jobURL, {})
			return deleted.addCallback(assertDeleted, jobURL)

		def assertDeleted(result, jobURL):
			# deletion redirects to a document that must no longer
			# reference the job.
			self.assertEqual(result[1].code, 303)
			redirectTarget = killLocalhost(result[1].getLocationValue())
			jobId = jobURL.split("/")[-1]
			checked = self.assertGETLacksStrings(
				redirectTarget, {}, ['jobref id="%s"'%jobId])
			return checked.addCallback(lambda res: reactor.disconnectAll())

		posted = trialhelpers.runQuery(self.renderer,  "POST",
			"/data/cores/dl/dlasync", {
				"ID": "ivo://x-testing/~?data/excube.fits",
				"BAND": "375.4e-9 +Inf"})
		return posted.addCallback(checkPosted)


class FromtableTest(trialhelpers.ArchiveTest):
	"""Tests against the fdbdl datalink service of the ssatest RD.
	"""

	def testBasic(self):
		# dlmeta for a known id returns the expected link rows.
		params = {"ID": "data/spec1.ssatest"}
		expected = [
			"<TD>big fart nebula</TD>",
			"<TD>Preview image</TD>",
			"<TD>http://localhost:8080/data/spec1.ssatest</TD>",
			"<TD>#this</TD>",
		]
		return self.assertGETHasStrings(
			"/data/ssatest/fdbdl/dlmeta", params, expected)

	def testNotFound(self):
		# an unknown id yields a NotFoundFault cell.
		params = {"ID": "duff_id"}
		expected = [
			"<TD>NotFoundFault: No dataset with id duff_id known here</TD>",
		]
		return self.assertGETHasStrings(
			"/data/ssatest/fdbdl/dlmeta", params, expected)

	def testDLGet(self):
		# dlget answers with a permanent redirect to the dataset.
		def checkRedirect(res):
			request = res[1]
			self.assertEqual(request.code, 301)
			self.assertEqual(
				request.getLocationValue(),
				"http://localhost:8080/data/spec1.ssatest")
			return res

		params = {"ID": "data/spec1.ssatest"}
		expected = ["title>Unittest Suite -- Moved Permanently</title>"]
		pending = self.assertGETHasStrings(
			"/data/ssatest/fdbdl/dlget", params, expected)
		return pending.addCallback(checkRedirect)


class GetTarTest(trialhelpers.ArchiveTest):
	"""Tests for the getTar service of the products RD.
	"""
	# admittedly, this is not datalink, but while we have a non-trivial
	# product table, try this, too

	def assertMediaType(self, result):
		# callback: passes result through if it was declared as a tar.
		contentType = result[1].responseHeaders.getRawHeaders(
			"content-type")[0]
		self.assertEqual(contentType, "application/x-tar")
		return result

	def testTarDelivered(self):
		params = {"accref": ["data/ex.fits", "data/spec1.ssatest"]}
		expected = [
			"\0\0\0\0\0\0\0", # tars have lots of 0-runs
			"dc_data/ex.fits", # tar header one
			"166.912408", # from ex.fits
			"dc_data/spec1.ssatest", # second tar header
			"alpha: 10.1", # content of spec1
		]
		pending = self.assertGETHasStrings(
			"/__system__/products/getTar/get", params, expected)
		return pending.addCallback(self.assertMediaType)


# provideRDData is called right here at import time, once for the
# import_fitsprod data of the test RD and once for test_import of
# ssatest; whatever it returns is registered to run at interpreter exit.
# NOTE(review): presumably the call imports the data the tests above
# query and the returned callable drops it again -- confirm in
# trialhelpers.
atexit.register(trialhelpers.provideRDData("test", "import_fitsprod"))
atexit.register(trialhelpers.provideRDData("ssatest", "test_import"))
