# -*- coding: utf-8 -*-
"""
Tests for grammars and their helpers.
"""

#c Copyright 2008-2024, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import base64
import contextlib
import io
import os
import struct
import unittest

from astropy import table

from gavo.helpers import testhelpers

from gavo import base
from gavo import rsc
from gavo import rscdef
from gavo import svcs
from gavo import utils
from gavo.grammars import binarygrammar
from gavo.grammars import columngrammar
from gavo.grammars import common
from gavo.grammars import fitsprodgrammar
from gavo.grammars import fitstablegrammar
from gavo.grammars import odbcgrammar
from gavo.grammars import pdsgrammar
from gavo.grammars import regrammar
from gavo.grammars import uniongrammar
from gavo.grammars import xmlgrammar
from gavo.helpers import testtricks

try:
	from gavo.grammars import hdf5grammar
except ImportError:
	def skipIfNoH5py(obj):
		"""marks obj as skipped since hdf5grammar could not be imported.
		"""
		markSkipped = unittest.skip("No h5py")
		return markSkipped(obj)
else:
	def skipIfNoH5py(obj):
		"""returns obj unchanged, as hdf5grammar could be imported.
		"""
		return obj

@contextlib.contextmanager
def TestFile(initData):
	"""yields a BytesIO filled with initData and closes it on exit.

	initData is run through utils.bytify before the BytesIO is made from it.
	"""
	with io.BytesIO(utils.bytify(initData)) as f:
		yield f


def getCleaned(rawIter):
	"""returns a list of the rawdicts coming from a rawdict iterator,
	cleaned up for comparison.

	Cleaning currently means that the parser_ key is deleted from each
	rawdict (in place) as it comes out of rawIter.
	"""
	def dropParser(rawdict):
		del rawdict["parser_"]
		return rawdict

	return [dropParser(rawdict) for rawdict in rawIter]


class PredefinedRowfilterTest(testhelpers.VerboseTest):
	"""tests running the expandOnIndex, expandComma, and productimport-skip
	data descriptors from the test RD.
	"""
	def testOnIndex(self):
		expected = [
			{'a': 'eins', 'c': 4, 'b': 3, 'd': 3},
			{'a': 'eins', 'c': 4, 'b': 3, 'd': 4}]
		expander = testhelpers.getTestRD().getById("expandOnIndex")
		made = rsc.makeData(expander,
			forceSource=[{"b": 3, "c": 4, "a": "eins"}])
		self.assertEqual(made.getPrimaryTable().rows, expected)

	def testExpandComma(self):
		inputRows = [
			{"stuff": "x,yz,foo, bar ", "b": 23},
			{"stuff":"quux", "b": 3}]
		expander = testhelpers.getTestRD().getById("expandComma")
		made = rsc.makeData(expander, forceSource=inputRows)
		self.assertEqual(made.getPrimaryTable().rows, [
			{'a': 'x', 'b': 23},
			{'a': 'yz', 'b': 23},
			{'a': 'foo', 'b': 23},
			{'a': 'bar', 'b': 23},
			{'a': 'quux', 'b': 3}])

	def testStandardPreviewPath(self):
		importer = testhelpers.getTestRD().getById("productimport-skip")
		# collect the preview values of all rows from all sources first
		previews = set()
		for source in importer.iterSources(None):
			previews.update(
				row["prodtblPreview"]
				for row in importer.grammar.parse(source, None))

		self.assertEqual(
			{str(preview) for preview in previews},
			{'prefoo/ZGF0YS9hLmltcA==', 'prefoo/ZGF0YS9iLmltcA=='})


class SequencedRowfilterTest(testhelpers.VerboseTest):
	"""tests for several rowfilters given in sequence in a dictlistGrammar.
	"""
	def _makeGrammar(self, rowgenDefs):
		grammarSource = "<dictlistGrammar>%s</dictlistGrammar>"%rowgenDefs
		return base.parseFromString(
			rscdef.getGrammar("dictlistGrammar"), grammarSource)

	def _getProcessedFor(self, filterDefs, input):
		# returns the cleaned rows a grammar with filterDefs makes from input
		return getCleaned(self._makeGrammar(filterDefs).parse(input))

	def testSimplePipe(self):
		filters = """
			<rowfilter><code>
					row["output"] = row["input"]+1
					del row["input"]
					yield row
			</code></rowfilter>
			<rowfilter><code>
					row["processed"] = row["output"]*row["output"]
					yield row
			</code></rowfilter>"""
		self.assertEqual(
			self._getProcessedFor(filters, [{"input": 2}]),
			[{"output":3, "processed":9}])
	
	def testForking(self):
		filters = """
			<rowfilter><code>
					b = row["input"]
					del row["input"]
					row["output"] = b
					yield row.copy()
					row["output"] += b
					yield row
			</code></rowfilter>
			<rowfilter><code>
					row["processed"] = row["output"]*row["output"]
					yield row.copy()
					row["processed"] = row["processed"]*row["output"]
					yield row
			</code></rowfilter>"""
		expected = [
			{"output":2, "processed":4},
			{"output":2, "processed":8},
			{"output":4, "processed":16},
			{"output":4, "processed":64},]
		self.assertEqual(
			self._getProcessedFor(filters, [{"input": 2}]),
			expected)


class _MacroExpandedStuff(testhelpers.TestResource):
	"""a test resource giving the rows of the primary table made from
	an inline RD whose rowfilters use several macros.
	"""
	def make(self, ignored):
		from gavo import rscdesc

		rdSource = r"""<resource schema="test"
				resdir="">
			<table id="foo"><column name="prodtblPath" type="text"/>
				<column name="prodtblPreview" type="text"/>
				<column name="srcstem" type="text"/>
				<column name="rootless" type="text"/>
				<column name="previewDir" type="text"/>
				<column name="prodtblEmbargo" type="timestamp"/>
			</table>
			<data id="i">
				<property name="previewDir">previews</property>
				<sources pattern="data/ex.fits"/>
				<fitsProdGrammar>
					<rowfilter procDef="//products#define">
						<bind name="table">"foo"</bind>
						<bind name="path">\fullDLURL{echdl}</bind>
						<bind name="preview">\splitPreviewPath{.jpeg}</bind>
						<bind name="embargo">\sourceDate</bind>
					</rowfilter>
					<rowfilter name="morestuff">
						<code>
							row.update({
									"srcstem": \srcstem,
									"previewDir": "\property{previewDir}",
									"rootless": \rootlessPath,
							})
							yield row
						</code>
					</rowfilter>
				</fitsProdGrammar>
				<make table="foo"/></data>
				<service id="echdl"><datalinkCore/></service></resource>"""
		macroRD = base.parseFromString(rscdesc.RD, rdSource)
		macroRD.sourceId = "glob/bob"
		madeData = rsc.makeData(macroRD.getById("i"))
		return madeData.getPrimaryTable().rows


class MacroTest(testhelpers.VerboseTest):
	"""tests on the first row provided by _MacroExpandedStuff.
	"""
	resources = [("rows", _MacroExpandedStuff())]

	def _assertInFirstRow(self, key, expected):
		# asserts that the first row maps key to expected
		self.assertEqual(self.rows[0][key], expected)

	def testDLURL(self):
		self._assertInFirstRow("prodtblPath",
			"http://localhost:8080/glob/bob/echdl/dlget"
			"?ID=ivo%3A%2F%2Fx-testing%2F~%3Fdata%2Fex.fits")

	def testSplitPreview(self):
		self._assertInFirstRow("prodtblPreview",
			"previews/data/ex.fits.jpeg")

	def testSourceDate(self):
		# should we force the source date to something we can test here?
		embargo = self.rows[0]["prodtblEmbargo"]
		self.assertTrue(embargo.year>2010, "Ancient files on disk?")

	def testSrcstem(self):
		self._assertInFirstRow("srcstem", "ex")

	def testPreviewDir(self):
		self._assertInFirstRow("previewDir", "previews")

	def testRootless(self):
		self._assertInFirstRow("rootless", "data/ex.fits")
	
# the rows the IgnoreTests below feed into their grammars
ignoreTestData = [
	dict(a='xy', b='cc', d='yok'),
	dict(a='xy', b='DD'),
	dict(a='zz', b=''),
]

class IgnoreTests(testhelpers.VerboseTest):
	"""tests for the triggers within ignoreOn.

	All tests parse the module-level ignoreTestData with a dictlistGrammar.
	"""
	def _makeGrammar(self, ignoreClauses):
		grammarSource = (
			"<dictlistGrammar><ignoreOn>%s</ignoreOn></dictlistGrammar>"%
				ignoreClauses)
		return base.parseFromString(
			rscdef.getGrammar("dictlistGrammar"), grammarSource)

	def _makeBailingGrammar(self, ignoreClauses):
		grammarSource = (
			"<dictlistGrammar><ignoreOn bail='True'>%s</ignoreOn></dictlistGrammar>"%
				ignoreClauses)
		return base.parseFromString(
			rscdef.getGrammar("dictlistGrammar"), grammarSource)

	def _assertResultLen(self, ignoreClauses, expectedLength):
		# asserts that expectedLength rows come out when ignoreClauses are
		# in effect
		survivors = list(self._makeGrammar(ignoreClauses).parse(ignoreTestData))
		self.assertEqual(len(survivors), expectedLength,
			"%s yielded %s, expected %d rows"%(
				ignoreClauses, survivors, expectedLength))

	def testKeyIs(self):
		for clause, expectedLength in [
				('<keyIs key="a" value="xy"/>', 1),
				('<keyIs key="a" value="zz"/>', 2),
				('<keyIs key="a" value=""/>', 3),
				('<keyIs key="b" value=""/>', 2),
				('<keyIs key="d" value="yok"/>', 2)]:
			self._assertResultLen(clause, expectedLength)

	def testKeyPresent(self):
		for clause, expectedLength in [
				('<keyPresent key="a"/>', 0),
				('<keyPresent key="b"/>', 0),
				('<keyPresent key="d"/>', 2),
				('<keyPresent key="yikes"/>', 3)]:
			self._assertResultLen(clause, expectedLength)

	def testTriggerSeq(self):
		clauses = '<keyPresent key="d"/><keyIs key="b" value=""/>'
		self._assertResultLen(clauses, 1)

	def testNot(self):
		for clause, expectedLength in [
				('<not><keyPresent key="a"/></not>', 3),
				('<not><keyPresent key="d"/></not>', 1),
				('<not><keyPresent key="d"/>'
					'<keyIs key="b" value=""/></not>', 2)]:
			self._assertResultLen(clause, expectedLength)
	
	def testAnd(self):
		clauses = ('<and><keyIs key="a" value="xy"/>'
			'<keyIs key="b" value="DD"/></and>')
		self._assertResultLen(clauses, 2)

	def testBail(self):
		bailing = self._makeBailingGrammar('<keyMissing key="d"/>')
		with self.assertRaises(rscdef.TriggerPulled):
			list(bailing.parse(ignoreTestData))
	
	def testBailNot(self):
		# every test row has an a key, so this must run through without raising
		bailing = self._makeBailingGrammar('<keyMissing key="a"/>')
		list(bailing.parse(ignoreTestData))


class EmbeddedGrammarTest(testhelpers.VerboseTest):
	"""tests for embeddedGrammar, with the iterator code given inline in
	an RD.
	"""
	def testSimple(self):
		from gavo import rscdesc

		rdSource = """<resource schema="test"><data id="fake"><embeddedGrammar>
				<iterator><code>
					yield {'x': 1, 'y': 2}
					yield {'x': 2, 'y': 2}
				</code></iterator></embeddedGrammar></data></resource>"""
		embeddingRD = base.parseFromString(rscdesc.RD, rdSource)
		rows = getCleaned(embeddingRD.dds[0].grammar.parse(None))
		self.assertEqual(rows, [{'y': 2, 'x': 1}, {'y': 2, 'x': 2}])

	def testErrorBehaviour(self):
		from gavo import rscdesc

		rdSource = """<resource schema="test">
				<table id="bla"><column name="foo" type="text"/></table>
				<data id="fake"><embeddedGrammar>
				<iterator><code>
					for prefix in ["vor", "nach"]:
						self.location = f"{self.sourceToken}, {prefix}"
						yield {"foo": prefix+self.sourceToken}
						UndefinedName
				</code></iterator></embeddedGrammar><make table="bla"/>
				</data></resource>"""
		embeddingRD = base.parseFromString(rscdesc.RD, rdSource)
		expectedMessage = (
			'At sicht, vor: NameError("name \'UndefinedName\' is not defined")')
		self.assertRaisesWithMsg(base.SourceParseError,
			expectedMessage,
			rsc.makeData,
			(embeddingRD.dds[0],), forceSource="sicht")


class KVGrammarTest(testhelpers.VerboseTest):
	"""tests for keyValueGrammar.
	"""
	def _makeGrammar(self, grammarLiteral):
		return base.parseFromString(
			rscdef.getGrammar("keyValueGrammar"), grammarLiteral)

	def testSimple(self):
		kvGrammar = self._makeGrammar(
			r'<keyValueGrammar commentPattern="--.*?\*/" enc="utf-8"/>')
		rawInput = ("a=b\nc=2 --nothing*/\n"
				"wonkö:Närd".encode("utf-8"))
		with TestFile(rawInput) as f:
			firstRec = list(kvGrammar.parse(f))[0]
		for key, expected in [("a", 'b'), ("c", '2'), ("wonkö", 'Närd')]:
			self.assertEqual(firstRec[key], expected)
	
	def testPairs(self):
		kvGrammar = self._makeGrammar(
			'<keyValueGrammar kvSeparators="/" pairSeparators="%"'
			' yieldPairs="True"/>')
		pairs = []
		with TestFile("a/b%c/d") as f:
			for rec in kvGrammar.parse(f):
				pairs.append((rec['key'], rec['value']))
		self.assertEqual(pairs, [('a', 'b'), ('c', 'd')])

	def testError(self):
		badLiteral = '<keyValueGrammar commentPattern="**"/>'
		expectedMessage = (
			"At IO:'<keyValueGrammar commentPattern=\"**\"/>', (1, 0):"
			" '**' is not a valid value for commentPattern")
		self.assertRaisesWithMsg(base.LiteralParseError,
			expectedMessage,
			base.parseFromString,
			(rscdef.getGrammar("keyValueGrammar"), badLiteral))


class CSVGrammarTest(testhelpers.VerboseTest):
	"""tests for csvGrammar and some of its attributes.
	"""
	def _getRecs(self, grammarLiteral, inputData):
		# returns the cleaned rows the csvGrammar in grammarLiteral
		# makes from inputData
		csvGrammar = base.parseFromString(
			rscdef.getGrammar("csvGrammar"), grammarLiteral)
		with TestFile(inputData) as f:
			return getCleaned(csvGrammar.parse(f))

	def testSimple(self):
		self.assertEqual(
			self._getRecs('<csvGrammar/>', "la,le,lu\n1, 2, schaut"),
			[{"la": '1', "le": ' 2', "lu": " schaut"}])

	def testStrip(self):
		self.assertEqual(
			self._getRecs('<csvGrammar strip="True"/>',
				"la,le,lu\n1, 2, schaut"),
			[{"la": '1', "le": '2', "lu": "schaut"}])

	def testNames(self):
		self.assertEqual(
			self._getRecs('<csvGrammar names="col1, col2, col3"/>',
				"la,le,lu\n1,2,schaut"),
			[
				{"col1": "la", "col2": "le", "col3": "lu"},
				{"col1": '1', "col2": '2', "col3": "schaut"}])

	def testSkipLines(self):
		self.assertEqual(
			self._getRecs('<csvGrammar topIgnoredLines="3"/>',
				"There's\nsome\njunk at the top here\nla,le,lu\n1,2,schaut"),
			[{"la": '1', "le": '2', "lu": "schaut"}])

	def testWithPreFilter(self):
		# the input bytes are what zcat is expected to unpack into
		# the CSV to parse
		zippedInput = (
			b'\x1f\x8b\x08\x08\xb7\xd2\xa4U\x00\x03zw.txt\x00\xcbI\xd4\xc9I'
			b'\xd5\xc9)\xe52\xd41\xd2)N\xceH,-\xe1\x02\x00S@8\x1f\x14'
			b'\x00\x00\x00')
		self.assertEqual(
			self._getRecs('<csvGrammar preFilter="zcat"/>', zippedInput),
			[{"la": '1', "le": '2', "lu": "schaut"}])


class ColDefTest(testhelpers.VerboseTest):
	"""tests for the parsing of colDefs in columnGrammars.
	"""
	def _getFirstRec(self, grammarLiteral, inputData):
		# returns the first cleaned row the columnGrammar in grammarLiteral
		# makes from inputData
		colGrammar = base.parseFromString(
			columngrammar.ColumnGrammar, grammarLiteral)
		with TestFile(inputData) as f:
			return getCleaned(colGrammar.parse(f))[0]

	def testSimple(self):
		firstRec = self._getFirstRec(
			'<columnGrammar colDefs="a:1 B:2-5 C_dfoo:4 _gobble:6-8"/>',
			"abcdefghijklmnoq")
		self.assertEqual(firstRec,
			{'a': 'a', 'C_dfoo': 'd', 'B': 'bcde', '_gobble': 'fgh'})

	def testFunkyWhite(self):
		firstRec = self._getFirstRec(
			'<columnGrammar colDefs="a :1 B: 2 - 5 C_dfoo: 4 _gobble : 6 -8"/>',
			"abcdefghijklmnoq")
		self.assertEqual(firstRec,
			{'a': 'a', 'C_dfoo': 'd', 'B': 'bcde', '_gobble': 'fgh'})
	
	def testHalfopen(self):
		firstRec = self._getFirstRec(
			'<columnGrammar><colDefs>a:5- B:-5</colDefs></columnGrammar>',
			"abcdefg")
		self.assertEqual(firstRec, {'a': 'efg', 'B': 'abcde'})

	def testBeauty(self):
		firstRec = self._getFirstRec(
			"""<columnGrammar><colDefs>
				a:      5-
				B:      -5
				gnugga: 1-2
				</colDefs></columnGrammar>""",
			"abcdefg")
		self.assertEqual(firstRec,
			{'a': 'efg', 'B': 'abcde', 'gnugga': 'ab'})

	def testErrorBadChar(self):
		badLiteral = (
			'<columnGrammar><colDefs>a:5-% B:-5</colDefs></columnGrammar>')
		expectedMessage = (
			"At IO:'<columnGrammar><colDefs>a:5-% B:-5</colDefs></columnG...', (1, 34):"
			" 'a:5-% B:-5' is not a valid value for colDefs")
		self.assertRaisesWithMsg(base.LiteralParseError,
			expectedMessage,
			base.parseFromString,
			(columngrammar.ColumnGrammar, badLiteral))
	
	def testErrorNiceHint(self):
		badLiteral = (
			'<columnGrammar><colDefs>a:5- B:c</colDefs></columnGrammar>')
		expectedHintEnd = (
			"Expected end of text, found 'B'  (at char 5), (line:1, col:6)")
		try:
			base.parseFromString(columngrammar.ColumnGrammar, badLiteral)
		except base.LiteralParseError as ex:
			self.assertTrue(ex.hint.endswith(expectedHintEnd))
		else:
			self.fail("LiteralParseError not raised")


class ColumnGrammarTest(testhelpers.VerboseTest):
	"""tests for columnGrammar attributes other than colDefs.
	"""
	def testWithComment(self):
		colGrammar = base.parseFromString(columngrammar.ColumnGrammar,
			'<columnGrammar commentIntroducer="#"><colDefs>a:1-</colDefs>'
			'</columnGrammar>')
		with TestFile("#Anfang\nMitte\n#Ende") as f:
			rows = list(colGrammar.parse(f))
		onlyRow = testhelpers.pickSingle(rows)
		self.assertEqual(onlyRow['a'], 'Mitte')

	def testWithoutComment(self):
		colGrammar = base.parseFromString(columngrammar.ColumnGrammar,
			'<columnGrammar><colDefs>a:1-</colDefs>'
			'</columnGrammar>')
		with TestFile("#Anfang\nMitte\n#Ende") as f:
			rows = list(colGrammar.parse(f))
		self.assertEqual(len(rows), 3)
		self.assertEqual(rows[0]['a'], "#Anfang")

	def testWithPreFilterAndEnc(self):
		colGrammar = base.parseFromString(rscdef.getGrammar("columnGrammar"),
			"""<columnGrammar preFilter="zcat" enc="utf-8">
				<colDefs>a: 1-4</colDefs></columnGrammar>""")
		zippedInput = (b'\x1f\x8b\x08\x08\x02ps^\x00\x03x\x00s9\xbc-%5\xe7'
				b'\xdad.\x85\x93\x13\xcfM\xe2\x02\x00Y\xd9\xa1S\x0f\x00\x00\x00')
		with TestFile(zippedInput) as f:
			rows = getCleaned(colGrammar.parse(f))
		self.assertEqual(rows, [
			{"a": "Döde"},
			{"a": "ɑΒ"}])

	def testEndMarker(self):
		colGrammar = base.parseFromString(rscdef.getGrammar("columnGrammar"),
			"""<columnGrammar endMarker="schlussjetzt  $">
				<colDefs>a: 1-4</colDefs></columnGrammar>""")
		inputText = ("schluss jetzt\nschlussjetzt nicht\n"
				"  schlussjetzt wieder nicht\nschlussjetzt  \ndasnicht\n")
		with TestFile(inputText) as f:
			rows = getCleaned(colGrammar.parse(f))
		self.assertEqual(rows,
			[{'a': 'schl'}, {'a': 'schl'}, {'a': 'sc'}])


class BinaryRecordTest(testhelpers.VerboseTest):
	"""tests for the parsing of binaryRecordDef elements.
	"""
	def _parseRecordDef(self, literal):
		return base.parseFromString(binarygrammar.BinaryRecordDef, literal)

	def testTypes(self):
		recordDef = self._parseRecordDef(
			"""<binaryRecordDef binfmt="packed">
				chr(1s) fong(12s) b(b) B(B) h(h) H(H) i(i) I(I) q(q) Q(Q)
				f(f) d(d)</binaryRecordDef>""")
		self.assertEqual(recordDef.structFormat, "=1s12sbBhHiIqQfd")
		self.assertEqual(recordDef.recordLength, 55)

	def testBadIdentifier(self):
		with self.assertRaises(base.LiteralParseError):
			self._parseRecordDef("<binaryRecordDef>22s(d)</binaryRecordDef>")

	def testBadCode(self):
		with self.assertRaises(base.LiteralParseError):
			self._parseRecordDef("<binaryRecordDef>x(P)</binaryRecordDef>")

	def testNativeTypes(self):
		recordDef = self._parseRecordDef(
			"<binaryRecordDef>c(1s)s(i)t(d)</binaryRecordDef>")
		self.assertEqual(recordDef.structFormat, "1sid")
		# 13 would be the length without any padding between the fields
		isUnpadded = recordDef.recordLength==13
		self.assertFalse(isUnpadded, "You platform doesn't pack?")


class BinaryGrammarTest(testhelpers.VerboseTest):
	"""tests for binaryGrammar parsing struct-packed (int, double) records.
	"""
	plainTestData = [(42, 0.25), (-30, 40.)]
	plainExpectedResult = [{'s': 42, 't': 0.25}, {'s': -30, 't': 40.0}]

	def _assertParsesToPlain(self, rawInput, grammarLiteral):
		# asserts that the binaryGrammar in grammarLiteral turns rawInput
		# into plainExpectedResult
		with TestFile(rawInput) as inputFile:
			binGrammar = base.parseFromString(
				binarygrammar.BinaryGrammar, grammarLiteral)
			self.assertEqual(
				getCleaned(binGrammar.parse(inputFile)),
				self.plainExpectedResult)

	def testUnarmoredParse(self):
		records = [struct.pack("id", *r) for r in self.plainTestData]
		# 20 junk bytes in front for skipBytes to jump over
		self._assertParsesToPlain(
			b"u"*20+b"".join(records),
			"""<binaryGrammar skipBytes="20"><binaryRecordDef>s(i)t(d)
				</binaryRecordDef></binaryGrammar>""")

	def testNetworkBinfmt(self):
		records = [struct.pack("!id", *r) for r in self.plainTestData]
		self._assertParsesToPlain(
			b"".join(records),
			"""<binaryGrammar><binaryRecordDef binfmt="big">s(i)t(d)
				</binaryRecordDef></binaryGrammar>""")


	def testFortranParse(self):

		def doFortranArmor(data):
			# puts the payload length as an int before and after the payload
			payloadLength = len(data)
			return struct.pack(
				b"i%dsi"%payloadLength, payloadLength, data, payloadLength)

		records = [doFortranArmor(struct.pack("id", *r))
			for r in self.plainTestData]
		self._assertParsesToPlain(
			b"".join(records),
			"""<binaryGrammar armor="fortran"><binaryRecordDef>s(i)t(d)
				</binaryRecordDef></binaryGrammar>""")


class FITSProdGrammarTest(testhelpers.VerboseTest):
	"""tests for fitsProdGrammar, run against data/ex.fits in inputsDir.
	"""

	sample = os.path.join(base.getConfig("inputsDir"), "data", "ex.fits")
	grammarT = fitsprodgrammar.FITSProdGrammar

	def _getParse(self, grammarDef):
		# returns the first row a grammarT parsed from grammarDef makes
		# from the sample
		fitsGrammar = base.parseFromString(self.grammarT, grammarDef)
		allRows = list(fitsGrammar.parse(self.sample))
		return allRows[0]

	def _assertBasicFieldsPresent(self, d):
		self.assertEqual(len(d), 104)
		for key, expected in [
				("EXTEND", True),
				("OBSERVER", "M.Wolf"),
				("LATPOLE", 0.0)]:
			self.assertEqual(d[key], expected)
		self.assertTrue("PLATE_ID" in d)

	def testBasic(self):
		parsed = self._getParse("""<fitsProdGrammar qnd="False"/>""")
		self._assertBasicFieldsPresent(parsed)

	def testBasicQnD(self):
		parsed = self._getParse("""<fitsProdGrammar/>""")
		self._assertBasicFieldsPresent(parsed)

	def testNameMapping(self):
		parsed = self._getParse("""<fitsProdGrammar><mapKeys><map
			dest="blind">EXPTIME</map></mapKeys></fitsProdGrammar>""")
		self.assertEqual(parsed["blind"], '10801')
	
	def testHDUsField(self):
		parsed = self._getParse("""<fitsProdGrammar hdusField="__HDUS"/>""")
		firstHDU = parsed["__HDUS"][0]
		self.assertEqual(firstHDU.data[0][0], 7896.0)


class FITSTableGrammarTest(testhelpers.VerboseTest):
	"""tests for fitsTableGrammar on a FITS table written by astropy.
	"""
	def testBasic(self):
		columnNames = ["an_int", "a_float", "a_string"]
		# the second row is masked in all columns
		sourceTable = table.Table([
			table.MaskedColumn([1, 0], mask=[False, True]),
			table.MaskedColumn([2.0, 0], mask=[False, True]),
			table.MaskedColumn(['foo', ''], mask=[False, True])],
			names=columnNames,
			dtype=["i4", "f8", str])

		with utils.sandbox():
			sourceTable.write("tmp.fits", format="fits")
			tableGrammar = base.parseFromString(
				fitstablegrammar.FITSTableGrammar,
				"<fitsTableGrammar/>")
			rows = list(tableGrammar.parse("tmp.fits"))

			for name, expected in zip(columnNames, [1, 2.0, 'foo']):
				self.assertEqual(rows[0][name], expected)

			for name in columnNames:
				self.assertEqual(rows[1][name], None)


class ReGrammarTest(testhelpers.VerboseTest):
	"""tests for reGrammar.
	"""
	def _makeGrammar(self, grammarLiteral):
		return base.parseFromString(regrammar.REGrammar, grammarLiteral)

	def testBadInputRejection(self):
		reGrammar = self._makeGrammar("""<reGrammar names="a,b"/>""")
		with TestFile("1 2\n3") as f:
			self.assertRaisesWithMsg(base.SourceParseError,
				"At line 2: 1 fields found, expected 2",
				lambda: list(reGrammar.parse(f)),
				())

	def testComment(self):
		reGrammar = self._makeGrammar(
			"""<reGrammar names="a,b" commentPat="(?m)^#.*$"/>""")
		with TestFile("1 2\n# more data\n3 2\n#end of file.\n") as f:
			rows = getCleaned(reGrammar.parse(f))
			self.assertEqual(rows,
				[{'a': '1', 'b': '2'}, {'a': '3', 'b': '2'}])

	def testNoComment(self):
		reGrammar = self._makeGrammar("""<reGrammar names="a,b"/>""")
		with TestFile("1 2\n#more data\n3 2\n#endof file.\n") as f:
			rows = getCleaned(reGrammar.parse(f))
			self.assertEqual(rows, [
				{'a': '1', 'b': '2'},
				{'a': '#more', 'b': 'data'},
				{'a': '3', 'b': '2'},
				{'a': '#endof', 'b': 'file.'}])

	def testIgnoredEnd(self):
		reGrammar = self._makeGrammar(
			r"""<reGrammar names="a,b" recordSep="\\\\" commentPat="(?m)^\\.*$"/>""")
		inputText = (
			"1\n2\\\\\n3 4\\\\\n\\ignore all this\n\n\\and this.\n")
		with TestFile(inputText) as f:
			rows = getCleaned(reGrammar.parse(f))
			self.assertEqual(rows, [
				{'a': '1', 'b': '2'},
				{'a': '3', 'b': '4'}])

	def testLineCount(self):
		reGrammar = self._makeGrammar(
			r"""<reGrammar names="a,b" recordSep="//" fieldSep=","/>""")
		with TestFile("a,b//c,d//e\n,f//g,h//j//\n1,d//\n") as f:
			rowIterator = reGrammar.parse(f)
			self.assertRaisesWithMsg(base.SourceParseError,
				"At line 2: 1 fields found, expected 2",
				list,
				(rowIterator,))


class FilteredInputTest(testhelpers.VerboseTest):
	"""tests for common.FilteredInputFile.

	Going by the tests below, a FilteredInputFile is constructed with a
	command and an input file and has a child process whose output
	is read through read and readline.

	All FilteredInputFiles made here are closed through contextlib.closing
	or try/finally, so no child processes or file handles are left behind
	even when an assertion fails.
	"""
	def testSimple(self):
		with testtricks.testFile("filterInput", "ab\ncd\nef\n") as srcName:
			with contextlib.closing(
					common.FilteredInputFile("tac", open(srcName, "rb"))) as f:
				self.assertEqual(f.read(), b"ef\ncd\nab\n")

	def testLargeOutput(self):
		data = "                    \n"*200000
		with testtricks.testFile(
				"filterInput", data, writeGz=True) as srcName:
			with contextlib.closing(
					common.FilteredInputFile("zcat", open(srcName, "rb"))) as f:
				result = f.read()
				self.assertEqual(result, data.encode("ascii"))

	def testLargeInput(self):
		with TestFile("                    \n"*200000) as inF:
			with contextlib.closing(
					common.FilteredInputFile("gzip", inF)) as f:
				result = f.read()
				self.assertEqual(len(result), 10216)

	def testFailedCommand(self):
		with TestFile("abc") as outerF:
			with contextlib.closing(
					common.FilteredInputFile("verpotshket", outerF,
						silent=True)) as f:
				self.assertRaisesWithMsg(IOError,
					"Child exited with return code 127",
					f.read,
					())

	def testReadWithSizeAndClose(self):
		with TestFile("abc") as outerF:
			f = common.FilteredInputFile("yes", outerF, silent=True)
			try:
				self.assertEqual(b"y\n"*10, f.read(20))
			finally:
				# close explicitly here: the return code checked below is only
				# meaningful once close has dealt with the still-running child
				f.close()
		self.assertEqual(f.process.returncode, -15)

	def testReadline(self):
		with TestFile(base64.b64decode(
				"H4sIAFcmV1AAA0vkSgQCMDHcABcXAN3p7JLdAAAA")) as outerF:
			with contextlib.closing(
					common.FilteredInputFile("zcat", outerF)) as f:
				self.assertEqual(f.readline(), b"a\n")
				self.assertEqual(f.readline(), b"aaaa\n")
				self.assertEqual(f.readline(), b"a"*212+b"\n")
				self.assertEqual(f.readline(), b"\n")
				self.assertEqual(f.readline(), b"")

	def testReadlineNoLF(self):
		with TestFile("AAAA\nBBBB") as outerF:
			with contextlib.closing(
					common.FilteredInputFile("cat", outerF)) as f:
				self.assertEqual(f.readline(), b"AAAA\n")
				self.assertEqual(f.readline(), b"BBBB")


class PDSGrammarTest(testhelpers.VerboseTest):
	"""tests for pdsGrammar.

	When parsing raises a ReportableError (which probably means PyPDS
	is missing), the test is reported as skipped.
	"""
	def testLabelTypes(self):
		grammar = base.parseFromString(pdsgrammar.PDSGrammar, "<pdsGrammar/>")
		with TestFile(
				'PDS_VERSION_ID = PDS3\r\nLABEL_REVISION_NOTE = '
				'"SE-MTC,09/07/2010"\r\n\r\n /* File format and length */'
				'\r\nPRODUCT_ID = "S1_00237390711"\r\nORIGINAL_PRODUCT_ID ='
				' "PSA7AD50"\r\nEND\r\n') as f:
			try:
				recs = list(grammar.parse(f))
			except base.ReportableError:
				# PyPDS probably missing; say so in the test report rather than
				# letting the test pass without having checked anything.
				raise unittest.SkipTest(
					"PyPDS probably missing, not testing pds grammar.")

		self.assertEqual(
			testhelpers.pickSingle(recs)["PRODUCT_ID"],
			'"S1_00237390711"')


class ContextGrammarTest(testhelpers.VerboseTest):
	"""tests for contextGrammar's handling of input keys differing only
	in case.
	"""
	def testCaseMess(self):
		ctxGrammar = base.parseFromString(svcs.ContextGrammar,
			"""<contextGrammar inputTD="data/cores#knok">
				<inputKey name="UPPER"/>
				<inputKey name="UppER"/>
				<inputKey name="lower"/>
			</contextGrammar>""")
		rowIterator = ctxGrammar.parse({
			"Lower": ["1"],
			"UPPER": "2",
			"UppER": "3",})
		# the row iterator is run to its end before the parameters are
		# fetched
		list(rowIterator)
		params = rowIterator.getParameters()
		for name, expected in [
				("UppER", 3.0),
				("UPPER", 2.0),
				("rV", -4.0),
				("lower", 1.0)]:
			self.assertEqual(params[name], expected)


class XMLGrammarTest(testhelpers.VerboseTest):
	"""tests for xmlGrammar and the event iterator from its module.
	"""
	def testPathMogrification(self):
		expectedEvents = [
			('root/elem/child', 'bar'),
			('root/elem/child[0]', 'quux'),
			('root/elem', 'einszwei drei'),
			('root/elem/@attr', 'foo'),
			('root/other', None),
			('root/other/@what', 'a'),
			('root/other/@noise', 'b'),
			('root/other[0]/child', None),
			('root/other[0]', None),
			('root', None)]
		with TestFile(
				"""<root><elem attr="foo">eins<child>bar</child>zwei
					<child>quux</child>drei</elem>
					<x:other xmlns:x="http://burp" what="a" noise="b"/>
					<other><child/></other>
					</root>""") as f:
			events = list(xmlgrammar.iterEventsCounting(f, True))
			self.assertEqual(events, expectedEvents)

	def testParsingOk(self):
		xmlG = base.parseFromString(xmlgrammar.XMLGrammar, "<xmlGrammar/>")
		with TestFile("""
			<foo><bar ct="0">one</bar><bar ct="1" delete="T"/></foo>""") as f:
			rows = list(xmlG.parse(f))
		del rows[0]["parser_"]
		expectedRow = {
			'foo': None,
			'foo/bar': 'one',
			'foo/bar/@ct': '0',
			'foo/bar[0]': None,
			'foo/bar[0]/@ct': '1',
			'foo/bar[0]/@delete': 'T'}
		self.assertEqual(rows, [expectedRow])

	def testParsingError(self):
		xmlG = base.parseFromString(xmlgrammar.XMLGrammar, "<xmlGrammar/>")
		# The repetition of the position in the message is somewhat
		# unfortunate, but I'm not sure how consistent position reporting
		# is in core lxml (and there's no way I can see to get at the
		# message only).  The file name is added to the message in errhandle.
		expectedMessage = (
			"At (2, 5): Extra content at the end of the document, line 2, column 5")
		with TestFile("""<foo/>
				</foo>""") as f:
			rowIterator = xmlG.parse(f)
			self.assertRaisesWithMsg(base.SourceParseError,
				expectedMessage,
				list,
				(rowIterator,))


class MapKeysTest(testhelpers.VerboseTest):
	"""tests for the errors raised on bad mapKeys literals.
	"""
	def testNoDupeKey(self):
		clobberingLiteral = "<mapKeys>x: foo,\ny: bar,\nz: foo\n</mapKeys>"
		expectedMessage = (
			"At IO:'<mapKeys>x: foo, y: bar, z: foo </mapKeys>', (4, 0):"
			" foo clobbers an existing source within the key map.")
		self.assertRaisesWithMsg(base.StructureError,
			expectedMessage,
			base.parseFromString,
			(common.MapKeys, clobberingLiteral))

	def testBadLiteral(self):
		badLiteral = "<mapKeys>henk</mapKeys>"
		expectedMessage = (
			"At IO:'<mapKeys>henk</mapKeys>', (1, 13):"
			" 'henk' is not a valid value for mapKeys")
		self.assertRaisesWithMsg(base.LiteralParseError,
			expectedMessage,
			base.parseFromString,
			(common.MapKeys, badLiteral))


class ODBCGrammarTest(testhelpers.VerboseTest):
	"""tests for odbcGrammar.

	The tests actually running queries build their connection string from
	the untrustedquery DB profile; they are skipped when parsing raises
	a ModuleNotFoundError (i.e., pyodbc is not installed).
	"""
	def _parseViaODBC(self, grammar):
		"""returns the list of rows grammar yields when fed a source file
		containing a connection string for the untrustedquery profile.

		This raises unittest.SkipTest if a module is missing during parsing.
		"""
		profile = base.getDBProfile("untrustedquery")
		try:
			with testhelpers.testFile("odbcsource.txt",
					"DRIVER={PostgreSQL UNICODE};SERVER=%s;"
					"DATABASE=%s;UID=%s;PWD={%s}"%(profile.host, profile.database,
						profile.user, profile.password)) as p:
				return list(grammar.parse(p))
		except ModuleNotFoundError:
			# no pyodbc installed.  Let's ignore this
			raise unittest.SkipTest("No pyodbc installed, not testing odbc grammar.")

	def testNoQuery(self):
		self.assertRaisesWithMsg(base.StructureError,
			"At IO:'<odbcGrammar/>', (1, 14): Need to give at least one of"
			" query and makeQuery in an odbcGrammar.",
			base.parseFromString,
			(odbcgrammar.ODBCGrammar,
			"""<odbcGrammar/>"""))

	def testTooMuchQuery(self):
		self.assertRaisesWithMsg(base.StructureError,
			'At IO:\'<odbcGrammar query="something"> <makeQuery><code>pass...\','
			' (2, 44): Cannot give both query and makeQuery in an odbcGrammar.',
			base.parseFromString,
			(odbcgrammar.ODBCGrammar,
			"""<odbcGrammar query="something">
				<makeQuery><code>pass</code></makeQuery></odbcGrammar>"""))

	def testBasic(self):
		g = base.parseFromString(odbcgrammar.ODBCGrammar,
			"""<odbcGrammar query="select * from tap_schema.tables"/>""")
		recs = self._parseViaODBC(g)

		self.assertTrue(len(recs)>0)
		if not any(row["schema_name"]=="tap_schema" for row in recs):
			# self.fail rather than assertFalse on a string: the latter only
			# fails because a non-empty string happens to be true.
			self.fail("TAP_SCHEMA didn't come back from ODBC grammar?")

	def testWithLocalQuery(self):
		g = base.parseFromString(odbcgrammar.ODBCGrammar,
			"""<odbcGrammar>
				<makeQuery>
					<code>
						with base.getTableConn() as conn:
							table_name = next(conn.query(
								"select table_name from tap_schema.tables limit 1"
								))[0]
						self.hiddenTableName = table_name

						return ("SELECT table_name FROM tap_schema.tables"
							" WHERE table_name!={}").format(escapeSQL(table_name))
					</code>
				</makeQuery>
			</odbcGrammar>
			""")
		parsed = self._parseViaODBC(g)
		# the makeQuery code above leaves the name it excluded on the parser
		embargoedName = parsed[0]["parser_"].hiddenTableName
		foundNames = [r["table_name"] for r in parsed]

		self.assertFalse(embargoedName in foundNames)
		self.assertTrue([n for n in foundNames if n.startswith("tap_schema")])


@skipIfNoH5py
class HDF5GrammarTest(testhelpers.VerboseTest):
	"""tests for hdf5Grammar, run against sample files in inputsDir's
	data directory.
	"""
	def _getSamplePath(self, fileName):
		return os.path.join(base.getConfig("inputsDir"), "data", fileName)

	def testAstropyFlavour(self):
		h5Grammar = base.parseFromString(hdf5grammar.HDF5Grammar,
			"<hdf5Grammar dataset='testdata'/>")
		rows = list(h5Grammar.parse(self._getSamplePath("hdex.hdf5")))
		for row in rows:
			del row["parser_"]

		expectedRows = [
			{
				'apex': b'Position ICRS 42 -23.789',
				'field': 6.0217,
				'result_': 1},
			{
				'apex': b'',
				'field': None,
				'result_': None}]
		self.assertEqual(rows, expectedRows)

	def testBadDsName(self):
		samplePath = self._getSamplePath("hdex.hdf5")
		h5Grammar = base.parseFromString(hdf5grammar.HDF5Grammar,
			"<hdf5Grammar dataset='gone' style='vaex'/>")
		expectedMessage = utils.EqualingRE("Dataset gone not found in .*inputs/data/hdex.hdf5.  Note that we want the parent of the columns group here.  The following datasets are visible in the root: testdata, testdata.__table_column_meta__")
		self.assertRaisesWithMsg(base.ReportableError,
			expectedMessage,
			list,
			(h5Grammar.parse(samplePath),))
		
	def testVaexStyle(self):
		h5Grammar = base.parseFromString(hdf5grammar.HDF5Grammar,
			"<hdf5Grammar dataset='table' style='vaex'/>")
		rows = list(h5Grammar.parse(self._getSamplePath("fromvaex.hdf5")))
		self.assertEqual(rows[0]["source_id"], 4464711039265053952)
		self.assertAlmostEqual(rows[1]["parallax"], 1.84249)


class _SampleUnionGrammar(testhelpers.TestResource):
	"""a test resource giving a unionGrammar that has a reGrammar for
	the pattern \\.txt$ and a csvGrammar for the pattern \\.csv$.
	"""
	def make(self, _):
		grammarSource = r"""
			<unionGrammar>
				<handles pattern="\.txt$">
					<reGrammar names="a,b,c"/>
				</handles>
				<handles pattern="\.csv$">
					<csvGrammar/>
				</handles>
			</unionGrammar>"""
		return base.parseFromString(uniongrammar.UnionGrammar, grammarSource)


class UnionGrammarTest(testhelpers.VerboseTest):
	"""tests for unionGrammar, mostly using the grammar from
	_SampleUnionGrammar.
	"""
	resources = [("g", _SampleUnionGrammar())]

	def testBadRESyntax(self):
		badLiteral = (
			'<unionGrammar><handles pattern=")"><dictlistGrammar/></handles>'
			'</unionGrammar>')
		expectedMessage = "At IO:'<unionGrammar><handles pattern=\")\"><dictlistGrammar/>...', (1, 53): Bad handles pattern: ')' (unbalanced parenthesis at position 0)"
		self.assertRaisesWithMsg(base.StructureError,
			expectedMessage,
			base.parseFromString,
			(uniongrammar.UnionGrammar, badLiteral))

	def testBasic(self):
		# one sample per handles element of the sample grammar
		samples = [
			("nop.txt", "1 2 3\n", [{'a': '1', 'b': '2', 'c': '3'}]),
			("nop.csv", "x,y,z\n1,2,3\n", [{'x': '1', 'y': '2', 'z': '3'}])]
		for fileName, content, expectedRows in samples:
			with testhelpers.testFile(fileName, content) as name:
				rows = getCleaned(self.g.parse(name))
				self.assertEqual(rows, expectedRows)

	def testNoMatch(self):
		unhandledPath = "/knall/kopf"
		self.assertRaisesWithMsg(base.DataError,
			"No handler grammar for '/knall/kopf'",
			self.g.parse,
			(unhandledPath,))


# When executed as a script, hand control to testhelpers.main.
# NOTE(review): HDF5GrammarTest presumably is the test class run by default
# when no test is selected otherwise -- confirm in testhelpers.main.
if __name__=="__main__":
	testhelpers.main(HDF5GrammarTest)
