"""
Tests for direct grammars (a.k.a. C boosters).
"""

#c Copyright 2008-2024, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import os
import re
import unittest

from gavo.helpers import testhelpers

from gavo import base
from gavo import rsc
from gavo import rscdesc
from gavo import utils
from gavo.grammars import directgrammar

import tresc


class DirectGrammarTest(testhelpers.VerboseTest):
# this is for direct grammars that can't be automatically made:
# Just make sure they're producing something resembling C source.
	rd = testhelpers.getTestRD("dgs")

	def _assertCommonItems(self, src):
		self.assertTrue(src.startswith("#include"))
		self.assertTrue("fi_i,            /* I, integer */" in src)
		self.assertTrue("writeHeader(destination);" in src)

	def testColGrammar(self):
		src = directgrammar.getSource("data/dgs#col")
		self._assertCommonItems(src)
		self.assertTrue("parseFloat(inputLine, F(fi_f), start, len);" in src)

	def testSplitGrammar(self):
		src = directgrammar.getSource("data/dgs#split")
		self._assertCommonItems(src)
		self.assertTrue('char *curCont;' in src)
		self.assertTrue('curCont = strtok(inputLine, "|");' in src)
		self.assertTrue('curCont = strtok(NULL, "|");' in src)

	def testBinGrammar(self):
		src = directgrammar.getSource("data/dgs#bin")
		self._assertCommonItems(src)
		self.assertTrue('#define FIXED_RECORD_SIZE 50' in src)
		self.assertTrue('MAKE_INT(fi_i, *(int32_t*)(inputLine+));' in src)
		self.assertTrue('bytesRead = fread(inputLine, 1, FIXED_RECORD_SIZE, inF);'
			in src)

	def testSourcePlausible(self):
		src = directgrammar.getSource("data/dgs#fits")
		self._assertCommonItems(src)
		self.assertTrue("if (COL_DESCS[i].fitsType==TSTRING) {" in src)
		self.assertTrue("MAKE_BIGINT(fi_b, ((long long*)(data[1]))[rowIndex]);"
			in src)

	def testFITSSecondExtension(self):
		src = directgrammar.getSource("data/dgs#fits2nd")
		self.assertTrue("fits_movabs_hdu(fitsInput, 2+1," in src)
		self.assertTrue("FITSColDesc COL_DESCS[1] = {\n"
			"{.cSize = sizeof(long long), .fitsType = TLONGLONG, .index=1, .arraysize=1}\n};"
			in src)

	def testFITSWithAdditionalCols(self):
		src = directgrammar.getSource("data/dgs#fitsplus")
		self._assertCommonItems(src)
		self.assertTrue("FITSColDesc COL_DESCS[5] = {" in src)
		self.assertTrue("#define QUERY_N_PARS 6")
		self.assertTrue("MAKE_NULL(fi_artificial);"
			" /* MAKE_TEXT(fi_artificial, FILL IN VALUE); */" in src)

	# XXX TODO: tests for column reordering, skipping unused columns in FITS

directgrammar.CBooster.silence_for_test = True

class _FITSBoosterImportedTable(testhelpers.TestResource):
	resources = [("conn", tresc.dbConnection)]

	def make(self, deps):
		conn = deps["conn"]
		dd = base.caches.getRD("data/dgs").getById("impfits")
		self.srcName = dd.grammar.cBooster
		with open(self.srcName, "w", encoding="utf-8") as f:
			f.write(directgrammar.getSource("data/dgs#fits"))

		data = rsc.makeData(dd, connection=conn)
		table = data.getPrimaryTable()
		rows = list(table.iterQuery(table.tableDef))
		return rows, data.getPrimaryTable()

	def clean(self, res):
		os.unlink(self.srcName)
		res[1].drop()


class FITSDirectGrammarTest(testhelpers.VerboseTest):
	
	resources = [("imped", _FITSBoosterImportedTable())]

	def testInteger(self):
		self.assertEqual(self.imped[0][0]["i"], 450000)

	def testBigint(self):
		self.assertEqual(self.imped[0][0]["b"], 4009249430)
	
	def testFloat(self):
		self.assertAlmostEqual(self.imped[0][0]["f"], 3.2)
	
	def testDouble(self):
		self.assertEqual(self.imped[0][0]["d"], 5e120)

	def testTextAndMap(self):
		self.assertEqual(self.imped[0][0]["t"], "foobar")

	def testUnknownNULL(self):
		self.assertEqual(self.imped[0][0]["artificial"], None)



try:
	from gavo.grammars import hdf5grammar #noflake: just testing if it's there
	def skipIfNoH5py(obj):
		return obj
except ImportError:
	def skipIfNoH5py(obj):
		return unittest.skip("No h5py")(obj)



_VAEX_RD_TEMPLATE = """
<resource schema="data">
	<table id="testing" onDisk="True"><column name="a"/></table>
	<data id="import">
		{}
		<make table="testing"/>
	</data>
</resource>"""

class _HDF5vaexBoosterSource(testhelpers.TestResource):
	def make(self, deps):
		rd = base.parseFromString(rscdesc.RD, _VAEX_RD_TEMPLATE.format(
			'<sources pattern="fromvaex.hdf5"/>'
			'<directGrammar id="boost" cBooster="res/testing.c" type="hdf5vaex"/>'))
		try:
			return directgrammar.buildSource(
				rd.getById("boost"), rd.getById("testing"))
		except base.ReportableError:
			# probably no h5py
			return None


@skipIfNoH5py
class HDF5vaexBoosterCodegenTest(testhelpers.VerboseTest):
	resources = [("src", _HDF5vaexBoosterSource())]

	def testBasic(self):
		typedef = re.search("(?s)typedef(.*?)InRec;", self.src.original)
		self.assertEqual(typedef.group(0),
			"typedef struct InRec_s {\n"
			"  float parallax;\n"
			"  uint64_t source_id;\n"
			"} InRec;")

	def testWithoutSource(self):
		rd = base.parseFromString(rscdesc.RD, _VAEX_RD_TEMPLATE.format(
			'<directGrammar id="boost" cBooster="res/testing.c" type="hdf5vaex"/>'))
		self.assertRaisesWithMsg(base.StructureError,
			"Cannot make HDF5 vaex booster without a sources element on the embedding data.",
			directgrammar.buildSource,
			(rd.getById("boost"), rd.getById("testing")))

	def testWithoutMatchingSource(self):
		rd = base.parseFromString(rscdesc.RD, _VAEX_RD_TEMPLATE.format(
			'<sources pattern="data/does-not-exist"/>'
			'<directGrammar id="boost" cBooster="res/testing.c" type="hdf5vaex"/>'))
		self.assertRaisesWithMsg(base.StructureError,
			"Building an HDF5 booster requires at least one matching source.",
			directgrammar.buildSource,
			(rd.getById("boost"), rd.getById("testing")))

	def testWithWrongDataset(self):
		rd = base.parseFromString(rscdesc.RD, _VAEX_RD_TEMPLATE.format(
			'<sources pattern="fromvaex.hdf5"/>'
			'<directGrammar id="boost" cBooster="res/testing.c" type="hdf5vaex">'
			'  <property name="dataset">road/to/nowhere</property>'
			'</directGrammar>'))
		self.assertRaisesWithMsg(base.StructureError,
			utils.EqualingRE("Cannot access dataset road/to/nowhere in .*data/fromvaex.hdf5.  Override the grammar's dataset property to point it to the right dataset."),
			directgrammar.buildSource,
			(rd.getById("boost"), rd.getById("testing")))


_VAEX_RD = """
<resource schema="data">
	<table id="exvaex" onDisk="True" temporary="True">
		<column name="parallax"/>
		<column name="source_id" type="bigint"/>
	</table>
	<data id="import">
		<sources pattern="fromvaex.hdf5"/>
		<directGrammar id="booster" type="hdf5vaex" cBooster="tmp.c">
			<property name="chunkSize">1</property>
		</directGrammar>
		<make table="exvaex"/>
	</data>
</resource>"""


class _VAEXBoosterImportedTable(testhelpers.TestResource):
	resources = [("conn", tresc.dbConnection)]

	def make(self, deps):
		conn = deps["conn"]
		rd = base.parseFromString(rscdesc.RD, _VAEX_RD)
		dd = rd.getById("import")
		self.srcName = dd.grammar.cBooster
		try:
			with open(self.srcName, "w", encoding="utf-8") as f:
				f.write(directgrammar.buildSource(dd.grammar, rd.getById("exvaex")))
		except base.ReportableError:
			# probably no h5py
			return None

		data = rsc.makeData(dd, connection=conn)
		table = data.getPrimaryTable()
		rows = list(table.iterQuery(table.tableDef))
		return rows, data.getPrimaryTable()

	def clean(self, res):
		os.unlink(self.srcName)
		if res.original is not None:
			res[1].drop()



@skipIfNoH5py
class HDF5vaexBoosterImportTest(testhelpers.VerboseTest):
	resources = [("imped", _VAEXBoosterImportedTable())]

	def testAllIn(self):
		self.assertEqual(len(self.imped[0]), 2)
	
	def testBigint(self):
		self.assertEqual(self.imped[0][0]["source_id"], 4464711039265053952)

	def testFloat(self):
		self.assertAlmostEqual(self.imped[0][1]["parallax"], 1.84249)


if __name__=="__main__":
	testhelpers.main(DirectGrammarTest)
