"""
Tests for ADQL user defined functions and Region expressions.
"""

#c Copyright 2008-2024, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


from urllib import parse
import unittest

from gavo.helpers import testhelpers

from gavo import adql
from gavo import base
from gavo import rscdesc  #noflake: for registration
from gavo import utils
from gavo.protocols import adqlglue
from gavo.protocols import simbadinterface #noflake: for ufunc registration
from gavo.adql import nodes
from gavo.adql import ufunctions

import adqltest
import tresc


# we need to open a database connection here to have the DB initialisation
# update the ufuncs list
_conn = base.getDBConnection("feed")
_conn.close()
del _conn


class BasicTest(unittest.TestCase):
	def testRaising(self):
		self.assertRaises(adql.UfuncError, adql.parseToTree,
			"SELECT x FROM y WHERE gavo_foo(8)=7")

	def testFlattening(self):
		self.assertEqual(
			adql.parseToTree("SELECT x FROM y WHERE 1=gavo_match('x.*', frob)"
				).flatten(),
			"SELECT x FROM y WHERE 1 = (CASE WHEN frob ~ 'x.*' THEN 1 ELSE 0 END)")


class _UfuncDefinition(testhelpers.TestResource):
	def make(self, nodeps):
		@adql.userFunction("gavo_testingXXX",
			"(x INTEGER) -> INTEGER",
			"""
			This function returns its argument decreased by one.
			
			This is the end.
			""",
			polymorphism=[("(x REAL, y INTEGER) -> REAL",
				"This is an alternative function that does about the same thing")])
		def _f1(args):
			if len(args)!=1:
				raise adql.UfuncError("gavo_testingXXX takes only a single argument")
			return "(%s+1)"%nodes.flatten(args[0])
		
		@adql.userFunction("gavo_testingYYY",
			"(x DOUBLE PRECISION) -> DOUBLE PRECISION",
			"This function will not work (since it's not defined in the DB)")
		def _f2(args):
			return None
	
	def clean(self, ignored):
		del ufunctions.UFUNC_REGISTRY["GAVO_TESTINGXXX"]
		del ufunctions.UFUNC_REGISTRY["GAVO_TESTINGYYY"]


_ufuncDefinition = _UfuncDefinition()

class UfuncDefTest(testhelpers.VerboseTest):
	resources = [("ufunc_defined", _ufuncDefinition),
		("adqlTestTable", adqltest.adqlTestTable),
		("querier", adqltest.adqlQuerier)]

	def testUfuncMeta(self):
		f = ufunctions.UFUNC_REGISTRY["GAVO_TESTINGXXX"]
		self.assertEqual(f.adqlUDF_name, "gavo_testingXXX")
		self.assertEqual(f.adqlUDF_signature,
			"gavo_testingXXX(x INTEGER) -> INTEGER")
		self.assertEqual(f.adqlUDF_doc, "This function returns its argument"
			" decreased by one.\n\nThis is the end.")
	
	def testFlattening(self):
		self.assertEqual(
			adql.parseToTree("SELECT GAVO_TESTINGXXX(frob) FROM x"
				).flatten(),
			"SELECT (frob+1) FROM x")
	
	def testFlatteningTransparent(self):
		self.assertEqual(
			adql.parseToTree("SELECT GAVO_TESTINGYYY(CIRCLE('', a, b, c), u) FROM x"
				).flatten(),
			'SELECT GAVO_TESTINGYYY(CIRCLE(,a,b,c), u) FROM x')

	def testQueryInSelectList(self):
		self.assertEqual(list(self.querier.queryADQL(
			"SELECT GAVO_TESTINGXXX(rV) FROM test.adql where mag<0").rows[0].values()),
			[29.])

	def testQueryInWhereClause(self):
		self.assertEqual(list(self.querier.queryADQL(
			"SELECT rV FROM test.adql where GAVO_TESTINGXXX(rV)>0").rows[0].values()),
			[0.])

	def testPolymorphism(self):
		self.assertEqual(
			ufunctions.UFUNC_REGISTRY["GAVO_TESTINGXXX"
				].adqlUDF_additionalSignatures[0]["signature"],
				"gavo_testingXXX(x REAL, y INTEGER) -> REAL")
		self.assertEqual(
			ufunctions.UFUNC_REGISTRY["GAVO_TESTINGYYY"
				].adqlUDF_additionalSignatures, [])


class _UfuncTestbed(tresc.RDDataResource):
	"""A table that contains test material for ufuncs.

	If you write queries against this, make sure you survive schema
	*and data* extensions.  They are sure to come as new UDFs need exercise.
	Use the kind column as necessary.
	"""
	rdName = "data/ufuncex.rd"
	dataId = "import"

_ufuncTestbed = _UfuncTestbed()


class BuiltinUfuncTest(testhelpers.VerboseTest):
	resources = [
		("ssaTestTable", tresc.ssaTestTable),
		("ufuncTestTable", _ufuncTestbed),
		("querier", adqltest.adqlQuerier)]

	def testHaswordQuery(self):
		self.assertEqual(self.querier.queryADQL(
			"select distinct ssa_targname from test.hcdtest where"
			" 1=ivo_hasword(ssa_targname, 'rat hole')").rows,
			[{'ssa_targname': 'rat hole in the yard'}])

	def testHaswordQueryInsensitive(self):
		self.assertEqual(self.querier.queryADQL(
			"select distinct ssa_targname from test.hcdtest where"
			" 1=ivo_hasword(ssa_targname, 'Booger')").rows,
			[{'ssa_targname': 'booger star'}])

	def testHaswordQueryBorders(self):
		self.assertEqual(self.querier.queryADQL(
			"select distinct ssa_targname from test.hcdtest where"
			" 1=ivo_hasword(ssa_targname, 'ooger')").rows,
			[])
	
	def testHashlistSimple(self):
		self.assertEqual(self.querier.queryADQL(
			"select distinct ivo_hashlist_has('bork#nork#gaob norm', 'nork') as h"
				" FROM test.hcdtest").rows,
			[{'h': 1}])

	def testHashlistBorders(self):
		self.assertEqual(self.querier.queryADQL(
			"select distinct ivo_hashlist_has('bork#nork#gaob norm', 'ork') as h"
				" FROM test.hcdtest").rows,
			[{'h': 0}])

	def testHashlistNocase(self):
		self.assertEqual(self.querier.queryADQL(
			"select distinct ivo_hashlist_has('bork#nork#gaob norm', 'nOrk') as h"
				" FROM test.hcdtest").rows,
			[{'h': 1}])

	def testHashlistNocaseList(self):
		self.assertEqual(self.querier.queryADQL(
			"select distinct ivo_hashlist_has('bork#nOrk#gaob norm', 'nork') as h"
				" FROM test.hcdtest").rows,
			[{'h': 1}])
	

	def testNocasematch(self):
		self.assertEqual(len(self.querier.queryADQL(
			"select ssa_targname FROM test.hcdtest"
				" WHERE 1=ivo_nocasematch(ssa_targname, 'BOOGER%')").rows),
			2)
	
	def testToMJD(self):
		self.assertEqual(self.querier.queryADQL(
			"select gavo_to_mjd(dt) as mjd from test.ufuncex where testgroup='jd'"
			).rows, [{'mjd': 45917.5}])

	def testToJD(self):
		self.assertEqual(self.querier.queryADQL(
			"select ivo_to_jd(dt) as jd from test.ufuncex where testgroup='jd'"
			).rows, [{'jd': 2445918.0}])


class RegionTest(unittest.TestCase):
	"""tests for sane parsing of our default region expressions.
	"""
	resources = [("fs", tresc.fakedSimbad)]

	def testRaising(self):
		"""tests for plausible exceptions.
		"""
		self.assertRaises(adql.RegionError, adql.parseToTree,
			"SELECT x FROM y WHERE 1=CONTAINS(REGION('78y'), REGION('zzy9'))")
		self.assertRaises(adql.RegionError, adql.parseToTree,
			"SELECT x FROM y WHERE 1=CONTAINS(REGION(dbColumn || otherColumn),"
			" CIRCLE('ICRS', 10, 10 ,2))")


def getMorphed(query):
	return nodes.flatten(adql.morphPG(adql.parseToTree(query))[1])


class RRFunctionsTest(testhelpers.VerboseTest, metaclass=testhelpers.SamplesBasedAutoTest):
	resources = [("ufuncTestTable", _ufuncTestbed)]
	def _runTest(self, sample):
		query, expected = sample
		self.assertEqualIgnoringAliases(getMorphed(query), expected)
	
	samples = [
		("select testgroup from test.ufuncex where 1=ivo_hasword(testgroup, 'abc')",
			"SELECT testgroup FROM test.ufuncex WHERE ("
			"to_tsvector('english', testgroup)"
				" @@ plainto_tsquery('english', 'abc'))"),
 		("select ivo_hasword(testgroup, 'abc') from test.ufuncex",
 			"SELECT IVO_HASWORD(testgroup, 'abc') ASWHATEVER FROM test.ufuncex"),
		("select testgroup from test.ufuncex where"
			" 1=ivo_hashlist_has('a#b#c', testgroup)",
			"SELECT testgroup FROM test.ufuncex WHERE lower(testgroup) ="
			" ANY(string_to_array(lower('a#b#c'), '#'))"),
		("select testgroup from test.ufuncex where"
			" 1=ivo_nocasematch(testgroup, 'honk%')",
			"SELECT testgroup FROM test.ufuncex WHERE (LOWER(testgroup) like 'honk%')"),
	]

	
class IntervalsMorphTest(testhelpers.VerboseTest, metaclass=testhelpers.SamplesBasedAutoTest):
	resources = [("ufuncTestTable", _ufuncTestbed)]
	def _runTest(self, sample):
		query, expected = sample
		self.assertEqualIgnoringAliases(getMorphed(query), expected)
	
	samples = [
		("select testgroup from test.ufuncex where 1=ivo_interval_has(a, iv)",
			"SELECT testgroup FROM test.ufuncex WHERE ((a) <@ (iv))"),
		("select testgroup from test.ufuncex where 0=ivo_interval_has(a, iv)",
			"SELECT testgroup FROM test.ufuncex WHERE NOT ( ((a) <@ (iv)) )"),
	]


class AggFunctionsTest(testhelpers.VerboseTest):
	resources = [
		("ssaTestTable", tresc.ssaTestTable),
		("querier", adqltest.adqlQuerier)]

	def testStringAggMorph(self):
		self.assertEqualIgnoringAliases(
			getMorphed("select ivoid, ivo_string_agg(res_subject, ',')"
				" from rr.res_subject group by ivoid"),
			"SELECT ivoid, string_agg(res_subject, ',') ASWHATEVER"
			" FROM rr.res_subject GROUP BY ivoid")

	def testStringAddExec(self):
		res = self.querier.queryADQL(
			"SELECT ivo_string_agg(accref || mime, '#') as goo, ssa_pubdid"
				" from test.hcdtest group by ssa_pubdid order by ssa_pubdid").rows
		self.assertEqual(res[0]["ssa_pubDID"], 'ivo://test.inv/test1')
		self.assertEqual(set(res[0]["goo"].split("#")),
			set('data/spec1.ssatestimage/jpeg#data/spec1.ssatest.'
				'votapplication/x-votable+xml'.split("#")))


@unittest.skipUnless(testhelpers.hasUDF("IVO_HEALPIX_INDEX"), "pgsphere too old")
class HealpixFunctionsTest(testhelpers.VerboseTest, metaclass=testhelpers.SamplesBasedAutoTest):
	resources = [("ufuncTestTable", _ufuncTestbed)]
	def _runTest(self, sample):
		query, expected = sample
		self.assertEqualIgnoringAliases(getMorphed(query), expected)
	
	samples = [
		("select ivo_healpix_index(10, point('', ra, dec)) from test.ufuncex",
			"SELECT healpix_nest(10, spoint(RADIANS(ra), RADIANS(dec))) ASWHATEVER FROM test.ufuncex"),
		("select ivo_healpix_index(10, p) from test.ufuncex",
			"SELECT healpix_nest(10, p) ASWHATEVER FROM test.ufuncex"),
		("select ivo_healpix_center(10, 40002) from test.ufuncex",
			"SELECT center_of_healpix_nest(10, 40002) ASWHATEVER FROM test.ufuncex"),
	]


@unittest.skipUnless(testhelpers.hasUDF("IVO_HEALPIX_INDEX"), "pgsphere too old")
class HealpixExecTest(testhelpers.VerboseTest):
	resources = [
		("querier", adqltest.adqlQuerier),
		("test_ufunc", _ufuncTestbed)]

	def testIndexAndPoint(self):
		res = self.querier.queryADQL(
			"SELECT ivo_healpix_index(10, p) as hpi"
			" from test.ufuncex WHERE dt<'1985-01-01'")
		self.assertEqual(res.rows[0]["hpi"], 2937727)
	
	def testCenterFinding(self):
		res = self.querier.queryADQL(
			"SELECT ivo_healpix_center(10, ivo_healpix_index(10, ra, dec)) as pt"
			" from test.ufuncex WHERE dt<'1985-01-01'")
		self.assertAlmostEqual(res.rows[0]["pt"].y, -12.25362992*utils.DEG)
		self.assertAlmostEqual(res.rows[0]["pt"].x, 23.51074219*utils.DEG)
		self.assertEqual(res.tableDef.columns[0].type, "spoint")

	def testHealpixBadarg(self):
		self.assertRaisesWithMsg(
			adql.UfuncError,
			"ivo_healpix_center only takes (index, order) arguments",
			getMorphed,
			("select ivo_healpix_center() from test.ufuncex",))


class SimbadpointTest(testhelpers.VerboseTest):
	resources = [
		("test_ufunc", _ufuncTestbed),
		("fs", tresc.fakedSimbad)]

	def testBadArgcount(self):
		self.assertRaisesWithMsg(adql.UfuncError,
			"gavo_simbadpoint takes exactly one string literal as argument",
			getMorphed,
			("SELECT * from test.ufuncex WHERE 1=CONTAINS("
				"gavo_simbadpoint('aldebaran', 23), CIRCLE('ICRS', ra, dec, 1))",))

	def testBadArgtype(self):
		self.assertRaisesWithMsg(adql.UfuncError,
			"gavo_simbadpoint takes exactly one string literal as argument",
			getMorphed,
			("SELECT * from test.ufuncex WHERE 1=CONTAINS("
				"gavo_simbadpoint(testgroup), CIRCLE('ICRS', ra, dec, 1))",))

	def testBadName(self):
		self.assertRaisesWithMsg(adql.UfuncError,
			"No simbad position for 'Henker'",
			getMorphed,
			("SELECT * from test.ufuncex WHERE 1=CONTAINS("
				"gavo_simbadpoint('Henker'), CIRCLE('ICRS', ra, dec, 1))",))

	def testResolution(self):
		self.assertEqualIgnoringAliases(
			getMorphed("SELECT * from test.ufuncex WHERE 1=CONTAINS("
				"ivo_simbadpoint('M1'), CIRCLE('ICRS', ra, dec, 0.333))"),
				"SELECT * FROM test.ufuncex WHERE ((spoint(RADIANS(83.65625),"
				" RADIANS(22.0145))) <@ (scircle(spoint(RADIANS(ra), RADIANS(dec)),"
				" RADIANS(0.333))))")

	def testDistance(self):
		self.assertEqual(getMorphed("SELECT * from test.ufuncex WHERE 2>"
				"distance(point(ra, dec), gavo_simbadpoint('M1'))"),
			"SELECT * FROM test.ufuncex WHERE  (spoint(RADIANS(ra), RADIANS(dec))) <@ scircle(spoint(RADIANS(83.65625), RADIANS(22.0145)), RADIANS(2))")


@unittest.skipUnless(testhelpers.hasUDF("IVO_EPOCH_PROP"), "pgsphere too old")
class LegacyMotionTest(testhelpers.VerboseTest):
# Let's eventually remove gavo_apply_pm once ivo_epoch_prop is
# safely installed.
	resources = [
		("test_ufunc", _ufuncTestbed),
		("querier", adqltest.adqlQuerier),]

	def testQueryDecTrans(self):
		res = self.querier.queryADQL(
			"SELECT ivo_apply_pm(ra, dec, 0., -0.1, 50)"
			" as newpos from test.ufuncex where testgroup='jd'")
		self.assertEqual(res.rows[0]["newpos"].asSTCS('ICRS'),
			"Position ICRS 23.5 -17.2373652888")

	def testQueryGavoPrefix(self):
		res = self.querier.queryADQL(
			"SELECT gavo_apply_pm(ra, dec, 0., -0.1, 50)"
			" as newpos from test.ufuncex where testgroup='jd'")
		self.assertEqual(res.rows[0]["newpos"].asSTCS('ICRS'),
			"Position ICRS 23.5 -17.2373652888")

	def testQueryRATrans(self):
		# Starting pos are (10, 20) and (50, 80)
		res = self.querier.queryADQL(
			"SELECT ivo_apply_pm(ra, dec, 0.1, 0, 50) as newpos"
			" from test.ufuncex where testgroup='intervals'"
			" ORDER BY dec")
		transPos = [r["newpos"].asDALI() for r in res]
		self.assertAlmostEqual(transPos[0][0], 15.30567124204)
		self.assertAlmostEqual(transPos[0][1], 19.92106445273)
		self.assertTrue(transPos[0][0]-10<transPos[1][0]-50)

	def testQueryConstraint(self):
		res = self.querier.queryADQL(
			"SELECT testgroup from test.ufuncex"
			" where 1=Contains("
			"ivo_apply_pm(ra, dec, 0.01, -0.01, 23),"
			"CIRCLE('ICRS', 23.735, -12.48, 0.01))")
		self.assertEqual(testhelpers.pickSingle(res.rows)["testgroup"], "jd")

	def testTransformBadarg(self):
		self.assertRaisesWithMsg(
			adql.UfuncError,
			"gavo_apply_pm requires exactly ra, dec, pmra, pmdec, epdist.",
			getMorphed,
			("select ivo_apply_pm(1, 2) from test.ufuncex",))


@unittest.skipUnless(testhelpers.hasUDF("IVO_EPOCH_PROP"), "pgsphere too old")
class EpochPropTest(testhelpers.VerboseTest):
	resources = [
		("test_ufunc", _ufuncTestbed),
		("querier", adqltest.adqlQuerier),]

	def testPropDecTrans(self):
		res = self.querier.queryADQL(
			"SELECT ivo_epoch_prop(ra, dec, NULL, 0., 400, NULL, 2000, 2022)"
			" as newpv from test.ufuncex where testgroup='jd'")
		
		colDef = res.tableDef.getColumnByName("newpv")
		self.assertEqual(
			colDef.type,
			"double precision[6]")
		self.assertEqual(
			colDef.ucd,
			'')  # TODO: We really want a UCD for this kind of monster
		
		val = res.rows[0]["newpv"]
		self.assertAlmostEqual(val[0], 23.5)
		self.assertAlmostEqual(val[1], -12.247555555555)
		self.assertEqual(val[2], None)
		self.assertAlmostEqual(val[3], 0)
		self.assertAlmostEqual(val[4], 399.99999927192)
		self.assertEqual(val[5], None)

	def testProp6Par(self):
		# Starting pos are (10, 20) and (50, 80)
		res = self.querier.queryADQL(
			"SELECT ivo_epoch_prop(ra, dec, 60, 120, -40, -140, 2022, 1980) as newpos"
			" from test.ufuncex where testgroup='intervals'"
			" ORDER BY dec")

		pv1, pv2 = res.rows[0]["newpos"], res.rows[1]["newpos"]
		self.assertAlmostEqual(pv1[0], 9.998510684068139)
		self.assertAlmostEqual(pv2[1], 80.0004664014105)
		self.assertAlmostEqual(pv1[2], 59.9783590172609)
		self.assertAlmostEqual(pv1[3], 119.91380703443762)
		self.assertAlmostEqual(pv2[3], 119.91898781569608)
		self.assertAlmostEqual(pv2[4], -39.95453831783573)
		self.assertAlmostEqual(pv1[5], -140.00025726418698)

	def testPropBadarg(self):
		self.assertRaisesWithMsg(
			adql.UfuncError,
			"ivo_epoch_prop gets ra, dec [deg], parallax [mas], pmra, pmdec"
			" [mas/yr], radial_velocity [km/s], ref_epoch, out_epoch [yr] arguments",
			getMorphed,
			("select ivo_epoch_prop(1, 2) from test.ufuncex",))

	def testPropPos(self):
		# Starting pos are (10, 20) and (50, 80)
		res = self.querier.queryADQL(
			"SELECT ivo_epoch_prop_pos(ra, dec, 60, 120, -40, -140, 2022, 1980)"
			" as newpos"
			" from test.ufuncex where testgroup='intervals'"
			" ORDER BY dec")

		colDef = res.tableDef.getColumnByName("newpos")
		self.assertEqual(colDef.type, "spoint")
		self.assertEqual(colDef.ucd, "pos")

		pv1, pv2 = res.rows[0]["newpos"], res.rows[1]["newpos"]
		self.assertAlmostEqual(pv1.asDALI()[0], 9.998510684068163)
		self.assertAlmostEqual(pv2.asDALI()[1], 80.00046640141034)

	def testPropPos6Arg(self):
		# Starting pos are (10, 20) and (50, 80)
		res = self.querier.queryADQL(
			"SELECT ivo_epoch_prop_pos(ra, dec, 120, -40, 2000+22, 990*2)"
			" as newpos"
			" from test.ufuncex where testgroup='intervals'"
			" ORDER BY dec")

		pv1, pv2 = res.rows[0]["newpos"], res.rows[1]["newpos"]
		# this is different from above because par/rv are assumed 0 here.
		self.assertAlmostEqual(pv1.asDALI()[0], 9.99851014670222)
		self.assertAlmostEqual(pv2.asDALI()[1], 80.0004665696594)

	def testPropPosWhere(self):
		res = self.querier.queryADQL(
			"SELECT ra, dec from test.ufuncex where testgroup='intervals'"
			" AND distance(ivo_epoch_prop_pos(12, 0, NULL, -120000, 0,"
			" NULL, 2000, 2060), point(ra, dec-20))<0.001")
		self.assertEqual(testhelpers.pickSingle(res.rows)["ra"], 10.)

	def testPropPosMsg(self):
		self.assertRaisesWithMsg(base.ValidationError,
			"Field query: ivo_epoch_prop_pos gets ra, dec [deg], parallax [mas], pmra, pmdec [mas/yr], radial_velocity [km/s], ref_epoch, out_epoch [yr] arguments, where you may leave out parallax and radial_velocity.",
			self.querier.queryADQL,
			("SELECT ivo_epoch_prop_pos(ra, dec, 120, -40)"
				" as newpos"
				" from test.ufuncex where testgroup='intervals'"
				" ORDER BY dec",))

	def testPropNoPM(self):
		res = self.querier.queryADQL(
			"SELECT TOP 1"
			" ivo_epoch_prop(10, 12, NULL, NULL, NULL, NULL, 2000, 2020) as prop"
			" from tap_schema.tables")
		self.assertEqual(res.rows[0]["prop"],
			[10.0, 12.000000000000002, None, None, None, None])

	def testPropNoPMDec(self):
		res = self.querier.queryADQL(
			"SELECT TOP 1"
			" ivo_epoch_prop(10, 12, 3, NULL, NULL, NULL, 2000, 2020) as prop"
			" from tap_schema.tables")
		self.assertEqual(res.rows[0]["prop"],
			[10.0, 12.000000000000002, 3, None, None, None])

	def testPropNoPMRA(self):
		res = self.querier.queryADQL(
			"SELECT TOP 1"
			" ivo_epoch_prop(10, 12, NULL, 3, NULL, NULL, 2000, 2020) as prop"
			" from tap_schema.tables")
		self.assertAlmostEqualVector(res.rows[0]["prop"],
			[10.0, 12., None, None, None, None])

	def testPropNoRV(self):
		res = self.querier.queryADQL(
			"SELECT TOP 1"
			" ivo_epoch_prop(10, 12, 4, 2, 5, NULL, 2000, 2020) as prop"
			" from tap_schema.tables")
		self.assertAlmostEqualVector(res.rows[0]["prop"],
			[10.000011359341114, 12.000027777777, 3.999999999999,
				2.000000206100, 4.99999991755, None])

	def testPropNoParallax(self):
		res = self.querier.queryADQL(
			"SELECT TOP 1"
			" ivo_epoch_prop(10, 12, NULL, 2, 5, 4000, 2000, 2020) as prop"
			" from tap_schema.tables")
		self.assertAlmostEqualVector(res.rows[0]["prop"],
			[10.000011359341022, 12.000027777777323, None,
			2.0000001733735733, 4.999999835741298, None])

class IntervalQueryTest(testhelpers.VerboseTest,
		metaclass=testhelpers.SamplesBasedAutoTest):
	resources = [
		("test_ufunc", _ufuncTestbed),
		("querier", adqltest.adqlQuerier),]

	def _runTest(self, sample):
		constraint, expected = sample
		res = self.querier.queryADQL(
			"select count(*) as ct from test.ufuncex"
			" where testgroup='intervals'"
			" and "+constraint)
		self.assertEqual(res.rows[0]["ct"], expected)
	
	samples = [
		("1=ivo_interval_overlaps(ra, dec, 0, 5)", 0),
		("ivo_interval_overlaps(ra, dec, 5, 10)=1", 1),
		("1=ivo_interval_overlaps(ra, dec, 5, 25)", 1),
		("1=ivo_interval_overlaps(ra, dec, 20, 25)", 1),
		("1=ivo_interval_overlaps(ra, dec, 25, 35)", 0),
#5
		("1=ivo_interval_overlaps(ra, dec, 5, 55)", 2),
		("1=ivo_interval_overlaps(ra, dec, 5, 155)", 2),
		("1!=ivo_interval_overlaps(ra, dec, 5, 155)", 0),
		("ivo_interval_overlaps(ra, dec, 5, 155)=0", 0),
		("1=ivo_interval_overlaps(COORD1(p), COORD2(p), 5, 25)", 1),
#10
		("1=ivo_interval_overlaps(COORD2(p), COORD1(p), 5, 25)", 0),
		("1=ivo_interval_overlaps(ra, dec, 15, 15)", 1),
		("1=ivo_interval_overlaps(dec, dec, 15, 25)", 1),
		("0=ivo_interval_overlaps(ra, dec, 150, 250)", 2),
		("ivo_interval_overlaps(ra, dec, -100, -30)=0", 2),
	]


class HistogramTest(testhelpers.VerboseTest):
	resources = [
		("test_ufunc", _ufuncTestbed),
		("querier", adqltest.adqlQuerier),]
	
	def testBasic(self):
		res = self.querier.queryADQL(
			"SELECT ivo_histogram(ra, 20., 60., 2) as ras,"
			"  ivo_histogram(dec, -20., 81., 10) as decs from test.ufuncex"
			"  WHERE testgroup IN ('jd', 'intervals')")
		row = testhelpers.pickSingle(res.rows)
		self.assertEqual(row["ras"], [1, 1, 1, 0])
		self.assertEqual(row["decs"],
			[0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0])

	def testAlias(self):
		res = self.querier.queryADQL(
			"SELECT gavo_histogram(ra, 20., 60., 2) as ras"
			"  from test.ufuncex"
			"  WHERE testgroup IN ('jd', 'intervals')")
		self.assertEqual(res.rows[0]["ras"], [1, 1, 1, 0])

	def testBadarg(self):
		self.assertRaisesWithMsg(
			adql.UfuncError,
			"ivo_histogram takes exactly four arguments (the column to aggregate, a lower and upper limit of values to tabulate, and the number of bins desired).",
			getMorphed,
			("select ivo_histogram(1, 2) from test.ufuncex",))


class TransformTest(testhelpers.VerboseTest):
	resources = [
		("test_ufunc", _ufuncTestbed),
		("querier", adqltest.adqlQuerier),]

	def testMorphing(self):
		self.assertEqual(
			getMorphed("SELECT gavo_transform('ICRS', 'GALACTIC', p) as ps"
				" from test.ufuncex"),
			"SELECT (p)+strans(1.346356097441,-1.097319001837,0.574770524729) AS ps FROM test.ufuncex")

	def testICRSGalactic(self):
		res = self.querier.queryADQL(
			"SELECT ivo_geom_transform('ICRS', 'GALACTIC', p) as ps, p"
			"  from test.ufuncex where testgroup='jd'")
		ps = testhelpers.pickSingle(res.rows)["ps"]

		# XXX TODO: figure out where the precision goes -- this should have
		# about 13 s.f. rather than just 8.
		self.assertAlmostEqual(ps.x/utils.DEG, 95.79525025241531, places=5)
		self.assertAlmostEqual(ps.y/utils.DEG, 48.55779439628186, places=5)
		self.assertEqual(res.tableDef.getColumnByName("ps").ucd, "pos.galactic")

	def testGalacticFK4(self):
		res = self.querier.queryADQL(
			"SELECT gavo_transform('GALACTIC', 'FK4', p) as ps, p"
			"  from test.ufuncex where testgroup='jd'")
		self.assertEqual(
			res.tableDef.getColumnByName("ps").type,
			"spoint")

		ps = testhelpers.pickSingle(res.rows)["ps"]
		self.assertAlmostEqual(ps.x/utils.DEG, 162.4867829887925)
		self.assertAlmostEqual(ps.y/utils.DEG, 15.59041298060233)
		self.assertEqual(res.tableDef.getColumnByName("ps").ucd, "pos.eq")

	def testTransformImmediate(self):
		res = self.querier.queryADQL(
			"SELECT TOP 1 gavo_transform('GALACTIC', 'ICRS',"
			" CIRCLE(10, 10, 3)) AS c"
			"  from test.ufuncex")
		self.assertEqual(
			res.tableDef.getColumnByName("c").type,
			"scircle")

		c = res.rows[0]["c"]
		self.assertAlmostEqual(c.center.x/utils.DEG, 262.89288250964)
		self.assertAlmostEqual(c.radius/utils.DEG, 3)
		self.assertEqual(res.tableDef.getColumnByName("c").ucd, "pos.eq")

	def testInvalidRefsys(self):
		self.assertRaisesWithMsg(base.ValidationError,
			"Field query: Cannot compute transformation between J2015 and FK4: Unknown reference frame: J2015",
			self.querier.queryADQL,
			("SELECT gavo_transform('J2015', 'FK4', p) as ps, p"
				"  from test.ufuncex where testgroup='jd'",))

	def testTransformBadarg(self):
		self.assertRaisesWithMsg(
			adql.UfuncError,
			"gavo_transform takes exactly three arguments",
			getMorphed,
			("select gavo_transform(1, 2) from test.ufuncex",))


class URLFuncsTest(testhelpers.VerboseTest):
	resources = [('conn', tresc.dbConnection),
		("ufuncTestTable", _ufuncTestbed)]

	def testEscaping(self):
		# Well, fair enough: this is not an ADQL UDF at this moment.
		# But it may become one and is defined in //adql, so I'll test it
		# here.
		input = '10% more than 2+5 & fewer = #sql '
		res = list(self.conn.queryToDicts(
			"SELECT gavo_urlescape(%(input)s) as r", {"input": input}))
		self.assertEqual(res[0]["r"],
			"10%25%20more%20than%202%2B5%20%26%20fewer%20%3D%20%23sql%20")
		self.assertEqual(parse.unquote(res[0]["r"]), input)

	def testAuthority(self):
		res = list(self.conn.query(
			"select gavo_getauthority('ivo://org.gavo.dc/foo:bar#quoox') as x"
			" FROM  test.ufuncex where testgroup='jd'"))
		self.assertEqual(res[0][0], "org.gavo.dc")

	def testAuthorityBadarg(self):
		self.assertRaisesWithMsg(
			adql.UfuncError,
			"gavo_getauthority only takes an ivoid argument.",
			getMorphed,
			("select gavo_getauthority(1, 2) from test.ufuncex",))


class NormalRandomTest(testhelpers.VerboseTest):
	resources = [
		("test_ufunc", _ufuncTestbed),
		("querier", adqltest.adqlQuerier),]

	def testMorph(self):
		self.assertEqual(
			getMorphed("SELECT ra+gavo_normal_random(sin(radians(dec)), dec/10)"
				" as g from test.ufuncex"),
			'SELECT ra + (((random()+random()+random()+random()+random()+random()+random()+random()+random()+random()-5)*(dec / 10))+(SIN(RADIANS(dec)))) AS g FROM test.ufuncex')
	
	def testMessage(self):
		self.assertRaisesWithMsg(adql.UfuncError,
			"gavo_normal_random takes mu, sigma arguments.",
			getMorphed,
			("SELECT ra+ivo_normal_random(dec/10) as g from test.ufuncex",))

	def testValues(self):
		res = self.querier.queryADQL(
			"SELECT gavo_normal_random(10, 1) as r FROM test.ufuncex")
		vals = [r["r"] for r in res.rows]
		self.assertTrue(min(vals)>-10, "20-sigma outlier in normal_random?")
		self.assertTrue(max(vals)<30, "20-sigma outlier in normal_random?")
		var = sum([(v-10)**2 for v in vals])
		self.assertTrue(var>1e-2, "No variance in normal_random?")


@unittest.skipUnless(testhelpers.hasUDF("GAVO_MOCUNION"), "pgsphere too old")
class MocTest(testhelpers.VerboseTest):
	resources = [
		("test_ufunc", _ufuncTestbed),
		("querier", adqltest.adqlQuerier),]

	def testUnion(self):
		res = self.querier.queryADQL(
			"SELECT gavo_mocunion(a.m, b.m) as res"
			" FROM test.ufuncex AS a JOIN test.ufuncex AS b USING (testgroup)"
			" WHERE testgroup='moc' and a.m!=b.m")
		mocs = set(r['res'].asASCII() for r in res)
		self.assertEqual(mocs, set(["6/33 38-40 7/49-51"]))
		self.assertEqual(res.tableDef.columns[0].type, "smoc")

	def testIntersect(self):
		res = self.querier.queryADQL(
			"SELECT gavo_mocintersect(a.m, b.m) as res"
			" FROM test.ufuncex AS a JOIN test.ufuncex AS b USING (testgroup)"
			" WHERE testgroup='moc' and a.m!=b.m")
		mocs = set(r['res'].asASCII() for r in res)
		self.assertEqual(mocs, set(["7/50"]))
		self.assertEqual(res.tableDef.columns[0].type, "smoc")


class VocMatchTest(testhelpers.VerboseTest):
	def testDatalinkExpansion(self):
		self.assertEqualIgnoringAliases(
			getMorphed("SELECT * FROM whatever"
				" WHERE 1=gavo_vocmatch('datalink/core', 'calibration', semantics)"),
			"SELECT * FROM whatever WHERE semantics IN ('calibration',"
				" 'bias', 'dark', 'flat')")

	def testInvertedDatalinkExpansion(self):
		self.assertEqual(
			getMorphed("SELECT * FROM whatever"
				" WHERE gavo_vocmatch('datalink/core', 'calibration', semantics)=0"),
			"SELECT * FROM whatever WHERE NOT ( semantics IN ('calibration',"
				" 'bias', 'dark', 'flat') )")

	def testUnknownVocabulary(self):
		self.assertRaisesWithMsg(
			base.ReportableError,
			"No such vocabulary: free invention",
			getMorphed,
			("SELECT * FROM whatever"
				" WHERE 1=gavo_vocmatch('free invention', 'calibration', semantics)",))

	def testUnknownTerm(self):
		self.assertRaisesWithMsg(
			adql.UfuncError,
			"'müll' is not a term in the vocabulary"
			" http://www.ivoa.net/rdf/datalink/core",
			getMorphed,
			("SELECT * FROM whatever"
				" WHERE 1=gavo_vocmatch('datalink/core', 'müll', semantics)",))


class SpectralConversionTest(testhelpers.VerboseTest,
		metaclass=testhelpers.SamplesBasedAutoTest):
	def _runTest(self, sample):
		fromVal, fromUnit, toVal, toUnit = sample
		genExpr = base.getSpecExpr(fromUnit, toUnit).format(fromVal)
		foundVal = eval(genExpr)
		self.assertAlmostEqual(toVal/foundVal, 1, msg=f"{toVal}!={foundVal}")

	samples = [
		# E -> x
		("2-1", "eV", 1.602176634e-19, "J"),
		("20+3", "keV", 0.05390617278104659, "nm"),
		("0.75e-10+0.25e-10", "pJ", 150.9190204696357, "GHz"),
		# λ -> x
		("30+0.2", "nm", 3.02e-8, "m"),
		("0.5+0.5", "AU", 2.0039888121375062, "mHz"),
# 5
		("2000+2000", "Angstrom", 4.966114560429395e-22, "kJ"),
		# ν -> x
		("2e10+4.3e11", "mHz", 450, "MHz"),
		("40+10", "MHz", 5995849160.0, "nm"),
		("30+20", "THz", 0.206783383117008, "eV"),
		# fun
		("0.6", "um",  2.0664032899401197, "eV"),
# 10
		("3", "apc", 3238.513767810653, "MHz"),
	]


class FieldInfoGetter(testhelpers.TestResource):
	def make(self, ignored):
		return adqlglue.DaCHSFieldInfoGetter()


class SpecConvUfuncTest(testhelpers.VerboseTest):
	resources = [
		("ufuncTestTable", _ufuncTestbed),
		("querier", adqltest.adqlQuerier),
		("fig", FieldInfoGetter())]

	def testNoFirstArgUnit(self):
		self.assertRaisesWithMsg(adql.UfuncError,
			"specconv: Cannot infer unit of first argument."
			"  Perhaps use the three-argument specconv?",
			adql.parseAnnotating,
			("SELECT gavo_specconv(2, 'kHz') FROM test.ufuncex", self.fig))

	def testIncompatibleUnit(self):
		self.assertRaisesWithMsg(adql.UfuncError,
			"deg is not a spectral unit understood here",
			adql.parseAnnotating,
			("SELECT * FROM test.ufuncex where ivo_specconv(ra, 'eV')>2",
				self.fig))

	def testUnannotated(self):
		self.assertRaisesWithMsg(adql.UfuncError,
			"specconv: Cannot infer unit of first argument.  Perhaps use the"
			" three-argument specconv?",
			adql.parseAnnotating,
			("SELECT * FROM test.ufuncex where gavo_specconv(3+4, 'eV')>2",
				self.fig))

	def testBorkenUnit(self):
		self.assertRaisesWithMsg(adql.UfuncError,
			"Hurgel is not a spectral unit understood here",
			adql.parseAnnotating,
			("SELECT gavo_specconv(freq, 'cm') as wl"
			" FROM test.ufuncex WHERE gavo_specconv(1.42, 'Hurgel', 'GHz')<1",
			self.fig))

	def testBasic(self):
		res = self.querier.queryADQL(
			"SELECT gavo_specconv(freq, 'eV') as energy"
			" FROM test.ufuncex WHERE gavo_specconv(freq, 'cm')"
			" BETWEEN 1.4 AND 1.5 AND testgroup='specconv'")
		self.assertAlmostEqual(res.rows[0]["energy"], 8.684902090914346e-05)
		self.assertEqual(res.tableDef.getColumnByName("energy").unit, "eV")
		self.assertEqual(res.tableDef.getColumnByName("energy").type,
			"double precision")

	def testThreePar(self):
		res = self.querier.queryADQL(
			"SELECT gavo_specconv(freq, 'cm') as wl"
			" FROM test.ufuncex WHERE ABS(gavo_specconv(1.42, 'cm', 'GHz')-freq)<1"
			" AND testgroup='specconv'")
		self.assertAlmostEqual(res.rows[0]["wl"], 1.4275831333333334)
		self.assertEqual(res.tableDef.getColumnByName("wl").unit, "cm")

	def testLiteral(self):
		res = self.querier.queryADQL(
			"SELECT TOP 1 gavo_specconv(2, 'keV', 'pm') as en"
			" FROM tap_schema.tables")
		self.assertAlmostEqual(res.rows[0]["en"], 619.920997273)
		self.assertEqual(res.tableDef.getColumnByName("en").unit, "pm")
		self.assertEqual(res.tableDef.getColumnByName("en").type,
			"double precision")

if __name__=="__main__":
	testhelpers.main(UfuncDefTest)
