"""
Tests requiring having data/cores#import_concat available.
"""

#c Copyright 2008-2024, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import atexit

from gavo.helpers import trialhelpers

from gavo import base
from gavo import formal
from gavo import votable
from gavo.helpers import testtricks

base.DEBUG = True


class SCSTest(trialhelpers.ArchiveTest, testtricks.XSDTestMixin):
	def testCasting(self):
		return self.assertGETHasStrings("/data/cores/scs/scs.xml",
			{"RA": ["1"], "DEC": ["2"], "SR": ["1.5"]},
			['P+HiwiZkMh', 'datatype="char"', "<COOSYS", 'epoch="J1985.7"'])

	def testCapability(self):
		return self.assertGETHasStrings("/data/cores/scs/capabilities", {}, [
			'standardID="ivo://ivoa.net/std/VOSI#capabilities"',
			'standardID="ivo://ivoa.net/std/ConeSearch"',
			'xsi:type="vs:ParamHTTP',
			'<name>SR</name>',
			'<sr>1</sr>'])

	def testSCSWeb(self):
		return self.assertGETHasStrings("/data/cores/scs/form", {
			"hscs_pos": "1.2,2", "hscs_sr": "90",
			"__nevow_form__": "genForm"}, ["Dist", "1808.96"])

	def testSCSWebMAXREC(self):
		def checkDestnameChanged(res):
			self.assertEqual(
				res[1].responseHeaders.getRawHeaders("content-disposition"),
				['attachment; filename=truncated_votable.xml'])
			return res

		return self.assertGETHasStrings("/data/cores/scs/form", {
			"hscs_pos": "1.2,2", "hscs_sr": "10800", "MAXREC": "1",
			"_FORMAT": "VOTable", "_TDENC": "on",
			"__nevow_form__": "genForm"}, ["<TR><TD>0.5024899",
				'The query limit was reached. Increase it to']
			).addCallback(checkDestnameChanged)

	def testMAXREC(self):

		def assertMaxrecHonored(res):
			self.assertEqual(res[0].count(b"<TR>"), 1)

		return self.assertGETHasStrings("/data/cores/scs/scs.xml",
			{"RA": ["1"], "DEC": ["2"], "SR": ["180"], "RESPONSEFORMAT": "votabletd",
				"Maxrec": "1"},
			['name="warning"', 'query limit was reached']
			).addCallback(assertMaxrecHonored)

	def testSCSDefaultSort(self):
		def assertSorted(res):
			self.assertEqual([r[1] for r in next(votable.parseBytes(res[0]))],
				['0', '1'])

		return trialhelpers.runQuery(self.renderer, "GET",
			"/data/cores/scs/scs.xml",
			{"RA": ["1"], "DEC": ["2"], "SR": ["180"],
				"REsPONSEFORMAT": "votabletd"}
			).addCallback(assertSorted)

	def testSCSDirectionOverride(self):
		def assertSorted(res):
			self.assertEqual([r[1] for r in next(votable.parseBytes(res[0]))],
				['1', '0'])

		return trialhelpers.runQuery(self.renderer, "GET",
			"/data/cores/scs/scs.xml",
			{"RA": ["1"], "DEC": ["2"], "SR": ["180"],
				"RESPONSEFORMAT": "votabletd", "_DBOPTIONS_DIR": "DESC"}
			).addCallback(assertSorted)

	def testProtocolDeclared(self):
		return self.assertGETHasStrings("/data/cores/scs/scs.xml",
			{"RA": ["1"], "DEC": ["2"], "SR": ["1"]},
			['<INFO name="standardID" value="ivo://ivoa.net/std/ConeSearch"',
				'Written by DaCHS ', ' SCSRenderer'])

	def testValidResponse(self):
		return self.assertGETIsValid("/data/cores/scs/scs.xml",
			{"RA": ["0"], "DEC": ["0"], "SR": ["180"]})

	def testVanityReplacementWorks(self):
		return self.assertGETHasStrings("/fromvanitynames",
			{"RA": ["1"], "DEC": ["2"], "SR": ["1"]},
			['<INFO name="standardID" value="ivo://ivoa.net/std/ConeSearch"',
				'Written by DaCHS ', ' SCSRenderer'])

	def testTimeoutResponse(self):

		def assertValid(res):
			self.assertValidates(res[0])

		return self.assertGETHasStrings("/data/cores/scs/scs.xml",
			{"RA": ["1"], "DEC": ["2"], "SR": ["1"], "_TIMEOUT": ["-1"]},
			['<INFO ID="Error"']
			).addCallback(assertValid)


class FormTest(trialhelpers.ArchiveTest):
	def testSimple(self):
		return self.assertGETHasStrings("/data/test/basicprod/form", {}, [
				'<a href="/static/help_vizier.shtml#floats">[?num. expr.]</a>',
				"<h1>Somebody else's problem</h1>",
			])
	
	def testInputSelection(self):
		return self.assertGETHasStrings("/data/cores/cstest/form", {}, [
				'Coordinates (as h m s, d m s or decimal degrees), or SIMBAD',
				'Search radius in arcminutes',
				'A sample magnitude'
			])
	
	def testMultigroupMessages(self):
		return self.assertGETHasStrings("/data/cores/impgrouptest/form", {
				"rV": "invalid",
				"mag": "bogus",
				formal.FORMS_KEY: "genForm",
			}, [
				# assert multiple form items in one line:
				'class="description">Ah, you know.',
				'class="inmulti"',
				'<div class="message">Not a valid number; Not a valid number</div>',
				'value="invalid"'])
	
	def testCSSProperties(self):
		return self.assertGETHasStrings("/data/cores/cstest/form", {}, [
			'class="field string numericexpressionfield rvkey"',
			'.rvkey { background:red; }'])
	
	def testInputKeyDefault(self):
		return self.assertGETHasStrings("/data/cores/grouptest/form", {}, [
				'value="-4.0"'
			])

	def testInputKeyFilled(self):
		return self.assertGETHasStrings("/data/cores/grouptest/form",
			{"rV": "9.25"}, [
				'value="9.25"'
			])

	def testSCSPositionComplains(self):
		return self.assertGETHasStrings("/data/cores/cstest/form", {
				"hscs_pos": "23,24", "__nevow_form__": "genForm", "VERB": "3"},
				["Field hscs_sr: If you query for a position, you must give"
					" a search radius"])

	def testJSONOnForm(self):
		return self.assertGETHasStrings("/data/cores/scs/form", {
				"hscs_sr": "2000", "hscs_pos": "2,14", "rV": "", "__nevow_form__":
				"genForm", "VERB": "3", "_FORMAT": "JSON"},
				['"queryStatus": "OK"', '"dbtype": "real"'])

	def testServiceKeyRendered(self):
		self.assertGETHasStrings("/data/cores/uploadtest/form", {},
			['<div class="description">A service key</div>'])


class APIRenderTest(trialhelpers.ArchiveTest):
	def testResponseKey(self):
		return self.assertGETHasStrings("/data/cores/scs/api",
			{"id": ["-1000 0"], "RESPONSEFORMAT": ["tsv"]},
			["0\t1.25\t2.5\n"],
			customTest=lambda tx: self.assertFalse(b"1\t23.0" in tx))

	def testKeysInsensitive(self):
		return self.assertGETHasStrings("/data/cores/scs/api",
			{"ID": ["-1000 0"], "ResponseformAT": ["tsv"]},
			["0\t1.25\t2.5\n"],
			customTest=lambda tx: self.assertFalse(b"1\t23.0" in tx))

	def testResponseMIME(self):
		return self.assertGETHasStrings("/data/cores/scs/api",
			{"id": ["-1000 0"], "RESPONSEFORMAT":
				["application/x-votable+xml;serialization=BINARY2"]},
			["<BINARY2>", "gH/4AAAAAAAAAAAAAD+g"],
			customTest=lambda tx: self.assertFalse(b"1\t23.0" in tx))
	
	def testMetadataResponse(self):
		return self.assertGETHasStrings("/data/cores/scs/api",
			{"MAXREC": ["0"]}, [
				'<RESOURCE type="meta">',
				'<OPTION name="HTML" value="text/html"/>',
				'name="INPUT:maxrec"',
				'"/></TABLE>', # this would break if there were non-empty DATA
				'value="http://localhost:8080/data/cores/scs/api?">Test Fixtures'])

	def testUploadMetadata(self):
		return self.assertGETHasStrings("/data/cores/uploadtest/api",
			{"MAXREC": ["0"]}, [
				'name="INPUT:UPLOAD"',
				'ucd="par.upload"',
				"of the form 'notarbitrary,URL'",
				"An example upload containing nothing in particular"])
	
	def testVOSIUploadMetadata(self):
		return self.assertGETHasStrings("/data/cores/uploadtest/capabilities",
			{},
			["<name>UPLOAD</name>", "notarbitrary,URL"])

	def testUploadInlineBadPar(self):
		return self.assertGETHasStrings("/data/cores/uploadtest/api",
			{"UPLOAD": ["notarbitrary,param:notexisting"]}, [
				'value="ERROR"',
				'Field UPLOAD: param:notexisting references a'
				' non-existing file upload.'])

	def testUploadInlineGoodPar(self):
		return self.assertGETHasStrings("/data/cores/uploadtest/api", {
			"UPLOAD": ["notarbitrary,param:hello"],
			"RESPONSEFORMAT": "votabletd",
			"hello": trialhelpers.FakeFile(
				"knorr", b"abc, die Katze lief im Schnee.\n")
			}, [
				'<TR><TD>notarbitrary</TD><TD>abc, die Katze lief im'
				' Schnee.\n</TD></TR>'])

	def testUploadURL(self):
		with trialhelpers.DataServer("def, ich bin der Chef.\n") as baseURL:
			return self.assertPOSTHasStrings("/data/cores/uploadtest/api", {
				"UPLOAD": ["notarbitrary,%s"%baseURL],
				"RESPONSEFORMAT": "votabletd",
				"hello": trialhelpers.FakeFile(
					"test.txt", b"abc, die Katze lief im Schnee.\n")
				}, [
					'<TR><TD>notarbitrary</TD><TD>def, ich bin der Chef.\n</TD></TR>'])
	
	def testServiceParametersValidated(self):
		return self.assertGETHasStrings("/data/cores/scs/api",
			{"RESPONSEFORMAT": "fantastic junk"},
				["Field responseformat: 'fantastic junk' is not a valid value for"
					" responseformat</INFO>"])


atexit.register(trialhelpers.provideRDData("cores", "import_conecat"))
