"""
Tests for active tags within RDs (and friends).
"""

#c Copyright 2008-2024, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


from gavo.helpers import testhelpers

from gavo import base
from gavo import rscdef
from gavo import rscdesc


class BasicTest(testhelpers.VerboseTest):
	"""Tests for recording STREAMs and replaying them through plain FEEDs.
	"""
	def testStreamContent(self):
		# A STREAM keeps what it has seen as a sequence of parse events;
		# we retrieve it from the parse context through its id.
		parseContext = base.ParseContext()
		base.parseFromString(rscdef.TableDef, """<table id="bar">
			<STREAM id="foo"><table id="u" onDisk="True"><column name="x"/>
			</table></STREAM></table>""", context=parseContext)
		recorded = parseContext.idmap["foo"].events_

		self.assertEqual(len(recorded), 7)
		self.assertEqual(recorded[0][1], "table")
		self.assertEqual(recorded[4][:3], ("value", "name", "x"))
		self.assertEqual(recorded[-1][:2], ("end", "table"))
		# the last item of an event stringifies to a source position
		self.assertEqual(
			str(recorded[3][-1]),
			'IO:\'<table id="bar"> <STREAM id="foo"><table id="u" onDis...\', (2, 48)')

	def testBasicReplay(self):
		# FEEDing the stream must give the data element the recorded table
		dd = base.parseFromString(rscdef.DataDescriptor, """<data id="bar">
			<STREAM id="foo"><table id="u" onDisk="True"><column name="x"/>
			</table></STREAM><FEED source="foo"/></data>""")
		replayedTable = dd.tables[0]
		self.assertEqual(replayedTable.id, "u")
		self.assertEqual(replayedTable.columns[0].name, "x")

	def testPlainError(self):
		# An error raised while replaying must name both the position of the
		# FEED and the position the offending element was recorded at.
		expectedMessage = (
			'At IO:\'<data id="bar"> <STREAM id="foo"><table id="u" onDisk...\', (3, 40) (replaying, real error position IO:\'<data id="bar"> <STREAM id="foo"><table id="u" onDisk...\', (2, 48)):'
			" table elements have no honk attributes or children.")
		self.assertRaisesWithMsg(
			base.StructureError,
			expectedMessage,
			base.parseFromString,
			(rscdef.DataDescriptor, """<data id="bar">
			<STREAM id="foo"><table id="u" onDisk="True"><honk name="x"/>
			</table></STREAM><FEED source="foo"/></data>"""))

	def testDocTag(self):
		# a doc child at the start of a STREAM ends up in the stream's doc
		parseContext = base.ParseContext()
		base.parseFromString(rscdef.DataDescriptor, """<data id="bar">
			<STREAM id="foo"><doc>A null table.</doc>
			<table id="u" onDisk="True"/></STREAM><FEED source="foo"/></data>""",
			context=parseContext)
		stream = parseContext.idmap["foo"]
		self.assertEqual(stream.doc, "A null table.")

	def testDocTagAtEnd(self):
		# the doc child is also picked up when it follows the recorded elements
		parseContext = base.ParseContext()
		base.parseFromString(rscdef.DataDescriptor, """<data id="bar">
			<STREAM id="foo">
			<table id="u" onDisk="True"/><doc>A null table.</doc></STREAM></data>""",
			context=parseContext)
		stream = parseContext.idmap["foo"]
		self.assertEqual(stream.doc, "A null table.")


class ReplayMacroTest(testhelpers.VerboseTest):
	"""Tests for macro expansion while replaying STREAMs through FEEDs.
	"""
	def testBasic(self):
		# attributes given on the FEED are available as macros during replay
		res = base.parseFromString(rscdef.DataDescriptor,
			"""<data><STREAM id="foo"><table id="\\tabname" onDisk="True">
			<column name="x"/></table></STREAM>
			<FEED source="foo" tabname="abc"/></data>""")
		self.assertEqual(res.tables[0].id, "abc")

	def testStandardMacrosAvailable(self):
		# \upper is not defined on the FEED but must be usable nevertheless
		res = base.parseFromString(rscdef.DataDescriptor,
			r"""<data><STREAM id="foo"><table id="test">
			<column name="x\upper{\col}"/></table></STREAM>
			<FEED source="foo" col="abc"/></data>""")
		self.assertEqual(res.tables[0].columns[0].name, "xABC")

	def testHandover(self):
		# A doubled backslash survives the replay as a single one, so \test
		# is only expanded later, outside of the FEED.
		res = base.parseFromString(rscdef.DataDescriptor,
			"""<data><STREAM id="foo"><table id="\\tabname" onDisk="True">
			<index columns="\\\\test"/>
			<column name="x"/></table></STREAM>
			<FEED source="foo" tabname="abc"/></data>""")
		self.assertEqual(res.tables[0].indexedColumns.popitem(),
			("test macro expansion", {'straight'}))

	def testMissingSource(self):
		# a FEED with neither source nor events is a structure error
		self.assertRaisesWithMsg(base.StructureError,
			'At IO:\'<data><STREAM id="foo"><table id="\\tabname" onDisk="T...\', (3, 24):'
			" Need exactly one of source and events on FEED elements",
			base.parseFromString, (rscdef.DataDescriptor,
			"""<data><STREAM id="foo"><table id="\\tabname" onDisk="True">
			<column name="x"/></table></STREAM>
			<FEED tabname="abc"/></data>"""))

	def testMissingAttribute(self):
		# A macro the FEED does not fill must raise a MacroError whose hint
		# names both the missing attribute and the stream.
		with self.assertRaises(base.MacroError) as caught:
			base.parseFromString(rscdef.DataDescriptor,
				"""<data><STREAM id="foo"><table id="\\tabname" onDisk="True">
				<column name="x"/></table></STREAM>
				<FEED source="foo" /></data>""")
		self.assertEqual(caught.exception.hint,
			"This probably means that you should"
			" have set a tabname attribute in the FEED tag.  For details"
			" see the documentation of the STREAM with id foo.")

	def testShortMacroName(self):
		# one-character FEED attributes cannot serve as macro names
		self.assertRaisesWithMsg(
			base.StructureError,
			'At IO:\'<data><STREAM id="foo"><table id="\\t" onDisk="True"> ...\', (3, 4): DaCHS does not support one-character macro names.  Hence, you cannot use "t" as a FEED attribute.',
			base.parseFromString,
			(rscdef.DataDescriptor,
				"""<data><STREAM id="foo"><table id="\\t" onDisk="True">
				<column name="x"/></table></STREAM>
				<FEED source="foo" t="gockel"/></data>"""))

	def testDEFAULTSPlain(self):
		# DEFAULTS attributes fill in macros the FEED leaves out
		res = base.parseFromString(rscdef.DataDescriptor,
			"""<data><STREAM id="foo"><DEFAULTS tabname="fro"/>
			<table id="\\tabname" onDisk="True">
			<column name="x"/></table></STREAM>
			<FEED source="foo" /></data>""")
		self.assertEqual(res.tables[0].id, "fro")

	def testDEFAULTSElement(self):
		# defaults may also be written as child elements of DEFAULTS
		res = base.parseFromString(rscdef.DataDescriptor,
			"""<data><STREAM id="foo"><DEFAULTS><tabname>fro</tabname></DEFAULTS>
			<table id="\\tabname" onDisk="True">
			<column name="x"/></table></STREAM>
			<FEED source="foo" /></data>""")
		self.assertEqual(res.tables[0].id, "fro")

	def testDEFAULTSOverridden(self):
		# an attribute on the FEED takes precedence over the default
		res = base.parseFromString(rscdef.DataDescriptor,
			"""<data><STREAM id="foo"><DEFAULTS tabname="fro"/>
			<table id="\\tabname" onDisk="True">
			<column name="x"/></table></STREAM>
			<FEED source="foo" tabname="quux"/></data>""")
		self.assertEqual(res.tables[0].id, "quux")

	def testWithUnicodeValue(self):
		# a non-ASCII character in a FEED attribute survives macro expansion
		# (the local is not called input so the builtin stays visible)
		rdSource = """<?xml version="1.0" encoding="iso-8859-1"?>
			<table id="gack"><STREAM id="foo">
			<column name="\\na" description="\\de\\+m"/></STREAM>
			<FEED source="foo" na="bla" de="foo \xb5"/></table>"""
		res = base.parseFromString(rscdef.TableDef, rdSource)
		self.assertEqual(res.columns[0].description, "foo \u00b5m")

	def testWithUnicodeStream(self):
		# a non-ASCII character within the recorded stream survives replay
		rdSource = """<?xml version="1.0" encoding="iso-8859-1"?>
			<table id="gack"><STREAM id="foo">
			<column name="\\na" description="\xb5m"/></STREAM>
			<FEED source="foo" na="bla"/></table>"""
		res = base.parseFromString(rscdef.TableDef, rdSource)
		self.assertEqual(res.columns[0].description, "\u00b5m")

	def testExpandedArguments(self):
		# macros within macro arguments are expanded, too
		res = base.parseFromString(rscdef.DataDescriptor,
			"""<data><STREAM id="foo"><table id="\\upper{\\tabname}"
				onDisk="True"/></STREAM>
			<FEED source="foo" tabname="abc"/></data>""")
		self.assertEqual(res.tables[0].id, "ABC")

	def testSourceExpanded(self):
		# the source attribute of a FEED recorded in an NXSTREAM is itself
		# macro-expanded on replay, which allows selecting streams by argument
		res = base.parseFromString(rscdef.DataDescriptor,
			r"""<data>
				<STREAM id="s-one"><column name="fromone"/></STREAM>
				<STREAM id="s-two"><column name="fromtwo"/></STREAM>
				<NXSTREAM id="switch"><FEED source="s-\use"/></NXSTREAM>
				<table id="x">
					<FEED source="switch" use="one"/></table>
				<table id="y">
					<FEED source="switch" use="two"/></table></data>""")
		self.assertEqual(res.tables[0].id, "x")
		self.assertEqual(
			testhelpers.pickSingle(res.tables[0].columns).name,
			"fromone")
		self.assertEqual(
			testhelpers.pickSingle(res.tables[1].columns).name,
			"fromtwo")
	

class NestedTest(testhelpers.VerboseTest):
	"""Tests for active tags nested within other active tags.
	"""
	def testDoubleNest(self):
		# a stream fed from within another stream; \\curtable is only
		# expanded once the events reach the table
		rd = base.parseFromString(rscdesc.RD,
			r"""<resource schema="test"><STREAM id="cols">
					<column name="from2"/>
					<index columns="\\curtable"/></STREAM>
				<STREAM id="foo">
					<table id="\tabname" onDisk="True">
					<FEED source="cols"/>
					<column name="from1"/></table></STREAM>
				<FEED source="foo" tabname="abc"/></resource>""")
		tableDef = rd.tables[0]
		self.assertEqual(tableDef.id, "abc")
		columnNames = ", ".join(col.name for col in tableDef)
		self.assertEqual(columnNames, "from2, from1")
		indexEntry = rd.tables[0].indexedColumns.popitem()
		self.assertEqual(indexEntry, ("test.abc", {'straight'}))

	def testInnermostExpansion(self):
		# the inner FEED fills a macro of its own (innercol)
		rd = base.parseFromString(rscdesc.RD,
			r"""<resource schema="test"><STREAM id="cols">
					<column name="\innercol"/>
					<index columns="\\curtable"/></STREAM>
				<STREAM id="foo">
					<table id="\tabname" onDisk="True">
					<FEED source="cols" innercol="whoppa"/>
					<column name="from1"/></table></STREAM>
				<FEED source="foo" tabname="abc"/></resource>""")
		tableDef = rd.tables[0]
		self.assertEqual(tableDef.id, "abc")
		columnNames = ", ".join(col.name for col in tableDef)
		self.assertEqual(columnNames, "whoppa, from1")
		indexEntry = rd.tables[0].indexedColumns.popitem()
		self.assertEqual(indexEntry, ("test.abc", {'straight'}))

	def testLoopReplay(self):
		# a stream containing a LOOP, replayed twice with different arguments
		rd = base.parseFromString(rscdesc.RD,
			r"""<resource schema="test"><STREAM id="cols">
					<LOOP listItems="x y">
					<events><column name="\\mode\\+_\item"/></events></LOOP></STREAM>
				<table id="foo">
					<FEED source="cols" mode="a" reexpand="True"/>
					<FEED source="cols" mode="b" reexpand="True"/>
				</table></resource>""")
		columnNames = [col.name for col in rd.tables[0]]
		self.assertEqual(columnNames, ['a_x', 'a_y', 'b_x', 'b_y'])

	def testLoopExpansion(self):
		# the items the LOOP runs over come from a FEED argument
		rd = base.parseFromString(rscdesc.RD,
			r"""<resource schema="test"><NXSTREAM id="cols">
					<LOOP listItems="\stuff">
					<events><column name="\\item"/></events></LOOP></NXSTREAM>
					<table id="foo"><FEED source="cols" stuff="x y"/></table>
				</resource>""")
		columnNames = [col.name for col in rd.tables[0]]
		self.assertEqual(columnNames, ['x', 'y'])

	def testFeedInLoop(self):
		# three levels: modes feeds coords, which loops over FEEDs of cols
		rd = base.parseFromString(rscdesc.RD,
			r"""<resource schema="test">
					<STREAM id="cols">
						<column name="\foo"/>
						<column name="err_\foo"/>
					</STREAM>
					<STREAM id="coords">
						<LOOP listItems="x y">
							<events passivate="True">
								<FEED source="cols" foo="\\mode\\+\item" reexpand="True"/>
							</events>
						</LOOP>
					</STREAM>
					<STREAM id="modes">
						<FEED source="coords" mode="a" reexpand="True"/>
						<FEED source="coords" mode="b" reexpand="True"/>
					</STREAM>
					<table id="abc">
						<FEED source="modes"/>
					</table>
				</resource>""")
		columnNames = [col.name for col in rd.tables[0]]
		self.assertEqual(
			columnNames,
			['ax', 'err_ax', 'ay', 'err_ay', 'bx', 'err_bx', 'by', 'err_by'])


class PruneTest(testhelpers.VerboseTest):
	"""Tests for removing elements from replayed events using PRUNE.
	"""
	def testWithName(self):
		# prune by the name attribute
		tableDef = base.parseFromString(rscdef.TableDef, """
			<table>
			<STREAM id="foo">
				<column name="abta"/>
				<column name="gub"/>
			</STREAM>
			<FEED source="foo">
				<PRUNE name="abta"/>
			</FEED>
			</table>""")
		remaining = " ".join(col.name for col in tableDef)
		self.assertEqual(remaining, "gub")

	def testWithUCD(self):
		# prune by the ucd attribute, on events embedded in the FEED
		tableDef = base.parseFromString(rscdef.TableDef, """
			<table id="foo">
				<FEED>
					<events>
						<column name="abta" ucd="clin.dead"/>
						<column name="gub"/>
					</events>
					<PRUNE ucd="clin.dead"/>
				</FEED>
			</table>""")
		remaining = " ".join(col.name for col in tableDef)
		self.assertEqual(remaining, "gub")

	def testIsConjunction(self):
		# no single column matches both conditions, so nothing is pruned
		tableDef = base.parseFromString(rscdef.TableDef, """
			<table id="foo">
				<FEED>
					<events>
						<column name="abta" ucd="clin.dead"/>
						<column name="gub"/>
					</events>
					<PRUNE ucd="clin.dead" name="gub"/>
				</FEED>
			</table>""")
		remaining = " ".join(col.name for col in tableDef)
		self.assertEqual(remaining, "abta gub")

	def testMultiprune(self):
		# the pattern ab.* removes both fed columns; the column defined
		# outside of the FEED stays
		tableDef = base.parseFromString(rscdef.TableDef, """
			<table id="foo">
				<FEED>
					<events>
						<column name="abta" ucd="clin.dead"/>
						<column name="abto"/>
					</events>
					<PRUNE name="ab.*"/>
				</FEED>
				<column name="neuro"/>
			</table>""")
		remaining = " ".join(col.name for col in tableDef)
		self.assertEqual(remaining, "neuro")

	def testDeepPrune(self):
		# a pruned element goes away together with its children
		dataDesc = base.parseFromString(rscdef.DataDescriptor, """
			<data>
				<table id="foo">
					<FEED>
						<events>
							<column name="abta" ucd="clin.dead">
								<values><option>a</option><option>b</option></values>
							</column>
							<column name="abto"/>
							<column name="neuro"/>
						</events>
						<PRUNE name="ab.*"/>
					</FEED>
				</table>
			</data>""")
		remaining = " ".join(col.name for col in dataDesc.tables[0])
		self.assertEqual(remaining, "neuro")

	def testRecursivePune(self):
		# the PRUNE sits on a FEED that is itself recorded in a stream
		dataDesc = base.parseFromString(rscdef.DataDescriptor, """
			<data>
				<STREAM id="srcev">
					<column name="a"/>
					<column name="b"/>
				</STREAM>
				<STREAM id="pruned">
					<FEED source="srcev">
						<PRUNE name="a"/>
					</FEED>
				</STREAM>
				<table id="foo">
					<FEED source="pruned"/>
				</table>
			</data>""")
		remaining = " ".join(col.name for col in dataDesc.tables[0])
		self.assertEqual(remaining, "b")

	def testRecursivePuneWithId(self):
		# as before, but selecting the victim through its id
		dataDesc = base.parseFromString(rscdef.DataDescriptor, """
			<data>
				<STREAM id="srcev">
					<column name="a"/>
					<column name="b" id="kill"/>
				</STREAM>
				<STREAM id="pruned">
					<FEED source="srcev">
						<PRUNE id="kill"/>
					</FEED>
				</STREAM>
				<table id="foo">
					<FEED source="pruned"/>
				</table>
			</data>""")
		remaining = " ".join(col.name for col in dataDesc.tables[0])
		self.assertEqual(remaining, "a")

	def testRecursivePuneLate(self):
		# here the PRUNE is on the outermost FEED and has to reach into
		# the events of the stream fed within the replayed stream
		dataDesc = base.parseFromString(rscdef.DataDescriptor, """
			<data>
				<STREAM id="srcev">
					<column name="a"/>
					<column name="b" id="kill"/>
				</STREAM>
				<STREAM id="pruned">
					<FEED source="srcev"/>
					<column name="c"/>
				</STREAM>
				<table id="foo">
					<FEED source="pruned">
						<PRUNE id="kill"/>
					</FEED>
				</table>
			</data>""")
		remaining = " ".join(col.name for col in dataDesc.tables[0])
		self.assertEqual(remaining, "a c")


class EditTest(testhelpers.VerboseTest):
	"""Tests for changing replayed elements using EDIT.
	"""
	def testProd(self):
		# edit a column coming from the //products#tablecols stream, both
		# through an attribute and through a child element
		tableDef = base.parseFromString(rscdef.TableDef,
				"""<table><FEED source="//products#tablecols">
					<EDIT ref="column[accref]" utype="ssa:Access.Reference">
						<values default="notfound.fits"/></EDIT></FEED>
					</table>""")
		self.assertEqual(tableDef.columns[0].utype, "ssa:Access.Reference")
		self.assertEqual(tableDef.columns[0].values.default, "notfound.fits")

	def testInBetween(self):
		# only the referenced column changes its type
		dataDesc = base.parseFromString(rscdef.DataDescriptor,
				"""<data><STREAM id="foo"><table id="bla" onDisk="True">
				<column name="x"/><column name="y"/></table></STREAM>
				<FEED source="foo"><EDIT ref="column[x]" type="text"/></FEED></data>""")
		tableDef = dataDesc.tables[0]
		columnTypes = ", ".join(col.type for col in tableDef)
		self.assertEqual(columnTypes, "text, real")

	def testDoubleEdit(self):
		# EDITs on an inner and on an outer FEED are both applied
		dataDesc = base.parseFromString(rscdef.DataDescriptor,
				"""<data>
				<STREAM id="inc"><column name="grok"/></STREAM>
				<STREAM id="foo"><table id="bla" onDisk="True">
				<column name="x"/><column name="y"/>
				<FEED source="inc">
					<EDIT ref="column[grok]" type="spoint"/></FEED></table></STREAM>
				<FEED source="foo"><EDIT ref="column[x]" type="text"/></FEED></data>""")
		tableDef = dataDesc.tables[0]
		columnTypes = ", ".join(col.type for col in tableDef)
		self.assertEqual(columnTypes, "text, real, spoint")

	def testRecursiveEdit(self):
		# an EDIT recorded in stage1 still works when replayed via stage2
		dataDesc = base.parseFromString(rscdef.DataDescriptor,
				"""<data>
				<STREAM id="stage0"><column name="grok"/><column name="nok"/></STREAM>
				<STREAM id="stage1"><FEED source="stage0">
					<EDIT ref="column[grok]" type="text"/></FEED></STREAM>
				<STREAM id="stage2"><FEED source="stage1"/></STREAM>
				<table><FEED source="stage2"/></table></data>""")
		tableDef = dataDesc.tables[0]
		columnTypes = ", ".join(col.type for col in tableDef)
		self.assertEqual(columnTypes, "text, real")

	def testRecursiveDoubleEdit(self):
		# two stages edit the same column; the later stage's type is expected
		dataDesc = base.parseFromString(rscdef.DataDescriptor,
				"""<data>
				<STREAM id="stage0"><column name="grok"/><column name="nok"/></STREAM>
				<STREAM id="stage1"><FEED source="stage0">
					<EDIT ref="column[nok]" type="text"/></FEED></STREAM>
				<STREAM id="stage2"><FEED source="stage1">
					<EDIT ref="column[nok]" type="char"/></FEED></STREAM>
				<STREAM id="stage3"><FEED source="stage2"/></STREAM>
				<table><FEED source="stage3"/></table></data>""")
		tableDef = dataDesc.tables[0]
		columnTypes = ", ".join(col.type for col in tableDef)
		self.assertEqual(columnTypes, "real, char")

	def testRemoteEdit(self):
		# the EDIT in stage2 targets a column that stage1 merely passes on
		# from stage0
		dataDesc = base.parseFromString(rscdef.DataDescriptor,
				"""<data>
				<STREAM id="stage0"><column name="grok"/><column name="nok"/></STREAM>
				<STREAM id="stage1"><FEED source="stage0"/></STREAM>
				<STREAM id="stage2"><FEED source="stage1">
					<EDIT ref="column[grok]" type="text"/></FEED></STREAM>
				<table><FEED source="stage2"/></table></data>""")
		tableDef = dataDesc.tables[0]
		columnTypes = ", ".join(col.type for col in tableDef)
		self.assertEqual(columnTypes, "text, real")


class LoopTest(testhelpers.VerboseTest):
	"""Tests for LOOP with its various item sources.
	"""
	def testBasic(self):
		# rows from csvItems drive the replay of an external stream; this
		# includes a non-ASCII value and a value ending up in an integer
		dataDesc = base.parseFromString(rscdef.DataDescriptor,
			"""<?xml version="1.0" encoding="iso-8859-1"?><data><STREAM id="foo">
			<column name="c_\\name" description="\\count, \\desc"
				verbLevel="\\count"/>
			</STREAM>
			<table id="gook">
			<LOOP source="foo"><csvItems>
				name,desc,count
				anInt,m\xf6rkel,2
				aString,sth,3
				</csvItems>
				</LOOP></table></data>""")
		columns = list(dataDesc.tables[0])
		self.assertEqual(len(columns), 2)
		firstColumn, secondColumn = columns[0], columns[1]
		self.assertEqual(firstColumn.name, "c_anInt")
		self.assertEqual(firstColumn.description, "2, m\u00f6rkel")
		self.assertEqual(secondColumn.verbLevel, 3)

	def testEmbedded(self):
		# events given within the LOOP itself; quoted CSV fields may
		# contain commas
		dataDesc = base.parseFromString(rscdef.DataDescriptor,
			r"""<data>
			<table id="gook">
			<LOOP><csvItems>
				band,desc
				B,Johnson B
				C,Kernighan C
				d,"cumbersome, outdated band d"
				</csvItems>
				<events>
					<column name="mag\band" tablehead="m_\band"
						description="Magnitude in \desc"/>
					<column name="e_mag\band" tablehead="Err. m_\band"
						description="Error in \desc magnitude."/>
				</events>
				</LOOP></table></data>""")
		columns = list(dataDesc.tables[0])
		self.assertEqual(len(columns), 6)
		lastColumn = columns[-1]
		self.assertEqual(lastColumn.description,
			"Error in cumbersome, outdated band d magnitude.")

	def testNoTwoRowSources(self):
		# listItems and csvItems at the same time are rejected
		expectedMessage = (
			'At IO:\'<data> <table id="gook"> <LOOP listItems="a b"><csvIt...\', (5, 4):'
			" Must give exactly one data source in LOOP")
		self.assertRaisesWithMsg(
			base.StructureError,
			expectedMessage,
			base.parseFromString,
			(rscdef.DataDescriptor,
			r"""<data>
			<table id="gook">
			<LOOP listItems="a b"><csvItems>band,desc</csvItems>
				<events><column name="mag\band"/></events>
				</LOOP></table></data>"""))

	def testListItems(self):
		# whitespace-separated listItems are available as \item
		dataDesc = base.parseFromString(rscdef.DataDescriptor,
			r"""<data>
			<table id="gook">
			<LOOP listItems="a b c">
				<events>
					<column name="orig_\item"/>
				</events>
				</LOOP></table></data>""")
		columns = list(dataDesc.tables[0])
		self.assertEqual(len(columns), 3)
		self.assertEqual(columns[-1].name, "orig_c")

	def testCodeItems(self):
		# codeItems yields the row dictionaries from python code, which can
		# get at previously parsed elements through context
		parseContext = base.ParseContext()
		base.parseFromString(rscdef.DataDescriptor,
			r"""<data>
				<table id="nok"><column name="a"/><column name="b"/></table>
				<table id="cop"><LOOP><codeItems>
					for item in context.getById("nok"):
						yield {"copName": item.name+"_copy"}</codeItems>
					<events>
						<column name="\copName"/>
					</events></LOOP></table></data>""", context=parseContext)
		copiedColumns = list(parseContext.getById("cop"))
		copiedNames = ",".join(col.name for col in copiedColumns)
		self.assertEqual(copiedNames, "a_copy,b_copy")

	def testAdditionalMacros(self):
		# extra attributes on LOOP (here, tabname) are usable as macros
		rd = base.parseFromString(rscdesc.RD,
			r"""<resource schema="data">
			<table id="o">
				<column name="a"/><column name="b"/>
			</table>
			<table id="c">
				<LOOP listItems="a b" tabname="o">
					<events>
						<column original="\tabname.\item"/>
					</events>
				</LOOP>
			</table></resource>""")
		copyTable = rd.tables[1]
		self.assertEqual(copyTable.id, "c")
		self.assertEqual(copyTable.columns[0].name, "a")
		self.assertEqual(copyTable.columns[1].name, "b")

	def testPassivate(self):
		# the FEED within the passivated events has its source built
		# from the loop item
		rd = base.parseFromString(rscdesc.RD,
			r"""<resource schema="data">
			<STREAM id="col1">
				<column name="foo"/>
			</STREAM>
			<STREAM id="col2">
				<column name="bar"/>
			</STREAM>
			<STREAM id="cols">
				<LOOP listItems="1 2">
					<events passivate="True">
						<FEED source="col\item"/>
					</events>
				</LOOP>
			</STREAM>

			<table id="o">
				<FEED source="cols"/>
			</table>
			</resource>""")
		fedTable = rd.tables[0]
		self.assertEqual(fedTable.columns[0].name, "foo")
		self.assertEqual(fedTable.columns[1].name, "bar")

	def testGeneratedSTREAMs(self):
		# a LOOP may generate STREAMs, which can then be fed as usual
		rd = base.parseFromString(rscdesc.RD,
			r"""<resource schema="data">
				<LOOP listItems="1 2">
					<events passivate="True">
						<STREAM id="\item\+stream">
							<table id="tab\item"><column name="colin\item"/></table>
						</STREAM>
					</events>
				</LOOP>
				<FEED source="1stream"/>
				<FEED source="2stream"/>
			</resource>""")
		firstTable = rd.getById("tab1")
		self.assertEqual(firstTable.columns[0].name, "colin1")
		secondTable = rd.getById("tab2")
		self.assertEqual(secondTable.columns[0].name, "colin2")

	def testNestedExpansion(self):
		# in gob\item\\+\\item, the doubly escaped item is expected to be
		# the inner loop's, the singly escaped one the outer loop's
		rd = base.parseFromString(rscdesc.RD,
			r"""<resource schema="data">
				<LOOP listItems="1 2" reexpand="True">
					<events>
						<LOOP listItems="a b">
							<events>
								<meta name="gob\item\\+\\item">present</meta>
							</events>
						</LOOP>
					</events>
				</LOOP>
			</resource>""")
		foundKeys = set(rd.meta_.keys())
		self.assertEqual(foundKeys, {'gobb1', 'goba1', 'gobb2', 'goba2'})

	def testStreamReexpansion(self):
		# the loop item is passed on as a FEED argument to an outside stream
		rd = base.parseFromString(rscdesc.RD,
			r"""<resource schema="data">
			<STREAM id="plusminus">
				<column name="\prefix\+\basecol\+_error_m"/>
				<column name="\prefix\+\basecol\+_error_p"/>
			</STREAM>
			<table id="t">
				<LOOP listItems="l1_ l2_">
					<events passivate="True">
						<FEED source="plusminus" basecol="dist" prefix="\item"
							reexpand="True"/>
					</events>
				</LOOP>
			</table>
			</resource>""")
		columnNames = [col.name for col in rd.getById("t").columns]
		self.assertEqual(columnNames, [
			"l1_dist_error_m", "l1_dist_error_p",
			"l2_dist_error_m", "l2_dist_error_p"])

	def testBrokenCodeItemMeta(self):
		# syntax errors in codeItems are reported as BadCode with a position
		expectedMessage = "At IO:'<resource schema=\"data\"> <LOOP><codeItems>return = 3<...', (2, 31): Bad source code in function (invalid syntax (<LOOP at <internally built>>, line 2))"
		self.assertRaisesWithMsg(
			base.BadCode,
			expectedMessage,
			base.parseFromString,
			(rscdesc.RD,
			r"""<resource schema="data">
				<LOOP><codeItems>return = 3</codeItems>
					<events/></LOOP></resource>"""))


class RDBasedTest(testhelpers.VerboseTest):
	"""Tests for active tags run against the activetest test RD.
	"""
	def setUp(self):
		# every test gets the RD through testhelpers.getTestRD
		self.rd = testhelpers.getTestRD("activetest")

	def testCSVMacrosExpandedInTable(self):
		# the mags table is expected to have six columns with expanded macros
		magColumns = list(self.rd.getById("mags"))
		self.assertEqual(len(magColumns), 6)
		firstColumn, lastColumn = magColumns[0], magColumns[-1]
		self.assertEqual(firstColumn.name, "jmag")
		self.assertEqual(lastColumn.description,
			"Error in magnitude in the K band")


class MixinTest(testhelpers.VerboseTest):
	"""Tests for mixinDefs, their parameters, and active tags within them.
	"""
	# An RD template with a mixin having a defaulted parameter (xy) and
	# a mandatory one (nd), both with aliases; tests fill in the %s with
	# a table applying the mixin.  The %s is on line 9 of the literal,
	# which the error positions expected below rely on.
	baseRDLit = r"""<resource schema="test">
		<mixinDef id="bla">
			<mixinPar key="xy" alias="ov_default">xy</mixinPar>
			<mixinPar key="nd" alias="nd_new"/>
			<events>
				<param name="\xy" type="text">\nd</param>
			</events>
		</mixinDef>
		%s
	</resource>"""

	def testWorkingMacro(self):
		# xy takes its default, nd comes from the mixin attribute
		res = base.parseFromString(rscdesc.RD,
			self.baseRDLit%'<table><mixin nd="uu">bla</mixin></table>')
		self.assertEqual(res.tables[0].params[0].name, "xy")
		self.assertEqual(res.tables[0].params[0].value, "uu")

	def testWorkingMacroElement(self):
		# mixin parameters may also be given as child elements; the
		# content is taken verbatim, quotes included
		res = base.parseFromString(rscdesc.RD,
			self.baseRDLit%'<table><mixin><nd>"uu"</nd>bla</mixin></table>')
		self.assertEqual(res.tables[0].params[0].name, "xy")
		self.assertEqual(res.tables[0].params[0].value, '"uu"')

	def testOverridingMacro(self):
		# an explicit value replaces the default of xy
		res = base.parseFromString(rscdesc.RD,
			self.baseRDLit%'<table><mixin xy="zq" nd="uu">bla</mixin></table>')
		self.assertEqual(res.tables[0].params[0].name, "zq")
		self.assertEqual(res.tables[0].params[0].value, "uu")

	def testNotFilledMacro(self):
		# nd has no default and hence must be given
		self.assertRaisesWithMsg(base.StructureError,
			'At IO:\'<resource schema="test"> <mixinDef id="bla"> <mixinPa...\', (9, 27):'
			" Mixin parameter nd mandatory",
			base.parseFromString,
			(rscdesc.RD,
			self.baseRDLit%'<table><mixin xy="zq">bla</mixin></table>'))

	def testBadFillingRaises(self):
		# parameter elements must not have element children
		self.assertRaisesWithMsg(base.StructureError,
			'At IO:\'<resource schema="test"> <mixinDef id="bla"> <mixinPa...\', (9, 20):'
			" nd elements cannot have a children in mixins.",
			base.parseFromString,
			(rscdesc.RD,
			self.baseRDLit%'<table><mixin><nd><a>uu</a></nd>bla</mixin></table>'))

	def testUnknownMacroRaises(self):
		# parameters the mixin does not declare are rejected
		self.assertRaisesWithMsg(base.StructureError,
			'At IO:\'<resource schema="test"> <mixinDef id="bla"> <mixinPa...\', (9, 35):'
			' The attribute(s) a is/are not allowed on this mixin',
			base.parseFromString,
			(rscdesc.RD,
			self.baseRDLit%'<table><mixin nd="u"><a>uu</a>bla</mixin></table>'))

	def testBadMacroNamesRejected(self):
		# a mixinPar with the one-character key "a" must not parse
		self.assertRaises(base.StructureError,
			base.parseFromString, rscdesc.RD,
			r"""<resource schema="test"><mixinDef id="bla">
				<mixinPar key="a">__NULL__</mixinPar></mixinDef></resource>""")

	def testNULLDefault(self):
		# a __NULL__ default ending up as param content yields a None value
		res = base.parseFromString(rscdesc.RD,
			r"""<resource schema="test"><mixinDef id="bla">
				<mixinPar key="aa">__NULL__</mixinPar><events>
				<param name="u">\aa</param></events></mixinDef>
				<table mixin="bla"/></resource>""")
		self.assertEqual(res.tables[0].params[0].value, None)

	def testEmptyDefault(self):
		# The unit must actually come from \aa; with a literal empty
		# string there, this test would pass without ever looking
		# at the __EMPTY__ default.
		res = base.parseFromString(rscdesc.RD,
			r"""<resource schema="test"><mixinDef id="bla">
				<mixinPar key="aa">__EMPTY__</mixinPar><events>
				<param name="u" unit="\aa"/></events></mixinDef>
				<table mixin="bla"/></resource>""")
		self.assertEqual(res.tables[0].params[0].unit, "")

	def testNestedFeeds(self):
		# mixin parameters are visible in streams pulled in through LFEED
		res = base.parseFromString(rscdesc.RD,
			r"""<resource schema="test">
			<STREAM id="field">
				<param name="u">\uVal</param>
			</STREAM>
			<mixinDef id="bla">
				<mixinPar key="uVal"/>
				<events>
					<LFEED source="field"/>
				</events>
			</mixinDef>
			<table><mixin uVal="5">bla</mixin></table>
			</resource>""")
		self.assertEqual(res.tables[0].params[0].value, 5.)

	def testReplayedRD(self):
		# here the LFEED is a direct child of the mixinDef, and the
		# stream it pulls in wraps its content in events
		res = base.parseFromString(rscdesc.RD,
			r"""<resource schema="data">
				<STREAM id="_part">
					<events>
						<column name="urks"/>
					</events>
				</STREAM>
				<mixinDef id="bla">
					<LFEED source="_part"/>
				</mixinDef>
				<table mixin="bla"/></resource>""")
		self.assertEqual(res.tables[0].columns[0].name, "urks")

	def testMixinParsInActiveTags(self):
		# a mixin parameter used within a LOOP in the mixin's events
		res = base.parseFromString(rscdesc.RD,
			r"""<resource schema="data">
				<table id="src">
					<column name="a"/><column name="b"/>
				</table>
				<mixinDef id="bla">
					<mixinPar name="srctable"/>
					<events>
						<LOOP listItems="a b">
							<events>
								<column original="\\srctable.\item"/>
							</events>
						</LOOP>
					</events>
				</mixinDef>
				<table id="dest"><mixin srctable="src">bla</mixin></table>
				</resource>""")
		self.assertEqual(res.tables[1].id, "dest")
		self.assertEqual(res.tables[1].columns[0].name, "a")
		self.assertEqual(res.tables[1].columns[1].name, "b")

	def testMaterialOverrideable(self):
		# the description given in the table body must be what ends up
		# on dateObs (presumably, //siap#pgs defines that column, too)
		res = base.parseFromString(rscdesc.RD,
			"""<resource schema="data">
				<table id="bla" mixin="//siap#pgs" onDisk="True" temporary="True">
					<column name="dateObs" description="Don't believe this."/>
				</table></resource>""")
		self.assertEqual(
			res.getById("bla").getColumnByName("dateObs").description,
			"Don't believe this.")

	def testLateEvents(self):
		# the index code from //scs#q3cindex must reference the columns
		# marked as main position in the table body
		res = base.parseFromString(rscdesc.RD,
			"""<resource schema="data">
				<table id="bla" mixin="//scs#q3cindex" onDisk="True" temporary="True">
					<column name="r" ucd="pos.eq.ra;meta.main"/>
					<column name="d" ucd="pos.eq.dec;meta.main"/>
				</table></resource>""")
		self.assertTrue("q3c_ang2ipix(r, d)" in
			"".join(res.getById("bla").indices[0].iterCode()))

	def testParameterOverwriting(self):
		# declaring a mixinPar twice leaves one parameter with the later default
		rd = base.parseFromString(rscdesc.RD, r"""<resource schema="test">
			<mixinDef id="bla">
				<mixinPar key="xy">xy</mixinPar>
				<mixinPar key="xy">zz</mixinPar>
				<events>
					<param name="val" type="text">\xy</param>
				</events>
			</mixinDef>
			<table id="foo" mixin="bla"/>
		</resource>""")
		self.assertEqual(len(rd.getById("bla").pars), 1)
		self.assertEqual(rd.getById("foo").getParamByName("val").value, "zz")

	def testBasicAliasing(self):
		# nd may be filled through its alias nd_new
		res = base.parseFromString(rscdesc.RD,
			self.baseRDLit%'<table><mixin nd_new="hey">bla</mixin></table>')
		self.assertEqual(res.tables[0].params[0].name, "xy")
		self.assertEqual(res.tables[0].params[0].value, "hey")

	def testAliasingNoDefault(self):
		# a value given through the alias replaces the default of xy
		res = base.parseFromString(rscdesc.RD,
			self.baseRDLit%'<table><mixin ov_default="w"'
				' nd="hey">bla</mixin></table>')
		self.assertEqual(res.tables[0].params[0].name, "w")
		self.assertEqual(res.tables[0].params[0].value, "hey")

	def testAliasingBothGiven(self):
		# giving both the canonical name and its alias is an error
		self.assertRaisesWithMsg(base.StructureError,
			'At IO:\'<resource schema="test"> <mixinDef id="bla"> <mixinPa...\', (9, 50):'
			' Both canonical name and alias bound: xy, ov_default',
			base.parseFromString,
			(rscdesc.RD,
			self.baseRDLit%'<table><mixin xy="g" ov_default="w"'
				' nd="hey">bla</mixin></table>'))
	

class ReferenceAttributeTest(testhelpers.VerboseTest):
	"""Tests for FEEDs adding to a rowmaker referenced from a make.
	"""
	def testRefattChangeHonored(self):
		# the vars fed into the rowmaker must be available to the map:
		# with inp=1, src is 34 and gno is 4, hence foo=34+1-4
		rd = base.parseFromString(rscdesc.RD, """
			<resource schema="test">
				<STREAM id="woo"><var key="src">@inp+33</var></STREAM>
				<STREAM id="loo"><var key="gno">4</var></STREAM>
				<table id="t1"><column name="foo" type="integer" required="True"/>
				</table>
				<data id="make_t1"><dictlistGrammar/>
					<make table="t1">
						<rowmaker>
							<FEED source="woo"/>
							<FEED source="loo"/>
							<map key="foo">@src+1-@gno</map></rowmaker></make></data>
			</resource>
			""")
		from gavo import rsc
		dataDesc = rd.getById("make_t1")
		madeData = rsc.makeData(dataDesc, forceSource=[{'inp': 1}])
		madeRows = madeData.getPrimaryTable().rows
		self.assertEqual(madeRows, [{'foo': 31}])


if __name__ == "__main__":
	# when run as a script, hand MixinTest to the test helper's main
	# (presumably, that is the class run when none is given on the
	# command line)
	testhelpers.main(MixinTest)
