"""
Some tests around the SSAP infrastructure.
"""

#c Copyright 2008-2024, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import datetime
import itertools
import re
import tempfile

from gavo.helpers import testhelpers

from gavo import api
from gavo import base
from gavo import rsc
from gavo import utils
from gavo.helpers import trialhelpers
from gavo.protocols import products
from gavo.protocols import sdm
from gavo.utils import DEG, pyfits
from gavo.web import vodal

import tresc

def _mkd(**kwargs):
	return kwargs


def getRD():
	return testhelpers.getTestRD("ssatest.rd")


class RDTest(testhelpers.VerboseTest):
# tests for some aspects of the ssap rd.
	def testUtypes(self):
		srcTable = getRD().getById("hcdtest")
		self.assertEqual("ssa:Access.Reference",
			srcTable.getColumnByName("accref").utype)

	def testDefaultedParam(self):
		self.assertEqual(
			getRD().getById("hcdtest").getParamByName("ssa_timeSI").value,
			None)
		self.assertEqual(
			getRD().getById("hcdtest").getParamByName("ssa_fluxucd").value,
			"phot.flux.density;em.wl")

	def testNullDefaultedParam(self):
		self.assertEqual(
			getRD().getById("hcdtest").getParamByName("ssa_creator").value,
			None)

	def testOverriddenParam(self):
		self.assertEqual(
			getRD().getById("hcdtest").getParamByName("ssa_instrument").value,
			"DaCHS test suite")

	def testNormalizedDescription(self):
		self.assertTrue("matches your query" in
			getRD().getById("foocore").outputTable.getColumnByName("ssa_score"
				).description)


class _SSATestRowmaker(testhelpers.TestResource):
	def make(self, ignored):
		rd = testhelpers.getTestRD("ssatest")
		td = rd.getById("hcdtest").change(onDisk=False)
		rowmaker = rd.getById("makeRow").compileForTableDef(td)
		vars = {"dstitle": "testing",
			"id": "f",
			"specstart": 10,
			"specend": 20,
			"specext": 10,
			"bandpass": "ultracool",
			"alpha": 1,
			"delta": -81.,
			"dateObs": 2455000.3,
			"targetName": "f star",
			"prodtblAccref": "test/junk",
			"prodtblOwner": None,
			"prodtblEmbargo": None,
			"prodtblPath": "/a/b/c",
			"prodtblTable": "test.none",
			"prodtblMime": "text/wirr",
			"prodtblFsize": -23,
			"redshift": None,
		}

		def wrappedRowmaker(updates):
			actualVars = vars.copy()
			actualVars.update(updates)
			return rowmaker(actualVars, td)

		return wrappedRowmaker


class ProcTest(testhelpers.VerboseTest):
	resources = [("rowmaker", _SSATestRowmaker())]

	def testObsDateMJD(self):
		self.assertEqual(self.rowmaker({"dateObs": 54200.25})["ssa_dateObs"],
			54200.25)

	def testObsDateJD(self):
		self.assertEqual(self.rowmaker({"dateObs": 2454200.25})["ssa_dateObs"],
			54199.75)

	def testObsDateISO(self):
		self.assertAlmostEqual(self.rowmaker({"dateObs": "1998-10-23T05:04:02"})[
			"ssa_dateObs"],
			51109.211134259123)

	def testObsDateTimestamp(self):
		self.assertAlmostEqual(self.rowmaker({"dateObs":
			datetime.datetime(1998, 10, 23, 0o5, 0o4, 0o2)})[
			"ssa_dateObs"],
			51109.211134259123)

	def testObsDateNULL(self):
		self.assertEqual(self.rowmaker({"dateObs": None})["ssa_dateObs"],
			None)


class _WithSSATableTest(testhelpers.VerboseTest):
	resources = [("ssaTable", tresc.ssaTestTable)]
	renderer = "ssap.xml"

	def runService(self, id, params, renderer=None):
		if renderer is None:
			renderer = self.renderer
		return trialhelpers.runSvcWith(getRD().getById(id), renderer, params)


class ImportTest(_WithSSATableTest):

	def testImported(self):
		row = self.ssaTable.getRow("data/spec1.ssatest")
		self.assertEqual(row["ssa_dstitle"], "test spectrum 1")
	
	def testLocation(self):
		row = self.ssaTable.getRow("data/spec1.ssatest")
		self.assertAlmostEqual(row["ssa_location"].x, 10.1*DEG)


class ImportProcTest(testhelpers.VerboseTest):
	def testStandardPubDID(self):
		table = rsc.makeData(getRD().getById("test_macros")).getPrimaryTable()
		self.assertTrue(table.rows[0]["pubDID"].startswith(
			"ivo://x-testing/~?data/spec"))


class CoreQueriesTest(_WithSSATableTest, metaclass=testhelpers.SamplesBasedAutoTest):
	def _runTest(self, sample):
		inDict, ids = sample
		inDict["REQUEST"] = "queryData"
		res = self.runService("s", _mkd(**inDict))
		self.assertEqual(
			set([row["ssa_pubDID"].split("/")[-1]
				for row in res.getPrimaryTable()]),
			set(ids))

	samples = [
		({"POS": "10,+15", "SIZE": "0.5", "FORMAT": "votable"},
			["test1"]),
		({"POS": "10,+15", "SIZE": "2", "FORMAT": "votable"},
			["test1", "test2"]),
		({"BAND": "/4.5e-7,6.5e-7/", "FORMAT": "votable"},
			["test1", "test3"]),
		({"BAND": "4.5e-7/7.5e-7", "FORMAT": "votable"},
			["test1", "test2", "test3"]),
		({"BAND": "U", "FORMAT": "votable"},
			[]),
#5
		({"BAND": "V,R", "FORMAT": "votable"},
			["test2", "test3"]),
		({"TIME": "/2020-12-20T13:00:01", "FORMAT": "votable"},
			["test1"]),
		({"FORMAT": "votable"},
			["test1", "test2", "test3"]),
		({"FORMAT": "compliant"},
			["test1", "test2", "test3"]),
		({"FORMAT": "native"},
			["test3"]),
#10
		({"FORMAT": "image"},
			[]),
		({"FORMAT": "all"},
			["test1", "test2", "test3"]),
		({"FORMAT": "ALL"},
			["test1", "test2", "test3"]),
		({"TARGETNAME": "booger star,rat hole in the yard"},
			["test2", "test3"]),
		({"PUBDID": "ivo://test.inv/test2"},
			["test2"]),
#15
		({"excellence": "/100"},
			["test2", "test3"]),
		({"POS": "10,+15"}, # POS without SIZE is ignored
			["test1", "test2", "test3"]),
		({"SIZE": "30"}, # splat sends SIZE without POS; ignore it in this case.
			["test1", "test2", "test3"]),
		({"WILDTARGET": "BIG*"},
			["test1"]),
		({"WILDTARGETCASE": "BIG*"},
			[]),
#20
		({"WILDTARGET": "b??g*"},
			["test2"]),
		({"WILDTARGET": "\\*"},
			[]),
		({"WILDTARGET": "[br][oa]*"},
			["test2", "test3"]),
		({"FLUXCALIB": "CALIBRATED"},
			[]),
		({"FLUXCALIB": "unCALIBRATED"},
			['test1', 'test3', 'test2']),
#25
		({"SPECRP": "5500"},
			["test2", "test3"]),
		({"SPECRP": "3000"},
			['test1', 'test3', 'test2']),
		({"SPATRES": "/1e-13"},
			['test1', 'test3', 'test2']), # (no spatial resolution given in test set)
		({"SPATRES": "1e-13/"},
			['test1', 'test3', 'test2']),
		({"WAVECALIB": "calibrated"},
			['test1', 'test3', 'test2']),
#30
		({"WAVECALIB": "approximate"},
			[]),
		({"COLLECTION": "ssa test set"},
			['test1', 'test3', 'test2']),
		({"COLLECTION": "ssa Test set"},
			[]),
		({"FORMAT": "application/x-votable+xml,application/fits"},
			["test1", "test2", "test3"]),
		({"WAVECALIB": "any"},
			['test1', 'test3', 'test2']),
#35
		({"SPECCALIB": "ANY"},
			['test1', 'test3', 'test2']),
	]


class CoreMiscTest(_WithSSATableTest):
	def testRejectWithoutREQUEST(self):
		inDict= {"POS": "12,12",
			"SIZE": "1"}
		self.assertRaisesWithMsg(base.ValidationError,
			"Field REQUEST: Missing or invalid value for REQUEST.",
			self.runService,
			("s", _mkd(**inDict)))
	
	def testDefaultRequest(self):
		service = getRD().getById("s")
		service.setProperty("defaultRequest", "querydata")
		try:
			res = self.runService("s",
				_mkd(BAND="4.5e-7/7.5e-7", FORMAT="votable"))
			self.assertEqual(
				set([row["ssa_pubDID"].split("/")[-1]
					for row in res.getPrimaryTable()]),
				set(["test1", "test2", "test3"]))
		finally:
			service.clearProperty("defaultRequest")


class CoreNullTest(_WithSSATableTest):
# make sure empty parameters of various types are just ignored.
	totalRecs = 6

	def _getNumMatches(self, inDict):
		inDict["REQUEST"] = "queryData"
		return len(self.runService("s", _mkd(**inDict),
			).getPrimaryTable().rows)
	
	def testSomeNULLs(self):
		self.assertEqual(self._getNumMatches({"TIME": "", "POS": ""}),
			self.totalRecs)
	
	def testBANDNULL(self):
		self.assertEqual(self._getNumMatches({"BAND": ""}), self.totalRecs)

	def testFORMATNULL(self):
		self.assertEqual(self._getNumMatches({"FORMAT": ""}), self.totalRecs)

	def testFORMATALL(self):
		self.assertEqual(self._getNumMatches({"FORMAT": "ALL"}), self.totalRecs)


class MetaKeyTest(_WithSSATableTest):
# these are like CoreQueries except they exercise custom logic
	def testTOP(self):
		res = self.runService("s",
			_mkd(REQUEST="queryData", TOP="1"))
		self.assertEqual(len(res.getPrimaryTable()), 1)

	def testMAXREC(self):
		res = self.runService("s",
			_mkd(REQUEST="queryData", TOP="3", MAXREC="1"))
		self.assertEqual(len(res.getPrimaryTable()), 1)

	def testMTIMEInclusion(self):
		aMinuteAgo = datetime.datetime.utcnow()-datetime.timedelta(seconds=60)
		res = self.runService("s",
			_mkd(REQUEST="queryData", MTIME="%s/"%aMinuteAgo))
		self.assertEqual(len(res.getPrimaryTable()), 6)

	def testMTIMEExclusion(self):
		aMinuteAgo = datetime.datetime.utcnow()-datetime.timedelta(seconds=60)
		res = self.runService("s",
			_mkd(REQUEST="queryData", MTIME="/%s"%aMinuteAgo))
		self.assertEqual(len(res.getPrimaryTable()), 0)

	def testInsensitive(self):
		aMinuteAgo = datetime.datetime.utcnow()-datetime.timedelta(seconds=60)
		res = self.runService("s",
			utils.CaseSemisensitiveDict(
				{"rEQueST": ["queryData"], "mtime": ["/%s"%aMinuteAgo]}))
		self.assertEqual(len(res.getPrimaryTable()), 0)


class _RenderedSSAMetadata(testhelpers.TestResource):
	resources = [("ssatable", tresc.ssaTestTable)]

	def make(self, deps):
		request = trialhelpers.FakeRequest("",
			args={"REQUEST": "queryData", "FORMAT": "Metadata"})
		pg = vodal.SSAPRenderer(request, getRD().getById("s"))
		rendered = pg.renderSync(request)
		return rendered, testhelpers.getXMLTree(rendered, debug=False)


class SSAMetaResponseTest(testhelpers.VerboseTest):
	resources = [("meta", _RenderedSSAMetadata())]

	def testIsVOTable(self):
		self.assertTrue(b"<VOTABLE" in self.meta[0])
	
	def testSizeInput(self):
		matches = self.meta[1].xpath("//PARAM[@name='INPUT:SIZE']")
		par = testhelpers.pickSingle(matches)
		self.assertEqual(par[0].text,
			"Size of the region of interest around POS")

	def testStandardFieldPresent(self):
		self.assertTrue(re.search(b'<FIELD[^>]*name="accref"', self.meta[0]))

	def testCustomFieldPresent(self):
		self.assertTrue(re.search(b'<FIELD[^>]*name="excellence"', self.meta[0]))

	def testIntsAdapted(self):
		field = self.meta[1].xpath("//PARAM[@name='INPUT:excellence']")[0]
		self.assertEqual(field.get("datatype"), "char")
		self.assertEqual(field.get("arraysize"), "*")

	def testFloatsAdapted(self):
		field = self.meta[1].xpath("//PARAM[@name='INPUT:bogosity']")[0]
		self.assertEqual(field.get("datatype"), "char")
		self.assertEqual(field.get("arraysize"), "*")

	def testUtypeGiven(self):
		matches = self.meta[1].xpath("//FIELD[@utype='ssa:Curation.PublisherDID']")
		self.assertEqual(len(matches), 1)
	
	def testInstrumentDefined(self):
		matches = self.meta[1].xpath("//PARAM[@utype='ssa:DataID.Instrument']")
		self.assertEqual(matches[0].get("value"), "DaCHS test suite")
	
	def testMAXREC(self):
		matches = self.meta[1].xpath("//PARAM[@name='INPUT:maxrec']")
		self.assertEqual(len(matches), 1)
		self.assertEqual(matches[0].get("datatype"), "int")
	
	def testVERB(self):
		matches = self.meta[1].xpath("//PARAM[@name='INPUT:verb']")
		self.assertEqual(len(matches), 1)
		self.assertEqual(
			matches[0].xpath("VALUES/OPTION[@value='1']")[0].get("name"),
			"terse")


class CoreFailuresTest(_WithSSATableTest):
	def testBadRequestRejected(self):
		self.assertRaises(api.ValidationError, self.runService, "s",
			_mkd(REQUEST="folly"))

	def testBadBandRejected(self):
		self.assertRaises(api.ValidationError, self.runService, "s",
			_mkd(REQUEST="queryData", BAND="1/2/0.4"))

	def testBadCustomInputRejected(self):
		self.assertRaises(api.ValidationError, self.runService, "s",
			_mkd(REQUEST="queryData", excellence="banana"))

	def testSillyFrameRejected(self):
		self.assertRaisesWithMsg(api.ValidationError,
			"Field POS: Cannot match against coordinates given in EGOCENTRIC frame",
			self.runService,
			("s", _mkd(
				REQUEST="queryData", POS="0,0;EGOCENTRIC", SIZE="1")))

	def testMalformedSize(self):
		self.assertRaisesWithMsg(api.ValidationError,
			"Field SIZE: 'all' is not a valid literal for SIZE",
			self.runService,
			("s", _mkd(
				REQUEST="queryData", POS="0,0", SIZE="all")))


class _RenderedSSAResponse(testhelpers.TestResource):
	resources = [("ssatable", tresc.ssaTestTable)]

	def make(self, deps):
		request = trialhelpers.FakeRequest("",
				args={"REQUEST": "queryData", "TOP": "3",
					"MAXREC": "3", "FORMAT": "votable",
					"_DBOPTIONS_ORDER": "-ssa_targname"})
		pg = vodal.SSAPRenderer(request, getRD().getById("c"))
		rawVOT = pg.renderSync(request)
		return rawVOT, testhelpers.getXMLTree(rawVOT, debug=False)

_renderedSSAResponse = _RenderedSSAResponse()


class SSATableTest(testhelpers.VerboseTest):
	# tests for certain properties of rendered SSA table responses

	resources = [("docAndTree", _renderedSSAResponse)]

	def testOverflowFlagged(self):
		infoEl = self.docAndTree[1].xpath(
			"//RESOURCE/INFO[@name='QUERY_STATUS']")[0]
		self.assertEqual(infoEl.attrib["value"], "OVERFLOW")
		self.assertEqual(infoEl.text, None)

	def testOverflowWarning(self):
		infoEl = self.docAndTree[1].xpath(
			"//RESOURCE/INFO[@name='warning']")[0]
		self.assertTrue("The query limit was reached. Increase it to"
			" retrieve more matches. Note that unsorted truncated"
			in infoEl.get("value"))

	def testSSAUtype(self):
		table = self.docAndTree[1].find("RESOURCE/TABLE")
		self.assertTrue(table.find("FIELD").attrib["utype"].startswith("ssa:"))

	def testTimestampCast(self):
		fields = self.docAndTree[1].findall("RESOURCE/TABLE/FIELD")
		for field in fields:
			if field.attrib["name"]=="ssa_dateObs":
				self.assertEqual(field.attrib["xtype"], "mjd")
				self.assertEqual(field.attrib["datatype"], "double")
				break
	
	def testAccrefPresent(self):
		self.assertTrue(b"http://localhost:8080/getproduct" in self.docAndTree[0])

	def testEverythingExpanded(self):
		self.assertFalse(b"\\" in self.docAndTree[0])

	def testLocationMetadata(self):
		locationField = self.docAndTree[1].xpath(
			"//FIELD[@name='ssa_location']")[0]
		self.assertEqual(locationField.get("xtype"), "point")
		self.assertEqual(locationField.get("unit"), "deg")
		self.assertEqual(locationField.get("arraysize"), "2")

	def testPreviewColumn(self):
		previewField = self.docAndTree[1].xpath(
			"//FIELD[@ucd='meta.ref.url;meta.preview']")[0]
		previewIndex = list(f for f in previewField.getparent().iterchildren()
			if f.tag=='FIELD').index(previewField)
		self.assertEqual(self.docAndTree[1].xpath(
			"//TABLEDATA/TR[1]/TD[%s]"%(previewIndex+1))[0].text,
			"http://localhost:8080/getproduct/data/spec1.ssatest.vot?preview=True")

	def testDatalinkResourcesPresent(self):
		_, tree = self.docAndTree
		self.assertEqual(len(tree.xpath(
			"//RESOURCE[@utype='adhoc:service']")), 2)

	def testCOOSYS(self):
		coosysEls = self.docAndTree[1].xpath("RESOURCE/COOSYS")
		self.assertEqual(len(coosysEls), 1)

	def testTIMESYS(self):
		timesysEls = self.docAndTree[1].xpath("RESOURCE/TIMESYS")
		self.assertEqual(len(timesysEls), 1)
		timesysEl = timesysEls[0]
		self.assertEqual(timesysEl.get("refposition"), "BARYCENTER")
		self.assertEqual(timesysEl.get("timeorigin"), "2400000.5")
		self.assertEqual(timesysEl.get("timescale"), "TDB")


class _DLMetaResource(testhelpers.TestResource):
	resources = [("docAndTree", _renderedSSAResponse)]

	def make(self, deps):
		return deps["docAndTree"][1].xpath(
			"RESOURCE[PARAM[@name='standardID']/@value="
				"'ivo://ivoa.net/std/datalink#links-1.1']")[0]


class EmbeddedDatalinkMetaTest(testhelpers.VerboseTest):

	resources = [("resource", _DLMetaResource()),
		("docAndTree", _renderedSSAResponse)]

	def testUtype(self):
		self.assertEqual(self.resource.get("utype"), "adhoc:service")

	def testAccessURL(self):
		accParam = self.resource.xpath("PARAM[@name='accessURL']")[0]
		self.assertTrue(accParam.get("value").endswith(
			"/data/ssatest/dl/dlmeta"))
		self.assertEqual(accParam.get("arraysize"), "*")
		self.assertEqual(accParam.get("datatype"), "char")

	def testIdParam(self):
		idParam = self.resource.xpath("GROUP[@name='inputParams']/"
			"PARAM[@name='ID']")[0]
		self.assertEqual(idParam.get("ucd"), "meta.id;meta.main")
		ref = idParam.get("ref")

		dest = self.docAndTree[1].xpath("//FIELD[@ID='%s']"%ref)
		self.assertEqual(len(dest), 1)
		self.assertEqual(dest[0].get("name"), "ssa_pubDID")

		
class _DLGetResource(testhelpers.TestResource):
	resources = [("docAndTree", _renderedSSAResponse)]

	def make(self, deps):
		return deps["docAndTree"][1].xpath(
			"RESOURCE[PARAM[@name='standardID']/@value="
				"'ivo://ivoa.net/std/soda#sync-1.0']")[0]


class EmbeddedDatalinkGetTest(testhelpers.VerboseTest):

	resources = [("resource", _DLGetResource()),
		("docAndTree", _renderedSSAResponse)]

	def testUtype(self):
		self.assertEqual(self.resource.get("utype"), "adhoc:service")

	def testAccessURL(self):
		accParam = self.resource.xpath("PARAM[@name='accessURL']")[0]
		self.assertTrue(accParam.get("value").endswith(
			"/data/ssatest/dl/dlget"))
		self.assertEqual(accParam.get("arraysize"), "*")
		self.assertEqual(accParam.get("datatype"), "char")

	def testBANDLimitsDeclared(self):
		bandParam = self.resource.xpath("GROUP[@name='inputParams']/"
			"PARAM[@name='BAND']")[0]
		self.assertAlmostEqual(
			float(bandParam.xpath("VALUES/MIN")[0].get("value")),
			4e-07)
		self.assertAlmostEqual(
			float(bandParam.xpath("VALUES/MAX")[0].get("value")),
			6e-07)

	def testFORMATOptionsDeclared(self):
		formatParam = self.resource.xpath("GROUP[@name='inputParams']/"
			"PARAM[@name='FORMAT']")[0]
		options = set(formatParam.xpath("VALUES/OPTION/@value"))
		self.assertTrue("text/plain" in options)
		self.assertTrue("application/fits" in options)

	def testIdParam(self):
		idParam = self.resource.xpath("GROUP[@name='inputParams']/"
			"PARAM[@name='ID']")[0]
		self.assertEqual(idParam.get("ucd"), "meta.id;meta.main")
		ref = idParam.get("ref")

		dest = self.docAndTree[1].xpath("//FIELD[@ID='%s']"%ref)
		self.assertEqual(len(dest), 1)
		self.assertEqual(dest[0].get("name"), "ssa_pubDID")

	def testLeftOverEnumeration(self):
		self.assertEqual(set(el.get("value") for el in
				self.resource.xpath("GROUP/PARAM[@name='FLUXCALIB']/VALUES/OPTION")),
			set(['UNCALIBRATED', 'RELATIVE']))

	def testInSSABlockCheck(self):
		sentinelDesc = self.resource.xpath("GROUP[@name='inputParams']/"
			"PARAM[@name='bogus']/DESCRIPTION")[0]
		self.assertEqual(sentinelDesc.text, "Sentinel for a skipped link")


class _FakeRAccref(products.RAccref):
	"""a RAccref that lets you manually provide a productsRow.
	"""
	def setProductsRow(self, val):
		defaults = {
			"embargo": None,
			"mime": "application/x-votable+xml",}
		defaults.update(val)
		self._productsRowCache = defaults


class _RenderedSDMResponse(testhelpers.TestResource):
	resources = [("ssatable", tresc.ssaTestTable)]

	def make(self, deps):
		res = trialhelpers.runSvcWith(getRD().getById("dl"), "dlget",
			_mkd(ID="ivo://test.inv/test1",
				FORMAT="application/x-votable+xml;serialization=tabledata"))
		_, rawVOT = res
		return rawVOT, testhelpers.getXMLTree(rawVOT, debug=False)


class SDMTableTest(testhelpers.VerboseTest):
# tests for core and rendering of Spectral Data Model VOTables.
	resources = [("stringAndTree", _RenderedSDMResponse())]

	def _getUniqueByXPath(self, xpath, root=None):
		if root is None:
			root = self.stringAndTree[1]
		resSet = root.xpath(xpath)
		self.assertEqual(len(resSet), 1)
		return resSet[0]

	def testParameterSet(self):
		res = self._getUniqueByXPath("//PARAM[@name='ssa_pubDID']")
		self.assertEqual(res.get('value'), 'ivo://test.inv/test1')

	def testSpecGroupsPresent(self):
		# assertion: exists and is unique
		self._getUniqueByXPath("//GROUP[@utype='spec:Spectrum.Target']")
		ref = self._getUniqueByXPath(
			'//PARAMref[@utype="spec:Spectrum.Target.Name"]')
		self.assertFalse(ref.get("ref") is None)
	
	def testReferentialIntegrity(self):
		#open("zw.vot", "w").write(self.stringAndTree[0])
		tree = self.stringAndTree[1]
		knownIds = set()
		for element in tree.xpath("//*[@ID]"):
			knownIds.add(element.get("ID"))
		for element in tree.xpath("//*[@ref]"):
			self.assertTrue(element.get("ref") in knownIds,
				"%s is referred to but no element with this id present"%
					element.get("ref"))

	def testDataPresent(self):
		tree = self.stringAndTree[1]
		firstRow = tree.xpath("//TR")[1]
		self.assertEqual(
			[el.text for el in firstRow.xpath("TD")],
			["1755.0", "1753.0"])

	def testContainerUtypes(self):
		tree = self.stringAndTree[1]
		votRes = tree.xpath("//RESOURCE")[0]
		self.assertEqual(votRes.get("utype"), "spec:Spectrum")
		table = votRes.xpath("//TABLE")[0]
		self.assertEqual(table.get("utype"), "spec:Spectrum")

	def testAccrefMapped(self):
		# the product link is made in a hack in SDMCore.
		tree = self.stringAndTree[1]
		p = tree.xpath("//PARAM[@name='accref']")[0]
		self.assertTrue(p.get("value").startswith("http"))

	def testDescriptionOverridden(self):
		tree = self.stringAndTree[1]
		self.assertEqual(
			tree.xpath("//FIELD[@name='spectral']/DESCRIPTION")[0].text,
			"Wavelength")
		self.assertEqual(
			tree.xpath("//FIELD[@name='flux']/DESCRIPTION")[0].text,
			"Stellar surface flux density")

	def testSpectralUnitUsed(self):
		tree = self.stringAndTree[1]
		self.assertEqual(
			tree.xpath("//FIELD[@name='spectral']")[0].get("unit"),
			"Angstrom")

	def testSSAMetadataUnchanged(self):
		# FIXME: I guess SDM *should* adapt ssa metadata to whatever is
		# used in the spectra themselves.  Right now, the metadata units
		# are fixed by SSA, so DaCHS must not  do any conversions here
		p = self.stringAndTree[1].xpath("//PARAM[@name='ssa_specstart']")[0]
		self.assertEqual(p.get("unit"), "m")
		self.assertAlmostEqual(float(p.get("value")), 4e-07)


class _RenderedSDMFITSResponse(testhelpers.TestResource):
	resources = [("ssatable", tresc.ssaTestTable)]

	def make(self, deps):
		sdmData = sdm.makeSDMDataForPUBDID('ivo://test.inv/test1',
			getRD().getById("hcdtest"),
			getRD().getById("datamaker"))
		fitsBytes = sdm.formatSDMData(sdmData, "application/fits")[-1]
		self.tempFile = tempfile.NamedTemporaryFile()
		self.tempFile.write(fitsBytes)
		self.tempFile.flush()
		return pyfits.open(self.tempFile.name)
	
	def clean(self, res):
		self.tempFile.close()


class SDMFITSTest(testhelpers.VerboseTest):
	resources = [("hdus", _RenderedSDMFITSResponse())]

	def testPrimaryHDU(self):
		self.hdus[0].header.get("EXTEND", True)
		self.hdus[0].header.get("NAXIS", 0)

	def testSimpleUtypeTranslated(self):
		self.assertEqual(self.hdus[1].header.get("OBJECT"), "big fart nebula")
	
	def testParTypesPreserved(self):
		self.assertAlmostEqual(self.hdus[1].header.get("DEC"), 15.2)
	
	def testColumnUtype(self):
		hdr = self.hdus[1].header
		self.assertEqual(hdr["TUTYP1"], 'spec:spectrum.data.spectralaxis.value')
		self.assertEqual(hdr["TUTYP2"], 'spec:spectrum.data.fluxaxis.value')

	def testColumnComment(self):
		hdr = self.hdus[1].header
		self.assertEqual(hdr.comments["TTYPE1"], "Wavelength")

	def testValues(self):
		wl, flux = self.hdus[1].data[1]
		self.assertAlmostEqual(wl, 1755.)
		self.assertAlmostEqual(flux, 1753.)

	def testLimits(self):
		hdr = self.hdus[1].header
		self.assertAlmostEqual(hdr["TDMIN1"], 4e-7)
		self.assertEqual(hdr.cards["TDMIN1"].comment, "[m]")

	def testCalibration(self):
		hdr = self.hdus[1].header
		self.assertEqual(hdr["FLUX_CAL"], "UNCALIBRATED")

	def testBandwidthUnit(self):
		hdr = self.hdus[1].header
		card = hdr.cards["SPEC_BW"]
		self.assertAlmostEqual(card.value, 1e-7)
		self.assertEqual(card.comment, "[m]")


class MixcTableTest(testhelpers.VerboseTest):
	resources = [("conn", tresc.dbConnection)]

	def testColumns(self):
		table = getRD().getById("mixctest")
		col = table.getColumnByName("ssa_fluxcalib")
		for attName, expected in [
			("utype", "ssa:Char.FluxAxis.Calibration"),
			("verbLevel", 15)]:
			self.assertEqual(getattr(col, attName), expected)
		col = table.getColumnByName("ssa_spectStatError")
		for attName, expected in [
			("utype", "ssa:Char.SpectralAxis.Accuracy.StatError"),
			("unit", "m"),
			("verbLevel", 15)]:
			self.assertEqual(getattr(col, attName), expected)
		col = table.getColumnByName("ssa_fluxStatError")
		for attName, expected in [
			("utype", "ssa:Char.FluxAxis.Accuracy.StatError"),
			("unit", "Jy"),
			("verbLevel", 15)]:
			self.assertEqual(getattr(col, attName), expected)

	def testUndefinedOnPar(self):
		col = getRD().getById("mixctest").getColumnByName("ssa_reference")
		self.assertEqual(col.unit, None)

	def testSkippedColumnsGone(self):
		td = getRD().getById("mixctest")
		for name in ["ssa_timeSI", "ssa_spaceCalib"]:
			self.assertRaises(base.NotFoundError, td.getColumnByName, (name))

	def testFilling(self):
		rsc.makeData(getRD().getById("test_mixc"), connection=self.conn,
			runCommit=False)
		try:
			rows = list(self.conn.queryToDicts(
				"select ssa_dstitle, ssa_instrument, ssa_pubdid,"
				" ssa_reference, ssa_publisher from test.mixctest"))
			self.assertEqual(len(rows), 3)
			id = rows[0]["ssa_pubdid"].split("/")[-1]
			self.assertEqual(rows[0]["ssa_publisher"], "Your organisation's name")
			self.assertEqual(rows[0]["ssa_instrument"], "Bruce Astrograph")
			self.assertEqual(rows[0]["ssa_reference"], "Paper on "+id)
			self.assertEqual(rows[0]["ssa_dstitle"], "junk from "+id)
		finally:
			self.conn.rollback()

	def testParams(self):
		table = getRD().getById("mixctest")
		param = table.getParamByName("ssa_fluxunit")
		self.assertEqual(param.value, "Jy")


class _SSAViewTable(testhelpers.TestResource):
	def make(self, deps):
		rd = base.parseFromString(api.RD,
			"""<resource schema="test"><table id="ssaview" onDisk="true">
				<mixin
					sourcetable="data/ssatest#hcdtest"
					ssa_location="ssa_location"
					ssa_aperture="NULL"
					ssa_specres="NULL"
					ssa_bandpass="'Radio'"
					ssa_collection="'fakes'"
					ssa_creationtype="'archival'"
					ssa_dateObs="NULL"
					ssa_dstitle="NULL"
					ssa_fluxcalib="'QUATSCH'"
					ssa_length="NULL"
					ssa_spectralunit="'furlongs'"
					timescale="UTC"
					refposition="TOPOCENTER"
					>//ssap#view</mixin>
			</table></resource>""")
		return rd.getById("ssaview")


class ViewTest(testhelpers.VerboseTest):
	resources = [("td", _SSAViewTable())]

	def testBasicSchema(self):
		colNames = [c.name for c in self.td.columns]
		self.assertEqual(colNames[:8], ['accref', 'owner', 'embargo',
			'mime', 'accsize', 'ssa_dstitle', 'ssa_creatorDID', 'ssa_pubDID'])
		self.assertEqual(len(colNames), 54)

	def testIndexesDeclared(self):
		indexedCols = set(itertools.chain(*[i.columns for i in self.td.indices]))
		self.assertEqual(indexedCols, set([
			'ssa_pubDID', 'ssa_targname', 'accref', 'ssa_location']))

	def testTIMESYS(self):
		for ann in self.td.iterAnnotationsOfType("votable:Coords"):
			if "time" in ann:
				break
		else:
			raise AssertionError("No time annotation in ssap view")

		self.assertEqual(ann["time"]["frame"]["refPosition"], "TOPOCENTER")
		self.assertEqual(ann["time"]["frame"]["time0"], "2400000.5")
		self.assertEqual(ann["time"]["frame"]["timescale"], "UTC")

	def testColumnCopy(self):
		rd = base.parseFromString(api.RD,
			"""<resource schema="test">
				<table id="basic" onDisk="True">
					<column name="ssa_location" description="Fake position"
						type="spoint"/>
					<column name="mojo"/>
				</table>
				<table id="ssaview">
				<mixin copiedcolumns="*"
					sourcetable="basic"
					ssa_spectralunit="Angstrom"
					ssa_fluxcalib="'ABSOLUTE'"
					ssa_location="'ignored junk'"
					>//ssap#view</mixin>
			</table></resource>""")

		resultTable = rd.getById("ssaview")

		colNames = [c.name for c in resultTable.columns]
		self.assertEqual(len(colNames), 55)
		self.assertTrue("mojo" in colNames)

		self.assertEqual(resultTable.getColumnByName("ssa_location").description,
			"Fake position")

		self.assertTrue("mojo::real" in resultTable.viewStatement)
		self.assertTrue("ssa_location::spoint," in resultTable.viewStatement)
		self.assertTrue("ssa_region" not in resultTable.viewStatement)

	def testPlainLocationAndExtra(self):
		rd = base.parseFromString(api.RD,
			"""<resource schema="test">
				<table id="basic" onDisk="True">
					<mixin>//ssap#plainlocation</mixin>
					<column name="kalotte" type="integer"/>
					<column name="seite" type="text"/>
				</table>
				<table id="ssaview">
				<mixin copiedcolumns="ka* , seite"
					sourcetable="basic"
					ssa_spectralunit="Angstrom"
					ssa_fluxcalib="'ABSOLUTE'"
					customcode="
						,54+20+kalotte as somemagic,
						'gnatz' as krass"
					>//ssap#view</mixin>
					<column name="somemagic" type="integer"/>
					<column name="krass" type="text"/>
			</table></resource>""")

		resultTable = rd.getById("ssaview")
		colNames = [c.name for c in resultTable.columns]
		self.assertEqual(len(colNames), 58)
		self.assertEqual(colNames[-4:],
			['kalotte', 'seite', 'somemagic', 'krass'])

		self.assertTrue("54+20+kalotte as somemagic" in resultTable.viewStatement)
		self.assertTrue("kalotte::integer," in resultTable.viewStatement)
		self.assertTrue("ssa_location::spoint," in resultTable.viewStatement)


class ExtensionTest(testhelpers.VerboseTest):
	resources = [("ssatable", tresc.ssaTestTable)]

	def testGetTargetName(self):
		request = trialhelpers.FakeRequest("",
				args={"REQUEST": "gettargetnames"})
		pg = vodal.SSAPRenderer(request, getRD().getById("c"))
		rawVOT = pg.renderSync(request)
		tree = testhelpers.getXMLTree(rawVOT, debug=False)
		self.assertEqual(
			set(e.text for e in tree.xpath("//TD")),
			set(["booger star", "rat hole in the yard", "big fart nebula"]))
		self.assertEqual(tree.xpath("//FIELD[@name='ssa_targname']")[0]
			.get("ucd"), "meta.id;src")


if __name__=="__main__":
	from gavo.formats import texttable, votablewrite #noflake: for registration
	base.DEBUG = True
	testhelpers.main(ExtensionTest)
