"""
Some unit tests not yet fitting anywhere else.
"""

#c Copyright 2008-2024, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import contextlib
import datetime
import http.client
import io
import os
import pathlib
import re
import shutil
import sys
import tempfile
import unittest
from urllib import parse

import numpy

from gavo.helpers import testhelpers

from gavo import api
from gavo import base
from gavo import rscdesc
from gavo import votable
from gavo import stc
from gavo import utils
from gavo.user import upgrade
from gavo.helpers import filestuff
from gavo.helpers import processing
from gavo.utils import pyfits
from gavo.votable import tapquery

import tresc


class RenamerDryTest(unittest.TestCase):
	"""checks on the file renamer's planning step; the file system is
	not touched here.
	"""
	def testSerialization(self):
		"""tests that clobbering renames are serialized in the expected order.
		"""
		renamer = filestuff.FileRenamer({})
		expectedPlan = [('2', '3'), ('1', '2'), ('b', 'c'), ('a', 'b')]
		plan = renamer.makeRenameProc(
			{'a': 'b', 'b': 'c', '2': '3', '1': '2'})
		self.assertEqual(plan, expectedPlan)

	def testCycleDetection(self):
		"""tests that a cyclic renaming recipe raises filestuff.Error.
		"""
		renamer = filestuff.FileRenamer({})
		cyclicMap = {'a': 'b', 'b': 'c', 'c': 'a'}
		with self.assertRaises(filestuff.Error):
			renamer.makeRenameProc(cyclicMap)


class RenamerWetTest(unittest.TestCase):
	"""tests for behaviour of the file renamer on the file system.

	Each test runs in a fresh temporary directory that setUp populates
	with a handful of empty files and tearDown removes again.
	"""
	# renaming recipe shared by the tests below; it deliberately contains
	# stray whitespace, an empty line and a comment.
	_recipe = ("a->b \nb->c\n 2->3\n1 ->2\n\n# a comment\n"
		"foo-> bar\n")

	def setUp(self):
		self.testDir = tempfile.mkdtemp("testrun")
		for fName in ["a.fits", "a.txt", "b.txt", "b.jpeg", "foo"]:
			# opening for writing and closing right away leaves an empty file
			with open(os.path.join(self.testDir, fName), "w"):
				pass

	def tearDown(self):
		# Best-effort cleanup.  A single-argument onerror callback does not
		# work here: rmtree calls onerror with three arguments, so any
		# removal problem would surface as a TypeError.
		shutil.rmtree(self.testDir, ignore_errors=True)

	def _makeRenamer(self):
		"""returns a FileRenamer loaded from the shared recipe.
		"""
		return filestuff.FileRenamer.loadFromFile(io.StringIO(self._recipe))

	def testOperation(self):
		"""tests an almost-realistic application
		"""
		self._makeRenamer().renameInPath(self.testDir)
		found = set(os.listdir(self.testDir))
		expected = {"b.fits", "b.txt", "c.txt", "c.jpeg", "bar"}
		self.assertEqual(found, expected)

	def testNoClobber(self):
		"""tests for effects of repeated application.

		Applying the same recipe a second time must raise filestuff.Error.
		"""
		f = self._makeRenamer()
		f.renameInPath(self.testDir)
		self.assertRaises(filestuff.Error, f.renameInPath, self.testDir)


class TimeCalcTest(testhelpers.VerboseTest):
	"""checks of the conversions between julian/besselian years, julian
	dates, and datetimes offered by stc.
	"""
	def testJYears(self):
		for jYear, expected in [
				(1991.25, datetime.datetime(1991, 4, 2, 13, 30, 0)),
				(2005.0, datetime.datetime(2004, 12, 31, 18, 0))]:
			self.assertEqual(stc.jYearToDateTime(jYear), expected)

	def testRoundtrip(self):
		# julian year -> datetime -> julian year, in steps of 1/1000 yr
		for step in range(2000):
			jYear = 2010+step/1000.
			roundtripped = stc.dateTimeToJYear(stc.jYearToDateTime(jYear))
			self.assertAlmostEqual(jYear, roundtripped, 7,
				"Botched %f"%jYear)

	def testBYears(self):
		expected = datetime.datetime(1949, 12, 31, 22, 9, 46, 861900)
		self.assertEqual(stc.bYearToDateTime(1950.0), expected)

	def testBRoundtrip(self):
		# besselian year -> datetime -> besselian year, in steps of 1/1000 yr
		for step in range(2000):
			bYear = 1950+step/1000.
			roundtripped = stc.dateTimeToBYear(stc.bYearToDateTime(bYear))
			self.assertAlmostEqual(bYear, roundtripped, 7,
				"Botched %f"%bYear)

	def testBesselLieske(self):
		"""check examples from Lieske, A&A 73, 282.
		"""
		examples = [
			(1899.999142, 1900),
			(1900., 1900.000858),
			(1950., 1949.999790),
			(1950.000210, 1950.0),
			(2000.0, 1999.998722),
			(2000.001278, 2000.0)]
		for bYear, expectedJYear in examples:
			computed = stc.dateTimeToJYear(stc.bYearToDateTime(bYear))
			self.assertAlmostEqual(computed, expectedJYear, places=5)

	def testJDDateTime(self):
		asDateTime = stc.jdnToDateTime(2458383.5)
		self.assertEqual(utils.formatISODT(asDateTime),
			"2018-09-22T00:00:00Z")


class FITSProcessorTest(testhelpers.VerboseTest):
	"""tests for some aspects of helper file processors.

	Unfortunately, sequencing is important here, so we do it all in
	one test.  I guess one should rethink things here, but for now let's
	keep things simple.

	The individual steps are the _test* methods; testAll runs them in
	the required order.  The steps set sys.argv because procmain reads it;
	setUp and tearDown save and restore the original value.
	"""
	_rdText = """<resource schema="filetest"><data id="import">
		<sources pattern="*.fits"/><fitsProdGrammar/>
		</data></resource>"""

	def _writeFITS(self, destPath, seed):
		"""writes a FITS file to destPath with a (2, seed+1) int16 zero array
		and SEED, WEIRD, and RECIP header cards derived from seed.
		"""
		hdu = pyfits.PrimaryHDU(numpy.zeros((2,seed+1), 'i2'))
		hdu.header.set("SEED", seed, "initial number")
		hdu.header.set("WEIRD", "W"*seed)
		hdu.header.set("RECIP", 1./(1+seed))
		hdu.writeto(destPath)

	def setUp(self):
		# the test steps replace sys.argv; keep the original so tearDown
		# can put it back and later tests are not affected.
		self.origArgv = sys.argv
		self.resdir = os.path.join(base.getConfig("tempDir"), "filetest")
		self.origInputs = base.getConfig("inputsDir")
		base.setConfig("inputsDir", base.getConfig("tempDir"))
		if os.path.exists(self.resdir): # Leftover from previous run?
			return
		os.mkdir(self.resdir)
		for i in range(10):
			self._writeFITS(os.path.join(self.resdir, "src%d.fits"%i), i)
		with open(os.path.join(self.resdir, "filetest.rd"), "w") as f:
			f.write(self._rdText)

	def tearDown(self):
		sys.argv = self.origArgv
		base.setConfig("inputsDir", self.origInputs)
		shutil.rmtree(self.resdir, True)

	class SimpleProcessor(processing.HeaderProcessor):
		"""a header processor adding a SQUARE card computed from SEED; it
		counts the headers it builds in headersBuilt.
		"""
		def __init__(self, *args, **kwargs):
			processing.HeaderProcessor.__init__(self, *args, **kwargs)
			self.headersBuilt = 0

		def _isProcessed(self, srcName):
			return "SQUARE" in self.getPrimaryHeader(srcName)

		def _getHeader(self, srcName):
			hdr = self.getPrimaryHeader(srcName)
			hdr.set("SQUARE", hdr["SEED"]**2)
			self.headersBuilt += 1
			return hdr

	def _getHeader(self, srcName):
		"""returns the primary header of srcName within the test resdir.
		"""
		hdus = pyfits.open(os.path.join(self.resdir, srcName))
		hdr = hdus[0].header
		hdus.close()
		return hdr

	def _runProcessor(self):
		"""runs procmain with SimpleProcessor on the test RD under
		captureOutput and returns the result, which callers unpack as
		(processor, stdout, stderr).
		"""
		return testhelpers.captureOutput(processing.procmain,
			(self.SimpleProcessor, "filetest/filetest", "import"))

	def _assertAllProcessed(self, stdout):
		"""asserts that the last status line reports ten error-free files.
		"""
		self.assertEqual(stdout.split('\r')[-1].strip(),
			"10 files processed, 0 files with errors")

	def _testPlainRun(self):
		# procmain reads argv, don't confuse it
		sys.argv = ["test", "--bail"]
		# Normal run, no headers present yet
		proc, stdout, errs = self._runProcessor()
		self.assertEqual(errs, "")
		self._assertAllProcessed(stdout)
		self.assertEqual(proc.headersBuilt, 10)
		self.assertTrue(os.path.exists(
			os.path.join(self.resdir, "src9.fits.hdr")))
		self.assertIn("SQUARE", self._getHeader("src9.fits.hdr"))
		# we don't run with applyHeaders here
		self.assertNotIn("SQUARE", self._getHeader("src9.fits"))

	def _testRespectCaches(self):
		"""tests that no processing is done when cached headers are there.

		This needs to run after _testPlainRun.
		"""
		proc, stdout, _ = self._runProcessor()
		self._assertAllProcessed(stdout)
		self.assertEqual(proc.headersBuilt, 0)

	def _testNoCompute(self):
		"""tests that no computations take place with --no-compute.
		"""
		sys.argv = ["misctest.py", "--no-compute"]
		os.unlink(os.path.join(self.resdir, "src4.fits.hdr"))
		proc, stdout, _ = self._runProcessor()
		self.assertEqual(proc.headersBuilt, 0)

	def _testRecompute(self):
		"""tests that missing headers are recomputed.

		This needs to run before _testApplyCaches and after _testNoCompute.
		"""
		sys.argv = ["misctest.py"]
		proc, stdout, _ = self._runProcessor()
		self._assertAllProcessed(stdout)
		self.assertEqual(proc.headersBuilt, 1)

	def _testApplyCaches(self):
		"""tests the application of headers to sources.

		This needs to run after _testPlainRun
		"""
		sys.argv = ["misctest.py", "--apply"]
		proc, stdout, _ = self._runProcessor()
		self._assertAllProcessed(stdout)
		self.assertEqual(proc.headersBuilt, 0)
		self.assertIn("SQUARE", self._getHeader("src9.fits"))
		# see if data survived
		hdus = pyfits.open(os.path.join(self.resdir, "src9.fits"))
		try:
			na = hdus[0].data
			self.assertEqual(na.shape, (2, 10))
		finally:
			# don't leave the file open behind us
			hdus.close()

	def _testForcedRecompute(self):
		"""tests for working --reprocess.
		"""
		sys.argv = ["misctest.py", "--reprocess"]
		proc, stdout, _ = self._runProcessor()
		self.assertEqual(proc.headersBuilt, 10)

	def _testBugfix(self):
		"""tests that --reprocess rebuilds headers with a changed _getHeader.

		SimpleProcessor._getHeader is monkeypatched for the duration of
		this step and restored afterwards.
		"""
		# NOTE(review): this step used to assign --reprocess --apply to
		# sys.argv and then immediately overwrite it with --reprocess only;
		# the effective command line is kept here.  Confirm whether --apply
		# was meant to be exercised.
		def newGetHeader(self, srcName):
			hdr = self.getPrimaryHeader(srcName)
			hdr.set("SQUARE", hdr["SEED"]**3)
			self.headersBuilt += 1
			return hdr
		sys.argv = ["misctest.py", "--reprocess"]
		origGetHeader = self.SimpleProcessor._getHeader
		self.SimpleProcessor._getHeader = newGetHeader
		try:
			self._runProcessor()
			self.assertEqual(self._getHeader("src6.fits.hdr")["SQUARE"], 216)
		finally:
			self.SimpleProcessor._getHeader = origGetHeader

	def testAll(self):
		self._testPlainRun()
		self._testRespectCaches()
		self._testNoCompute()
		self._testRecompute()
		self._testForcedRecompute()
		self._testApplyCaches()
		self._testForcedRecompute()
		self._testBugfix()


class SpectrumPreviewTest(testhelpers.VerboseTest):
	"""runs an api.SpectralPreviewMaker subclass over a single fake
	identifier and inspects the file it writes.
	"""
	def testBuildSpectrum(self):
		destPath = os.path.join(
			api.getConfig("inputsDir"), "ssa-nopreviews", "fake-preview")
		# get rid of whatever an earlier run may have left behind
		if os.path.exists(destPath):
			os.unlink(destPath)

		class opts:
			nParallel = 1
			doReport = False
			force = True
			bailOnError = True
			requireFrag = None

		class PreviewMaker(api.SpectralPreviewMaker):
			sdmId = "datamaker"

			def iterIdentifiers(self):
				yield "this is not actually an identifer, just unit test stuff"

			def getPreviewPath(self, accref):
				return destPath

		maker = PreviewMaker(opts,
			api.resolveCrossId("data/ssatest#test_import"))
		rtval, stdout, stderr = testhelpers.captureOutput(maker.processAll)

		self.assertEqual(rtval, (1, 0))
		self.assertEqual(stderr, "")

		with open(destPath, "rb") as src:
			previewBytes = src.read()

		expectedFragment = b"IDATx\x9c\xed\xda\xbb\r\x83@\x10\x84"
		self.assertTrue(expectedFragment in previewBytes,
			"Generated spectrum preview doesn't contain expected data")


@contextlib.contextmanager
def _fakeHTTPLib(respData="", respStatus=200,
		mime="application/x-votable", exception=None):
	"""runs a test with a fake httplib connection maker.

	This is for TapquerySyncTest and similar.

	While the context is active, http.client.HTTPConnection is replaced
	by a stand-in that never touches the network; the original class is
	restored on exit, also when the body raises.

	Arguments:

	* respData -- what read() on the fake response returns.
	* respStatus -- the status attribute of the fake response.
	* mime -- the value returned for the content-type header.
	* exception -- if not None, raised by the fake connection's request().

	The context manager yields a class on which the data argument of the
	most recent request() call is available as lastData.
	"""
	class FakeResult(object):
		status = respStatus

		def getheader(self, key):
			if key.lower()=='content-type':
				return mime
			# Only the content type is faked.  Fail with a clear message
			# for anything else (this object has no fail() method to call).
			raise AssertionError(
				"Unexpected header requested from fake response: %r"%key)

		def read(self):
			return respData

	class FakeInfo(object):
		pass

	class FakeConnection(object):
		def __init__(self, *args, **kwargs):
			pass

		def request(self, method, path, data, headers):
			# record the payload before possibly failing so tests can
			# inspect it in either case
			FakeInfo.lastData = data
			if exception is not None:
				raise exception

		def getresponse(self, *args, **kwargs):
			return FakeResult()

		def close(self):
			pass

	origConn = http.client.HTTPConnection
	http.client.HTTPConnection = FakeConnection
	try:
		yield FakeInfo
	finally:
		http.client.HTTPConnection = origConn


class TapquerySyncTest(testhelpers.VerboseTest):
	"""Tests for the tapquery sync object.

	Since TAP queries are expensive, we only test things that don't
	actually run a query.  For more extensive exercising, see the taptest
	RD at the GAVO DC.
	"""
	endpoint = "http://dachstest"

	def _makeJob(self, query="select * from tap_schema.tables", **kwargs):
		"""returns an ADQLSyncJob for query against the test endpoint.
		"""
		return votable.ADQLSyncJob(self.endpoint, query, **kwargs)

	def testNoResult(self):
		job = self._makeJob()
		self.assertRaisesWithMsg(tapquery.Error,
			"No result in so far",
			job.openResult,
			())

	def testWrongStatus(self):
		with _fakeHTTPLib(respData=b"oops", respStatus=404):
			job = self._makeJob()
			with self.assertRaises(tapquery.WrongStatus):
				job.start()
			self.assertEqual(job.getErrorFromServer(), "oops")

	def testHTTPError(self):
		import socket
		failure = socket.error("timeout")
		with _fakeHTTPLib(respData="oops", exception=failure):
			job = self._makeJob()
			with self.assertRaises(tapquery.NetworkError):
				job.start()
			self.assertEqual(job.getErrorFromServer(),
				'Problem connecting to dachstest (timeout)')

	def testTAPError(self):
		errorDocument = """<VOTABLE version="1.2" xmlns:vot="http://www.ivoa.net/xml/VOTable/v1.2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.ivoa.net/xml/VOTable/v1.2 http://vo.ari.uni-heidelberg.de/docs/schemata/VOTable-1.2.xsd"><RESOURCE type="results"><INFO name="QUERY_STATUS" value="ERROR">Could not parse your query: Expected "SELECT" (at char 0), (line:1, col:1)</INFO></RESOURCE></VOTABLE>"""
		with _fakeHTTPLib(respData=errorDocument, respStatus=400):
			job = self._makeJob("selct * from tap_schema.tables")
			with self.assertRaises(tapquery.WrongStatus):
				job.start()
			self.assertEqual(job.getErrorFromServer(),
				'Could not parse your query: Expected "SELECT" (at char 0),'
				' (line:1, col:1)')

	def testConstructionParameters(self):
		with _fakeHTTPLib(respData=b"ok") as fakeInfo:
			job = self._makeJob(userParams={"MAXREC": 0})
			job.start()
			sentParams = parse.parse_qs(fakeInfo.lastData)
			self.assertEqual(sentParams["MAXREC"], ["0"])

	def testLaterParameters(self):
		with _fakeHTTPLib(respData=b"ok") as fakeInfo:
			job = self._makeJob()
			job.setParameter("MAXREC", 0)
			job.start()
			sentParams = parse.parse_qs(fakeInfo.lastData)
			self.assertEqual(sentParams["MAXREC"], ["0"])


class KVLParseTest(testhelpers.VerboseTest):
	"""Tests for parsing our key-value line format (as in postgres).
	"""
	def testNoQuote(self):
		parsed = utils.parseKVLine("anz=29")
		self.assertEqual(parsed, {"anz": "29"})

	def testWhitespaceAroundEqual(self):
		parsed = utils.parseKVLine("a =29 bo= orz Unt = Lopt")
		self.assertEqual(parsed,
			{"a": "29", "bo": "orz", "Unt": "Lopt"})

	def testQuotedString(self):
		parsed = utils.parseKVLine(
			"simp='abc' a ='29' bo= 'orz' Unt = 'Lopt'")
		self.assertEqual(parsed,
			{"simp": "abc", "a": "29", "bo": "orz", "Unt": "Lopt"})

	def testWithBlanks(self):
		parsed = utils.parseKVLine(
			"name='Virtual Astrophysical' a=' 29'")
		self.assertEqual(parsed,
			{"name": 'Virtual Astrophysical', "a": ' 29'})

	def testEscaping(self):
		parsed = utils.parseKVLine(
			r"form='f\'(x) = 2x^3' escChar='\\'")
		self.assertEqual(parsed,
			{"form": "f'(x) = 2x^3", "escChar": '\\'})

	def testEmpty(self):
		parsed = utils.parseKVLine("kram='' prokto=logic")
		self.assertEqual(parsed, {"kram": "", "prokto": 'logic'})

	def testBadKey(self):
		expectedMsg = utils.EqualingRE(
			r"Expected Keyword, found '7(ana)?'  \(at char 0\), \(line:1, col:1\)")
		self.assertRaisesWithMsg(utils.ParseException,
			expectedMsg,
			utils.parseKVLine,
			("7ana=kram",))

	def testMissingEqual(self):
		expectedMsg = utils.EqualingRE(
			'Expected [\'"]=[\'"], found end of text  \\(at char 7\\), \\(line:1, col:8\\)')
		self.assertRaisesWithMsg(utils.ParseException,
			expectedMsg,
			utils.parseKVLine,
			("yvakram",))

	def testBadValue(self):
		expectedMsg = (
			"Expected end of text, found \"'\"  (at char 7), (line:1, col:8)")
		self.assertRaisesWithMsg(utils.ParseException,
			expectedMsg,
			utils.parseKVLine,
			("borken='novalue",))

	def testTooManyEquals(self):
		expectedMsg = (
			'Expected end of text, found \'=\'  (at char 14), (line:1, col:15)')
		self.assertRaisesWithMsg(utils.ParseException,
			expectedMsg,
			utils.parseKVLine,
			("borken=novalue=ab",))


class KVLMakeTest(testhelpers.VerboseTest):
	"""Tests for serialising dictionaries into key-value lines.
	"""
	def testWithKeywords(self):
		line = utils.makeKVLine({"ab": "cd", "honk": "foo"})
		self.assertEqual(line, "ab=cd honk=foo")

	def testWithWeird(self):
		line = utils.makeKVLine({"ab": "c d", "honk": "foo=?"})
		self.assertEqual(line, "ab='c d' honk='foo=?'")

	def testWithEscaping(self):
		line = utils.makeKVLine({"form": "f'(x) = 2x^3", "escChar": '\\'})
		self.assertEqual(line,
			"escChar='\\\\' form='f\\'(x) = 2x^3'")

	def testFailsWithBadKey(self):
		badInput = {"a form": "f'(x) = 2x^3"}
		self.assertRaisesWithMsg(ValueError,
			"'a form' not allowed as a key in key-value lines",
			utils.makeKVLine,
			(badInput,))


import grp
from gavo.base import osinter

class OSInterTest(testhelpers.VerboseTest):
	"""tests for helpers from base.osinter and for base.makeLinkForIvoid.
	"""
	def testMakeSharedDir(self):
		path = os.path.join(base.getConfig("inputsDir"), "_dir_form_unit_test_")
		try:
			osinter.makeSharedDir(path, writable=True)
			stats = os.stat(path)
			self.assertEqual(stats.st_mode&0o060, 0o60)
			self.assertEqual(grp.getgrgid(stats.st_gid).gr_name, "gavo")

			# Hand the directory to our own group and have makeSharedDir
			# fix it.  The directory must be stat-ed again here, as the
			# earlier stat result would still show the old group.
			os.chown(path, -1, os.getgid())
			osinter.makeSharedDir(path, writable=True)
			stats = os.stat(path)
			self.assertEqual(grp.getgrgid(stats.st_gid).gr_name, "gavo")
		finally:
			os.rmdir(path)

	def testSwitchHTTP(self):
		self.assertEqual(
			osinter.switchProtocol("http://localhost:8080/bar/romp?hook#dork"),
			"https://localhost/bar/romp?hook#dork")

	def testSwitchHTTPS(self):
		self.assertEqual(
			osinter.switchProtocol("https://localhost/bar/romp?hook#dork"),
			"http://localhost:8080/bar/romp?hook#dork")

	def testSwitchOtherURL(self):
		self.assertRaises(ValueError,
			osinter.switchProtocol,
			"http://ivoa.net/documents")

	def testMailFormat(self):
		res = osinter.formatMail("""From: "Foo Bar\xdf" <foo@bar.com>
To: gnubbel@somewhere.org
Subject: Test Mail
X-Testing: Yes

This is normal text with shitty characters: '\xdf\xe4i\xdf\xe4'.

Send it, anyway.

Cheers,

       Foo.
""")
		self.assertIsInstance(res, str)
		# assertIn reports the formatted mail on failure
		self.assertIn("MIME-Version: 1.0", res)
		self.assertIn(" characters: '=C3=9F=", res)
		self.assertIn("From: =?utf-8?q?=22Foo_Bar=C3=9F=2", res)
		self.assertIn("X-Testing: Yes", res)
		self.assertIn("Subject: Test Mail", res)
		self.assertIn("To: gnubbel@somewhere.org", res)
		self.assertRegex(res, "Date: .*GMT")

	def testIvoidExternal(self):
		self.assertEqual(
			base.makeLinkForIvoid("ivo://leibniz-kis/chrotel/q/epn_core"),
			"https://dc.g-vo.org/LP/ivo://leibniz-kis/chrotel/q/epn_core")

	def testIvoidMalformed(self):
		self.assertRaisesWithMsg(
			ValueError,
			"Not an ivoid: ''",
			base.makeLinkForIvoid,
			("",))

	def testIvoidInternalNonResolving(self):
		self.assertEqual(
			base.makeLinkForIvoid("ivo://x-testing/toss/q/line_tap"),
			"https://dc.g-vo.org/LP/ivo://x-testing/toss/q/line_tap")

	def testIvoidInternalResolving(self):
		self.assertEqual(
			base.makeLinkForIvoid("ivo://x-testing/data/cores/dl"),
			"http://localhost:8080/data/cores/dl/info")


import lxml
from gavo.helpers import testtricks

VALID_OAI = """<?xml-stylesheet href='/static/xsl/oai.xsl' type='text/xsl'?><oai:OAI-PMH xmlns:oai="http://www.openarchives.org/OAI/2.0/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/ http://vo.ari.uni-heidelberg.de/docs/schemata/OAI-PMH.xsd"><oai:responseDate>2014-07-21T15:13:09Z</oai:responseDate><oai:request verb="ListSets"/><oai:ListSets><oai:set><oai:setSpec>ivo_managed</oai:setSpec><oai:setName>ivo_managed</oai:setName></oai:set></oai:ListSets></oai:OAI-PMH>"""

class ValidatorTest(testhelpers.VerboseTest):
	"""tests for the joint XSD validator from testtricks.
	"""
	def testSimpleValidator(self):
		# A bare "import lxml" does not load the etree submodule, so
		# lxml.etree only resolves if some other module imported it first.
		# Import it explicitly here.
		from lxml import etree

		val = testtricks.getJointValidator(["oai_dc.xsd", "OAI-PMH.xsd"])
		oaiTree = etree.fromstring(VALID_OAI)
		val.assertValid(oaiTree)

		# a stray p element in the first child of the third top-level
		# element must make the document invalid
		etree.SubElement(oaiTree[2][0], "p")
		self.assertFalse(val(oaiTree))


class PgValidatorTest(testhelpers.VerboseTest):
	"""tests for matching declared column types against postgres types.
	"""
	resources = [("conn", tresc.dbConnection)]

	def testBasicAcceptance(self):
		# passes if the validator does not raise
		validate = base.sqltypeToPgValidator("integer")
		validate("int4")

	def testBadRejecting(self):
		self.assertRaisesWithMsg(base.ConversionError,
			"No Postgres pg_types validators type for float",
			base.sqltypeToPgValidator,
			("float",))

	def testBasicRejecting(self):
		validate = base.sqltypeToPgValidator("real")
		self.assertRaisesWithMsg(TypeError,
			"int4 is not compatible with a float column",
			validate,
			("int4",))

	def testArrayAcceptance(self):
		# passes if the validator does not raise
		validate = base.sqltypeToPgValidator("real[]")
		validate("_float8")

	def testNondbRejecting(self):
		validate = base.sqltypeToPgValidator("file")
		self.assertRaisesWithMsg(TypeError,
			"Column with a non-db type file mapped to db type wurst",
			validate,
			("wurst",))

	def testPgSphereOk(self):
		resDesc = api.parseFromString(api.RD,
			"""<resource schema="test"><table id="valtest" onDisk="true">
				<column name="x" type="spoint"/></table></resource>""")
		resDesc.sourceId = "inline"
		dbTable = api.TableForDef(
			resDesc.tables[0], connection=self.conn, create=True)
		try:
			dbTable.ensureOnDiskMatches()  # raises DataError if test fails
		finally:
			self.conn.rollback()
			dbTable.drop()

	def testPgSphereRaises(self):
		resDesc = api.parseFromString(api.RD,
			"""<resource schema="public"><table id="valtest" onDisk="true">
				<column name="x" type="spoint"/></table></resource>""")
		resDesc.sourceId = "inline"
		# create an on-disk table whose column type contradicts the definition
		self.conn.execute("create table valtest (x integer)")
		dbTable = api.TableForDef(resDesc.tables[0], connection=self.conn)
		expectedMsg = (
			"Table public.valtest: type mismatch in column x (Incompatible"
			" type in DB: Expected spoint, found int4)")
		try:
			self.assertRaisesWithMsg(api.DataError,
				expectedMsg,
				dbTable.ensureOnDiskMatches,
				())
		finally:
			self.conn.rollback()
			dbTable.drop()
	

from gavo.user import validation

class TableValTest(testhelpers.VerboseTest):
	"""tests for the table checks in gavo.user.validation.
	"""
	resources = [("ignored", tresc.csTestTable)]

	class defaultArgs:
		compareDB = False

	def _getRdWithTable(self, columns):
		"""returns an RD with one on-disk table whose column definitions
		are the XML literal columns.
		"""
		rdSource = """<resource schema="__test"><table id="totest" onDisk="True">
			<meta>
				creationDate: 2012-12-12T12:12:12
				subject: testing
				description: something to ignore
			</meta>
			%s</table></resource>"""%columns
		parsed = base.parseFromString(rscdesc.RD, rdSource)
		parsed.sourceId = "testing/q"
		return parsed

	def _getValFuncOutput(self, func, rd):
		"""returns what func(rd, defaultArgs) writes to stdout.
		"""
		_, stdout, _ = testhelpers.captureOutput(
			func, (rd, self.defaultArgs))
		return stdout

	def _getMessagesForColumns(self, columns):
		"""returns validateTables' output for a table made from columns.
		"""
		return self._getValFuncOutput(
			validation.validateTables,
			self._getRdWithTable(columns))

	def testReservedName(self):
		messages = self._getMessagesForColumns("<column name='distance'/>")
		self.assertEqual(messages,
			"[WARNING] testing/q: Column totest.distance: Name is not a regular ADQL identifier.\n")

	def testDelimitedId(self):
		messages = self._getMessagesForColumns(
			"<column name='quoted/distance'/>")
		self.assertEqual(messages, "")

	def testBadUCD(self):
		messages = self._getMessagesForColumns(
			"<column name='foo' ucd='em.IR'/>")
		self.assertEqual(messages,
			"[WARNING] testing/q: Column __test.totest.foo: UCD em.IR not accepted by astropy (Secondary word 'em.IR' is not valid as a primary word).\n")

	def testTAPSchemaValidation(self):
		tableDef = testhelpers.getTestRD().getById("csdata")
		firstColumn = tableDef.columns[0]
		originalUCD = firstColumn.ucd
		try:
			firstColumn.ucd = "blabla"
			self.assertEqual(
				list(validation.validateTable(tableDef, True)), [
					('warning', 'Column test.csdata.alpha: UCD blabla not'
						" accepted by astropy (Unknown word 'blabla')."),
					('error',
						"Table test.csdata: Column alpha: tap_schema ucd!='blabla'")])
		finally:
			# the test RD is shared, so put the original UCD back
			firstColumn.ucd = originalUCD


from gavo.web import examplesrender

class ExamplesTest(testhelpers.VerboseTest):
	"""tests for the HTML rendering of examples using the genparam role.
	"""
	def _renderExample(self, source, title):
		"""returns the translated HTML for an _example meta item built
		from source and title.
		"""
		metaItem = base.META_CLASSES_FOR_KEYS["_example"](source, title=title)
		return examplesrender._Example(metaItem)._getTranslatedHTML()

	def testWorkingExamples(self):
		html = self._renderExample(
			"Here is a :genparam:`par(example, no?)` "
			"example for genparam.",
			"Working genparam")
		for fragment in [
				b'<span property="generic-parameter" typeof="keyval"',
				b'<span property="key" class="genparam-key">par</span>',
				b'<span property="value" class="genparam-value">'
					b'example, no?</span>']:
			self.assertTrue(fragment in html)

	def testFailingExample(self):
		html = self._renderExample(
			"Here is a :genparam:`parfoo` "
			"example for genparam.",
			"Working genparam")
		self.assertTrue(b"parfoo does not" in html)
		self.assertTrue(b'<span class="problematic"' in html)


def _getUpgraders():
	"""returns a list of three toy upgrader classes, in definition order,
	with versions 9, 10, and 19.
	"""
	class To10Upgrader(upgrade.Upgrader):
		version = 9
		s_010_foo = "CREATE TABLE foo"
		u_010_fill_foo = "INSERT INTO foo"

	class To11Upgrader(upgrade.Upgrader):
		version = 10

		s_010_bar = "CREATE TABLE bar"

		u_010_fill_bar = "INSERT INTO bar"

		u_005_register_bar = "INSERT INTO metastore"

	class To20Upgrader(upgrade.Upgrader):
		version = 19

		s_005_addheck = "ALTER TABLE bar ADD COLUMN quux"
		s_010_addheck = "ALTER TABLE bar ADD COLUMN honk"

	# the three classes are the only local names here
	return [To10Upgrader, To11Upgrader, To20Upgrader]


class UpgraderTest(testhelpers.VerboseTest):
	"""tests for the statement sequences upgrade.iterStatements produces
	from the toy upgraders.
	"""
	def testSchemachangeSort(self):
		stmts = list(upgrade.iterStatements(9, 11, _getUpgraders()))
		self.assertEqual(stmts[0], "CREATE TABLE foo")
		self.assertTrue(stmts[2] is upgrade._COMMIT)
		self.assertEqual(stmts[3], 'INSERT INTO foo')
		versionUpdater = stmts[4]
		self.assertTrue("To10Upgrader" in repr(versionUpdater.__self__))
		self.assertEqual(len(stmts), 10)

	def testHoleCovered(self):
		stmts = list(upgrade.iterStatements(10, 20, _getUpgraders()))
		self.assertEqual(stmts[2], "ALTER TABLE bar ADD COLUMN honk")
		self.assertTrue(stmts[3] is upgrade._COMMIT)
		versionUpdater = stmts[-2]
		self.assertTrue("To20Upgrader" in repr(versionUpdater.__self__))


class TestScaffoldTest(testhelpers.VerboseTest):
	"""tests for the test infrastructure itself.
	"""
	def testDataServer(self):
		with testhelpers.DataServer("success!\n") as url:
			firstPayload = utils.urlopenRemote(url).read()
		self.assertEqual(firstPayload, b"success!\n")

		# try another to make sure the first has been killed; if
		# it hasn't then this second server can't be bound to the port.
		with testhelpers.DataServer(b"second success!\n") as url:
			secondPayload = utils.urlopenRemote(url).read()
		self.assertEqual(secondPayload, b"second success!\n")


from gavo import rsc
from gavo.formats import fitstable
from gavo.user import mkrd

class FITSColGenTest(testhelpers.VerboseTest,
		metaclass=testhelpers.SamplesBasedAutoTest):
	"""tests for mkrd's column generation from FITS binary tables.

	Each item in samples is a pair of FITS format code and the
	database type _runTest expects mkrd to map it to.
	"""
	def testBasic(self):
		row = {"a": "foobar", "b": 23, "c": 42, "d": 0,
			"e": datetime.datetime(2011, 12, 12)}
		table = api.TableForDef(api.resolveCrossId("data/test#abcd"),
			rows=[row])
		hdus = fitstable.makeFITSTable(rsc.wrapTable(table))
		generated = mkrd.getColumnXML(mkrd.iterColAttrsFITS(hdus[1]))
		self.assertEqual(
			generated,
			'<column name="a" type="text"\n'
			'  \n'
			'  description="Some weirdness"\n'
			'  verbLevel="1"/>\n'
			'<column name="b" type="integer"\n'
			'  \n'
			'  description=""\n'
			'  verbLevel="1"/>\n'
			'<column name="c" type="integer"\n'
			'  \n'
			'  description=""\n'
			'  verbLevel="1"/>\n'
			'<column name="d" type="integer"\n'
			'  unit="km" \n'
			'  description=""\n'
			'  verbLevel="1"/>\n'
			'<column name="e" type="text"\n'
			'  \n'
			'  utype="junk.junk.junk"\n'
			'  description=""\n'
			'  verbLevel="1"/>')

	def testBadCode(self):
		expectedMsg = (
			"FITS type code 'JUNK' of flaubert not handled by gavo mkrd;"
			" add handling if you can.")
		self.assertRaisesWithMsg(
			base.ReportableError,
			expectedMsg,
			mkrd.getTypeForFTFormat,
			("JUNK", "flaubert"))

	def _runTest(self, example):
		fitsCode, expectedType = example
		foundType = mkrd.getTypeForFTFormat(fitsCode, "testing")
		self.assertEqual(foundType, expectedType)

	samples = [
		("A", "char"),
		("1A", "char"),
		("10A", "text"),
		("L", "boolean"),
		("15L", "boolean[]"),
#5
		("1B", "bytea"),
		("10I", "smallint[]"),
		("J", "integer"),
		("40K", "bigint[]"),
		("2E", "real[]"),
#10
		("2D", "double precision[]"),
		("PD()", "double precision[]"),
		("1PE()", "real[]"),
		("1PI(120)", "smallint[]"),
		("PA()", "text"),
	]


class VOTableColGenTest(testhelpers.VerboseTest):
	"""tests for mkrd's column generation from VOTables.
	"""
	def testBasic(self):
		from gavo.formats import votable
		table = api.TableForDef(api.resolveCrossId("data/test#abcd"))
		serialized = io.BytesIO(votable.getAsVOTable(table))
		generated = mkrd.getColumnXML(
			mkrd.iterColAttrsVOTable(serialized, 1))
		self.assertEqual(
			generated,
			'<column name="a" type="text"\n'
			'  ucd=""\n'
			'  description="Some weirdness"\n'
			'  verbLevel="1"/>\n'
			'<column name="b" type="integer"\n'
			'  ucd=""\n'
			'  description=""\n'
			'  verbLevel="1"/>\n'
			'<column name="c" type="integer"\n'
			'  ucd=""\n'
			'  description=""\n'
			'  verbLevel="1"/>\n'
			'<column name="d" type="integer"\n'
			'  unit="km" ucd=""\n'
			'  description=""\n'
			'  verbLevel="1"/>\n'
			'<column name="e" type="timestamp"\n'
			'  ucd=""\n'
			'  utype="junk.junk.junk"\n'
			'  description=""\n'
			'  verbLevel="1"/>')


_BBB_EXAMPLE = """
Some random junk

Abstract:
  We provide a catalogue of 541 stars, brown dwarfs, and exoplanets in 339
  outreach potential.

File Summary:
--------------------------------------------------------------------------------
 FileName    Lrecl   Records       Explanations
--------------------------------------------------------------------------------
ReadMe              80        .    this file
The10pcSample.dat   930       561  the 10 pc catalogue


--------------------------------------------------------------------------------
Byte-by-byte Description of file: The10pcSample.dat
--------------------------------------------------------------------------------
 Bytes    Format Units  Label            Explanations
--------------------------------------------------------------------------------
  1-  4   I4     ---    NB_OBJ           Running number for object
 11- 36   A26    ---    SYSTEM_NAME      Name of the system
 74- 87   F14.9  ---    RA               Right ascension in ICRS
129-159   A31    ---    PARALLAX_BIBCODE Reference for the parallax
178-193   F7.2   mas/yr PMRA_ERROR       Proper motion uncertainty in right
                                       ascension
348-349   I2     ---    G_CODE           =2 if G from Gaia DR2 ; =3 if G from
                                       Gaia EDR3 ; =10 if G derived from
                                       spectral type ; =20 for spectral type >T6
                                       (arbitrarily set to absolute G mag = 25)
583-930   I8     ---    COMMENT          Additional comments on exoplanets,
                                       multiplicity, etc

--------------------------------------------------------------------------------
"""

class VizColGenTest(testhelpers.VerboseTest):
	"""tests for mkrd's column generation from byte-by-byte descriptions
	as in _BBB_EXAMPLE.
	"""
	def testBasic(self):
		readMe = io.StringIO(_BBB_EXAMPLE)
		generated = mkrd.getColumnXML(mkrd.iterColAttrsViz(readMe))
		self.assertEqual(
			generated,
			'<column name="NB_OBJ" type="integer"\n'
			'  ucd=""\n'
			'  description="Running number for object"\n'
			'  verbLevel="1"/>\n'
			'<column name="SYSTEM_NAME" type="text"\n'
			'  ucd=""\n'
			'  description="Name of the system"\n'
			'  verbLevel="1"/>\n'
			'<column name="RA" type="double precision"\n'
			'  ucd=""\n'
			'  description="Right ascension in ICRS"\n'
			'  verbLevel="1"/>\n'
			'<column name="PARALLAX_BIBCODE" type="text"\n'
			'  ucd=""\n'
			'  description="Reference for the parallax"\n'
			'  verbLevel="1"/>\n'
			'<column name="PMRA_ERROR" type="real"\n'
			'  unit="mas/yr" ucd=""\n'
			'  description="Proper motion uncertainty in right ascension"\n'
			'  verbLevel="1"/>\n'
			'<column name="G_CODE" type="smallint"\n'
			'  ucd=""\n'
			'  description="=2 if G from Gaia DR2 ; =3 if G from Gaia EDR3 ; =10 if G derived from spectral type ; =20 for spectral type >T6 (arbitrarily set to absolute G mag = 25)"\n'
			'  verbLevel="1"/>\n'
			'<column name="COMMENT" type="bigint"\n'
			'  ucd=""\n'
			'  description="Additional comments on exoplanets, multiplicity, etc"\n'
			'  verbLevel="1"/>')


from gavo.base import typesystems

# I think we ought to have a test module for the whole type conversion
# thing -- I wonder why I didn't make one back when I wrote that stuff
class ScalarifyTest(testhelpers.VerboseTest,
		metaclass=testhelpers.SamplesBasedAutoTest):
	"""tests for typesystems.scalarify.

	Each item in samples pairs a type name with what scalarify is
	expected to return for it.
	"""
	def _runTest(self, sample):
		typeName, expectedScalar = sample
		found = typesystems.scalarify(typeName)
		self.assertEqual(found, expectedScalar)

	samples = [
		("char", "char"),
		("spoly", "spoly"),
		("int[]", "int"),
		("int[12][]", "int"),
	]

	def testWithConversion(self):
		scalarType = typesystems.scalarify("bigint[22][]")
		converter = typesystems.sqltypeToPython(scalarType)
		self.assertEqual(converter("432"), 432)


from gavo.imp import astropyucd

class UCDTest(testhelpers.VerboseTest):
	"""tests for parsing and checking UCDs through astropyucd.
	"""
	def testBasic(self):
		parsed = astropyucd.parse_ucd("pos.eq.dec;meta.main")
		self.assertEqual(parsed,
			[('ivoa', 'pos.eq.dec'), ('ivoa', 'meta.main')])

	def testColor(self):
		parsed = astropyucd.parse_ucd("phot.color;em.opt.B;em.opt.V")
		self.assertEqual(parsed, [
			('ivoa', 'phot.color'),
			('ivoa', 'em.opt.B'),
			('ivoa', 'em.opt.V')])

	def testNonPrimary(self):
		# this UCD is expected to be rejected when the controlled
		# vocabulary is enforced.
		accepted = astropyucd.check_ucd(
			"obs;meta.curation",
			check_controlled_vocabulary=True)
		self.assertEqual(accepted, False)


from gavo.user.admin import hipsgen, hipsfill


class HipsGenTest(testhelpers.VerboseTest):
	"""tests for the hipsgen and hipsfill admin functions.
	"""
	def testHipsgen(self):
		"""tests the output hipsgen writes for a data element of a
		temporary RD.
		"""
		rdSource = """<resource schema="test">
				<meta>
				creator: Painter, J.
				creationDate: 1981-12-24
				description: Pictures at an Exhibition
				subject: imaging
				title: ELP
				</meta>
				<table id="pics">
					<publish/>
				</table>
				<data id="imp">
					<sources pattern="data1/*.fits">
						<pattern>data2/fits/*.fits</pattern>
					</sources>
					<fitsProdGrammar/>
					<make><table/></make>
				</data></resource>"""
		expected = (
			"in=data1\n"
			"in=data2/fits\n"
			"minOrder=3\n"
			"creator: Painter, J.\n"
			"id=ivo://x-testing/hg/pics\n"
			"status=public clonable\n"
			"title=ELP\n"
			"out=hips\n")

		with testhelpers.testFile("hg.rd", rdSource,
				inDir=base.getConfig("inputsDir")):
			hipsgenArgs = (None, utils.NS(dataId="hg#imp", minOrder=3))
			# only the middle item of captureOutput's result is inspected here
			_, written, _ = testhelpers.captureOutput(hipsgen, hipsgenArgs)
			self.assertEqual(written, expected)

	def testHipsfill(self):
		"""tests that hipsfill rewrites a properties file inside a
		temporary directory.
		"""
		template = (
			"creator_did          = ivo://org.gavo.dc/P/fornax/q/c\n"
			"#hips_creator        = HiPS creator (institute or person)\n"
			"bib_reference        = Gallilei, G., in prep.\n"
			"#obs_regime          = Waveband keyword (Radio\n"
			"#em_max              = Stop in spectral coordinates in meters\n")

		with tempfile.TemporaryDirectory() as hipsDir, \
				testhelpers.testFile("properties", template,
					inDir=hipsDir) as propsName:
			hipsfill(None,
				utils.NS(
					svcId="data/cores#convcat",
					hipsDir=pathlib.Path(hipsDir)))
			# read back before the temporary directory is removed
			with open(propsName) as propsFile:
				filledOut = propsFile.read()

		self.assertEqual(filledOut,
			"creator_did          = ivo://org.gavo.dc/P/fornax/q/c\n"
			"hips_creator         = Gandalf, W.\n"
			"bib_reference        = Gallilei, G., in prep.\n"
			"#obs_regime          = Waveband keyword (Radio\n"
			"em_max               = 6.621486190496428e-06\n")


class BiblinksImportTest(testhelpers.VerboseTest):
	"""tests for running the import-biblinks data element of data/testdata
	and the resulting content of dc.biblinks.
	"""
	resources = [("conn", tresc.dbConnection)]

	def testImport(self):
		importDD = base.caches.getRD("data/testdata").getById("import-biblinks")
		api.makeData(importDD, connection=self.conn)

		with base.getTableConn() as conn:
			rows = list(conn.queryToDicts("select * from dc.biblinks"))

		self.assertEqual(len(rows), 4)

		hasLinksURI = (
			'http://reg.g-vo.org/LP/x-testing/data/testdata/haslinks')
		self.assertEqual(
			{row["bib_ref"] for row in rows
				if row["dataset_ref"]==hasLinksURI},
			{'2015A&C....10...88D', '2005ASPC..347...29T'})

		self.assertEqual(
			{row["relationship"] for row in rows},
			{"Cites", "IsSupplementedBy"})

		self.assertEqual(
			{row["bib_format"] for row in rows},
			{None, "doi"})

		self.assertEqual(
			{row["dataset_ref"] for row in rows
				if row["bib_ref"]=='10.1051/0004-6361:20010923'},
			{'http://localhost:8080/getproduct/prod1.bin',
				'http://localhost:8080/getproduct/prod0.bin'})

	def testReimport(self):
		# Running the import a second time must give the same table
		# content, i.e., the previously obtained biblinks must be removed
		# rather than duplicated.
		return self.testImport()


from gavo.rscdef import regtest

# A VOTable response with a results TABLE declaring a single FIELD (hipno)
# and an empty base64 BINARY stream, i.e., a table without rows.
_EMPTY_VOTABLE = b"""
<VOTABLE version="1.4" xmlns="http://www.ivoa.net/xml/VOTable/v1.3">
	<RESOURCE type="results">
		<TABLE name="result">
			<FIELD ID="hipno"  arraysize="*" datatype="char" name="hipno" ucd="ID_MAIN"/>
			<DATA><BINARY><STREAM encoding="base64"/></BINARY>
			</DATA></TABLE></RESOURCE></VOTABLE>"""

# A VOTable response whose results RESOURCE only contains an INFO element
# named Error and no TABLE at all.
_ERROR_VOTABLE = b"""
<VOTABLE version="1.4" xmlns="http://www.ivoa.net/xml/VOTable/v1.3">
	<RESOURCE type="results">
		<INFO ID="Error" name="Error" value="Field SR: 'fromp' is not a valid literal for SR"/></RESOURCE></VOTABLE>"""


class RegTestTest(testhelpers.VerboseTest):
	"""tests for the VOTable row accessors of regtest.RegTest.
	"""
	# the message expected when the response has no TABLE element
	_noTableMessage = (
		"Response did not contain a TABLE -- is it an error message?")

	def _makeRegTest(self, responseData):
		"""returns a RegTest struct with responseData as its data attribute.
		"""
		regTest = api.makeStruct(regtest.RegTest, title="Testtest")
		regTest.data = responseData
		return regTest

	def testGetRowEmptyResponse(self):
		regTest = self._makeRegTest(_EMPTY_VOTABLE)
		self.assertEqual(regTest.getVOTableRows(), [])

	def testGetRowErrorResponse(self):
		regTest = self._makeRegTest(_ERROR_VOTABLE)
		self.assertRaisesWithMsg(
			AssertionError,
			self._noTableMessage,
			regTest.getVOTableRows,
			())

	def testGetFirstErrorResponse(self):
		regTest = self._makeRegTest(_ERROR_VOTABLE)
		self.assertRaisesWithMsg(
			AssertionError,
			self._noTableMessage,
			regTest.getFirstVOTableRow,
			())


# When executed as a script, pass KVLMakeTest to testhelpers.main
# (presumably the test case run by default -- confirm against testhelpers).
if __name__=="__main__":
	testhelpers.main(KVLMakeTest)
