"""
Tests pertaining to the data parsing system
"""

#c Copyright 2008-2024, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import contextlib
import datetime
import os
import shutil
import tarfile
import tempfile
import time
from io import BytesIO

from gavo.helpers import testhelpers

from gavo import base
from gavo import rsc
from gavo import rscdef
from gavo import rscdesc
from gavo import utils
from gavo.rsc import dumping

import tresc


class RowValidationTest(testhelpers.VerboseTest):
	"""tests for the validation of rows and of RD schema names.
	"""
	def testOptional(self):
		# The assertions below expect the valSpec table of the test RD
		# to reject rows lacking a_num and to accept the enum value
		# used here.
		rd = testhelpers.getTestRD()
		recDef = rd.getTableDefById("valSpec")
		rec = {}
		try:
			recDef.validateRow(rec)
		except base.ValidationError as ex:
			self.assertEqual(ex.colName, "a_num")
		else:
			# without this branch, the test would pass vacuously if
			# validateRow stopped raising for the empty row.
			self.fail("Empty row unexpectedly passed validation")
		rec["enum"] = "abysimal"
		self.assertRaises(base.ValidationError, recDef.validateRow,
			rec)
		rec["a_num"] = 14
		# a successful validation returns nothing
		self.assertIsNone(recDef.validateRow(rec))
	
	def testBadSchema(self):
		# schema names that are not python identifiers must be rejected
		# when the RD is parsed.
		self.assertRaisesWithMsg(base.StructureError,
			'At IO:\'<resource schema="a;drop table foo"/>\', (1, 37):'
			' DaCHS schema attributes must be valid python identifiers',
			base.parseFromString,
			(rscdesc.RD, '<resource schema="a;drop table foo"/>'))


@contextlib.contextmanager
def _inputFile(fName, content):
	"""a context manager providing a text file fName containing content
	for the duration of the controlled block.

	The file is removed when the block is left, whether or not an
	exception was raised.
	"""
	dest = open(fName, "w")
	try:
		dest.write(content)
	finally:
		dest.close()

	try:
		yield
	finally:
		os.unlink(fName)


class SimpleParseTest(testhelpers.VerboseTest):
	"""tests for some simple parses.

	The tests work on a file testInput.txt in the current directory,
	which is created and removed by _inputFile where needed.
	"""
	def _getDD(self):
		# returns a data descriptor reading testInput.txt with a
		# columnGrammar and mapping val1 to x and val2 to y.
		return base.parseFromString(rscdef.DataDescriptor, '<data>'
			'<sources>testInput.txt</sources>'
			'<columnGrammar><col key="val1">3</col>'
			'<col key="val2">6-10</col></columnGrammar>'
			'<table id="foo"><column name="x" type="integer"/>'
			'<column name="y" type="text"/>'
			'</table><rowmaker id="bla_foo">'
			'<map dest="y" src="val2"/>'
			'<map dest="x" src="val1"/>'
			'</rowmaker><make table="foo" rowmaker="bla_foo"/></data>')

	def testBasic(self):
		with _inputFile("testInput.txt", "xx1xxabc, xxxx\n"):
			dd = self._getDD()
			data = rsc.makeData(dd)
			self.assertEqual(data.getPrimaryTable().rows, [{'y': 'abc,', 'x': 1}])
	
	def testRaising(self):
		# an x where an integer literal is expected must yield a
		# validation error naming the field and the rowmaker.
		with _inputFile("testInput.txt", "xxxxxabc, xxxx\n"):
			dd = self._getDD()
			self.assertRaisesWithMsg(base.ValidationError,
				"Field x: While building x in bla_foo: invalid literal for int()"
					" with base 10: 'x'",
				rsc.makeData, (dd,))
	
	def testValidation(self):
		# y is never mapped here; that must only be an error when parsing
		# with rsc.parseValidating.
		with _inputFile("testInput.txt", "xx1xxabc, xxxx\n"):
			dd = base.parseFromString(rscdef.DataDescriptor, '<data>'
				'<sources pattern="testInput.txt"/>'
				'<columnGrammar><col key="val1">3</col></columnGrammar>'
				'<table id="foo"><column name="x" type="integer"/>'
				'<column name="y" type="text"/></table><rowmaker id="bla_foo">'
				'<map dest="x" src="val1"/></rowmaker>'
				'<make table="foo" rowmaker="bla_foo"/></data>')
			rsc.makeData(dd, rsc.parseNonValidating)
			self.assertRaisesWithMsg(base.ValidationError,
				"Field y: y not bound in row",
				rsc.makeData, (dd, rsc.parseValidating))

	def testNonExistingSource(self):
		# testInput.txt is not created here, so the import has to fail.
		dd = self._getDD()
		try:
			rsc.makeData(dd)
		except base.SourceParseError as ex:
			msg = str(ex)
			self.assertTrue(msg.startswith(
				"At start: I/O operation failed ([Errno 2] No such file or directory:"))
			self.assertTrue(msg.endswith("tests/testInput.txt')"))
		else:
			# without this branch, the test would pass vacuously if
			# no exception was raised at all.
			self.fail("Importing a non-existing source did not raise"
				" a SourceParseError")


class RowsetTest(testhelpers.VerboseTest):
	"""a base class for tests comparing database query results against
	expected row sets.

	assertQueryReturns reads the connection from self.conn, so deriving
	classes have to provide that attribute.
	"""
	def assertQueryReturns(self, query, expected):
		"""asserts that running query on self.conn yields the rows in expected.

		Both row sets are sorted before being compared, so the order in which
		the database returns the rows does not matter.
		"""
		# close the cursor as soon as the rows are fetched rather than
		# leaving that to garbage collection.
		with contextlib.closing(self.conn.cursor()) as cursor:
			cursor.execute(query)
			found = list(cursor)

		self.assertEqual(len(found), len(expected),
			"Rowset length didn't match: %s"%str(found))
		for f, e in zip(sorted(found), sorted(expected)):
			self.assertEqual(f, e, "Rows don't match: %s vs. %s"%(f, e))


class TestProductsImport(RowsetTest):
	"""tests for the import of a products table and its dc.products rows.

	These are integration tests rather than unit tests; they run against
	the table made by tresc.prodtestTable.
	"""
	resources = [("conn", tresc.prodtestTable)]

	def testWorkingImport(self):
		expectedObjects = [("gabriel",), ("michael",)]
		self.assertQueryReturns("select object from test.prodtest",
			expectedObjects)
	
	def testInProducts(self):
		# both imported products must have a row in dc.products
		rowForA = ('data/a.imp', 'X_test', datetime.date(2030, 12, 31),
			'text/plain', 'data/a.imp', 'test.prodtest',
			'data/broken.imp', None, 'text/plain')
		rowForB = ('data/b.imp', 'X_test', datetime.date(2003, 12, 31),
			'text/plain', 'data/b.imp', 'test.prodtest',
			'http://example.com/borken.jpg', None, 'image/jpeg')
		self.assertQueryReturns(
			"select * from dc.products where sourceTable='test.prodtest'",
			[rowForA, rowForB])

	def testNoMixinInMem(self):
		# the products mixin must refuse tables that are not onDisk
		rdSource = ('<resource schema="test"><table id="foo" mixin="//products#table">'
			'</table></resource>')
		self.assertRaisesWithMsg(base.StructureError,
			"Tables mixing in product must be onDisk, but foo is not",
			base.parseFromString,
			(rscdesc.RD, rdSource,))
	
	def testImportWithBadFails(self):
		importDD = testhelpers.getTestRD().getById("productimport")
		self.assertRaisesWithMsg(base.SourceParseError,
			"At unspecified location: Not a key value pair: 'kaputt.'",
			rsc.makeData,
			(importDD,),
			parseOptions=rsc.parseValidating,
			connection=self.conn)


class ProductsSkippingTest(RowsetTest):
	"""tests that the productimport-skip import gives the same table
	contents with and without keepGoing.
	"""
	resources = [("conn", tresc.dbConnection)]

	def _testImportWithSkipBehavesConsistently(self, parseOptions):
		# This is of course crappy behaviour we don't want, but it's hard
		# to fix this, and actually sometimes this "skip just one table"
		# might be what people want.  Anyway, we need to be consistent.
		skippingDD = testhelpers.getTestRD().getById("productimport-skip")
		imported = rsc.makeData(skippingDD,
			parseOptions=parseOptions,
			connection=self.conn)

		try:
			self.assertQueryReturns(
				"select object from test.prodskip",
				[('gabriel',)])
			self.assertQueryReturns(
				"select accref from dc.products where"
				" sourceTable='test.prodskip'",
				[('data/a.imp',), ('data/b.imp',)])
		finally:
			# do not leave the tables behind, whatever the test outcome
			imported.dropTables(parseOptions)
			self.conn.commit()

	def testImportWithSkipBehavesKeepGoing(self):
		options = rsc.parseValidating.change(keepGoing=True)
		self._testImportWithSkipBehavesConsistently(options)

	def testImportWithSkipBehavesNormal(self):
		options = rsc.parseValidating.change(keepGoing=False)
		self._testImportWithSkipBehavesConsistently(options)


class IgnoredThingsTest(testhelpers.VerboseTest):
	"""tests for IgnoreThisRow and SkipThis raised from rowmakers and
	rowfilters.

	The grammar of the test RD yields one row per integer in the half-open
	range given by the two numbers in the source token.
	"""
	rd = base.parseFromString(rscdesc.RD,
		"""<resource schema="test">
			<table id="coll">
				<column name="item" type="integer"/>
			</table>
			<data id="import">
				<sources/> <!-- provided by text -->
				<embeddedGrammar>
					<iterator>
						<code>
							lower, upper = map(int, self.sourceToken.split())
							for i in range(lower, upper):
								yield {"item": i}
						</code></iterator>
					<rowfilter>
						<code>
							if @item==223:
								raise SkipThis("Killing this source")
							yield row
						</code>
					</rowfilter>
				</embeddedGrammar>
				<make table="coll">
					<rowmaker idmaps="*">
						<apply>
							<code>
								if @item==23:
									raise IgnoreThisRow("Don't like 23")
								elif @item==123:
									raise SkipThis("That's quite enough")
							</code></apply></rowmaker></make>
				</data>
		</resource>""")

	def _getImportedItems(self, sourceToken):
		# returns the item values in the primary table after importing
		# sourceToken through the test RD's data element.
		made = rsc.makeData(self.rd.getById("import"), forceSource=sourceToken)
		return [row["item"] for row in made.getPrimaryTable().rows]

	def testIgnoreThisRow(self):
		# 23 is dropped by the rowmaker, its neighbours are kept
		self.assertEqual(self._getImportedItems("22 25"), [22, 24])

	def testSkipThis(self):
		# nothing from 123 on makes it into the table
		self.assertEqual(self._getImportedItems("122 125"), [122])

	def testSkipThisInRowfilter(self):
		# nothing from 223 on makes it into the table
		self.assertEqual(self._getImportedItems("222 225"), [222])


class _WildResource(testhelpers.TestResource):
	"""a test resource giving an RD with a products table and a data
	element reading all sources with a keyValueGrammar.

	The RD's schema is the name of a temporary directory created below
	inputsDir; clean removes the RD's resdir again.
	"""
	def make(self, ignored):
		# the config key is spelled inputsDir as everywhere else in this file
		tempPath = tempfile.mkdtemp(suffix="parsetest",
			dir=str(base.getConfig("inputsDir")))
		rdSrc = """<resource schema="%s">
			<table onDisk="true" id="deleteme" mixin="//products#table"/>
			<data id="import">
				<sources pattern="*"/>
				<keyValueGrammar>
					<rowfilter procDef="//products#define">
						<bind key="table">"deleteme"</bind>
					</rowfilter>
				</keyValueGrammar>
				<make table="deleteme"/>
			</data>
		</resource>"""%os.path.basename(tempPath)
		return base.parseFromString(rscdesc.RD, rdSrc)
	
	def clean(self, rd):
		# NOTE(review): this presumably is the directory created in make,
		# as the schema is that directory's name -- confirm resdir
		# defaults accordingly.
		shutil.rmtree(rd.resdir)


class ProductsBadNameTest(testhelpers.VerboseTest, metaclass=testhelpers.SamplesBasedAutoTest):
# Products are supposed to have well-behaved file names.
# Each sample pairs a file name with a flag saying whether parsing
# a file of that name is expected to work.

	resources = [("rd", _WildResource())]

	def _runTest(self, sample):
		destName, shouldBeOk = sample
		fullPath = os.path.join(base.getConfig("inputsDir"), destName)
		# the file only needs to exist (and stay empty) while we parse
		handle = open(fullPath, "w")
		try:
			try:
				list(self.rd.dds[0].grammar.parse(fullPath, None))
			except ValueError:
				rejected = True
			else:
				rejected = False

			if shouldBeOk and rejected:
				raise AssertionError("Filename %s should be legal but is not"%destName)
			if not (shouldBeOk or rejected):
				raise AssertionError("Filename %s should be illegal"
					" but is not"%destName)
		finally:
			handle.close()
			os.unlink(fullPath)
	
	samples = [
		("ok_LOT,allowed-this_ought,to10000do.file", True),
		("Q2232+23.fits", False),
		("A&A23..1.fits", False),
		("name with blank.fits", False),
		("name%with%blank.fits", False),
		("don't want quotes", False),]


class CleanedupTest(RowsetTest):
	"""tests that dropping a data item also removes its rows from the
	DaCHS system tables (may fail if other tests failed).
	"""
	resources = [("conn", tresc.dbConnection)]

	def testNotInProducts(self):
		# WARNING: test.prodtest is used by lots of other tests.  So,
		# this thing may not run in parallel to other tests, and
		# it *must* recreate productsimport when done.
		importDD = testhelpers.getTestRD().getById("productimport")
		queriesExpectedEmpty = (
			"select * from dc.products where sourceTable='test.prodtest'",
			"select * from dc.tablemeta where tableName='test.prodtest'")

		try:
			tresc.makeProdtestTable(self.conn)
			rsc.Data.drop(importDD, connection=self.conn)
			for query in queriesExpectedEmpty:
				self.assertQueryReturns(query, [])
		finally:
			# undo the drop and make sure the table is there for others
			self.conn.rollback()
			tresc.makeProdtestTable(self.conn)
			self.conn.commit()


class DispatchedGrammarTest(testhelpers.VerboseTest):
	"""tests for grammars yielding (role, row) pairs, i.e., dispatching
	grammars.
	"""
	def testSimple(self):
		# rows must end up in the table made for the role they come with
		dispatchingRD = base.parseFromString(rscdesc.RD,
			"""
			<resource schema="test">
				<table id="t1"><column name="x" type="text"/></table>
				<table id="t2"><column name="y" type="text"/></table>
				<data id="import">
					<sources items="foo"/>
					<embeddedGrammar isDispatching="True"><iterator><code>
						yield "one", {"x": "x1", "y": "FAIL"}
						yield "two", {"x": "FAIL", "y": "y1"}
						yield "one", {"x": "x2", "y": "FAIL"}
					</code></iterator></embeddedGrammar>
					<make role="one" table="t1"/>
					<make role="two" table="t2"/>
				</data>
			</resource>
			""")
		made = rsc.makeData(dispatchingRD.getById("import"))

		xValues = [row["x"] for row in made.getTableWithRole("one").rows]
		self.assertEqual(xValues, ["x1", "x2"])

		yValues = [row["y"] for row in made.getTableWithRole("two").rows]
		self.assertEqual(yValues, ["y1"])

	def testNoRole(self):
		# feeding to a role without a make must be reported
		makelessDD = base.parseFromString(rscdef.DataDescriptor, """<data id="import">
					<sources items="foo"/>
					<embeddedGrammar isDispatching="True"><iterator><code>
						yield "one", {"x": "x1", "y": "FAIL"}
					</code></iterator></embeddedGrammar>
				</data>
			""")
		self.assertRaisesWithMsg(base.ReportableError,
			"Grammar tries to feed to role 'one', but there is no corresponding make",
			rsc.makeData,
			(makelessDD,))


class CrossResolutionTest(testhelpers.VerboseTest):
# NOTE: DaCHS-internal code should use base.resolveCrossId instead
# of the getReferencedElement exercised here.

	# content of the RD temporarily written for the relative-reference tests
	_relRDSource = """<resource schema="test"><table id="bar"/></resource>"""

	def _getTestDir(self):
		# returns the directory the relative references are resolved from
		return os.path.join(base.getConfig("inputsDir"), "test")

	def testItemAbsolute(self):
		found = rscdef.getReferencedElement("data/test#ADQLTest")
		self.assertEqual(found.id, "ADQLTest")
	
	def testRDAbsolute(self):
		found = rscdef.getReferencedElement("data/test")
		self.assertEqual(found.sourceId, "data/test")

	def testRDTypecheck(self):
		self.assertRaisesWithMsg(base.StructureError,
			"Reference to 'data/test' yielded object of type RD, but expected"
				" a(n) TableDef instance.",
			rscdef.getReferencedElement,
			("data/test", rscdef.TableDef))

	def testItemError(self):
		self.assertRaisesWithMsg(base.NotFoundError,
			"Element with id 'nonexisting' could not be located in RD data/test",
			rscdef.getReferencedElement,
			("data/test#nonexisting",))

	def testRDError(self):
		self.assertRaisesWithMsg(base.NotFoundError,
			"Resource descriptor 'nonexisting' could not be located in file system",
			rscdef.getReferencedElement,
			("nonexisting",))

	def testItemRelative(self):
		testDir = self._getTestDir()
		rdPath = os.path.join(testDir, "rel.rd")
		with testhelpers.testFile(rdPath, self._relRDSource), utils.in_dir(testDir):
			found = rscdef.getReferencedElement("rel#bar", rscdef.TableDef)
			self.assertEqual(found.id, "bar")

	def testRelativeMessage(self):
		# the error message must give the id relative to inputsDir
		with utils.in_dir(self._getTestDir()):
			self.assertRaisesWithMsg(base.NotFoundError,
				"Resource descriptor 'test/foo' could not be located in file system",
				rscdef.getReferencedElement,
				("foo#bar", rscdef.TableDef))

	def testRDRelative(self):
		testDir = self._getTestDir()
		rdPath = os.path.join(testDir, "rel.rd")
		with testhelpers.testFile(rdPath, self._relRDSource), utils.in_dir(testDir):
			found = rscdef.getReferencedElement("rel")
			self.assertEqual(found.sourceId, "test/rel")


class _SourceHierarchy(testhelpers.TestResource):
	"""a test resource building a small directory tree below inputsDir.

	The tree has directories dir0, dir1, and dir2, where dir<n> contains
	the files file.0 through file.<n>, each holding its index.  In
	links/data, a and b are symbolic links to dir1 and dir2, respectively.

	The resource value is the name of the tree's root directory relative
	to inputsDir.
	"""
	def make(self, ignored):
		self.root = tempfile.mkdtemp("sourcetest", dir=base.getConfig("inputsDir"))

		for dirIndex in range(3):
			dataDir = os.path.join(self.root, f"dir{dirIndex}")
			os.mkdir(dataDir)
			for fileIndex in range(dirIndex+1):
				filePath = os.path.join(dataDir, f"file.{fileIndex}")
				with open(filePath, "w") as dest:
					dest.write(f"{fileIndex}\n")

		linkDir = os.path.join(self.root, "links", "data")
		os.makedirs(linkDir)
		for targetName, linkName in [("dir1", "a"), ("dir2", "b")]:
			os.symlink(
				os.path.join(self.root, targetName),
				os.path.join(linkDir, linkName))

		return os.path.basename(self.root)

	def clean(self, ignored):
		shutil.rmtree(self.root)


class SourcesTest(testhelpers.VerboseTest):
	"""tests for the sources element of data descriptors.

	Most tests here run on the directory tree built by _SourceHierarchy.
	"""
	resources = [
		("resdir", _SourceHierarchy()), ("connection", tresc.dbConnection)]

	def _getSourceTails(self, sourcesLiteral):
		# returns the last two path segments of each source found by a
		# data element containing sourcesLiteral in an RD with the test
		# hierarchy as schema.
		rd = base.parseFromString(rscdesc.RD,
			'<resource schema="%s"><data id="import">'
			'%s</data></resource>'%(self.resdir, sourcesLiteral))
		return ["/".join(path.split("/")[-2:])
			for path in rd.getById("import").sources.iterSources()]

	def testFollowLinks(self):
		found = self._getSourceTails(
			'<sources recurse="True" pattern="links/file*"/>')
		self.assertEqual(found,
			['a/file.0', 'a/file.1', 'b/file.0', 'b/file.1', 'b/file.2'])

	def testFollowLinksImmediate(self):
		found = self._getSourceTails(
			'<sources recurse="True" pattern="links/data/file*"/>')
		self.assertEqual(found,
			['a/file.0', 'a/file.1', 'b/file.0', 'b/file.1', 'b/file.2'])

	def testRecursePattern(self):
		found = self._getSourceTails(
			'<sources recurse="True" pattern="dir*/*.0"/>')
		self.assertEqual(found,
			['dir0/file.0', 'dir1/file.0', 'dir2/file.0'])

	def testIgnorePattern(self):
		found = self._getSourceTails(
			'<sources recurse="True" pattern="dir*/*">'
			'<ignoreSources pattern="*.1"/></sources>')
		self.assertEqual(found,
			['dir0/file.0', 'dir1/file.0', 'dir2/file.0', 'dir2/file.2'])

	def testIgnoreFromdb(self):
		# sources already in the database should only be re-imported
		# if they have been touched since.
		rd = base.parseFromString(rscdesc.RD,
			r"""
			<resource schema="test">
				<table id="withdate" mixin="//products#table" onDisk="True"
						primary="accref" forceUnique="True" dupePolicy="overwrite">
					<column name="mtime" type="timestamp"/>
				</table>
				<data id="import" updating="True">
					<sources pattern="*.kram">
						<ignoreSources
							fromdbUpdating="select accref, mtime from \schema.withdate"/>
					</sources>
					<columnGrammar>
						<rowfilter procDef="//products#define">
							<bind key="table">"\schema.withdate"</bind>
						</rowfilter>
					</columnGrammar>
					<make table="withdate">
						<rowmaker>
							<map key="mtime">datetime.datetime.utcfromtimestamp(
								os.path.getmtime(\fullPath))</map>
						</rowmaker>
					</make>
				</data>
			</resource>""")
		rd.sourceId = "test/withdate"
	
		importDD = rd.getById("import")
		datadir = os.path.join(base.getConfig("inputsDir"), "test/")
		pathToA = datadir+"a.kram"
		pathToB = datadir+"b.kram"
		mtimeQuery = ("select mtime from test.withdate"
			" where accref='test/a.kram'")
		# this is what the timestamp 1e9 comes back as
		originalStamp = datetime.datetime(2001, 9, 9, 1, 46, 40)

		try:
			with testhelpers.testFile("a.kram", "a", inDir=datadir), \
					testhelpers.testFile("b.kram", "b", inDir=datadir):
				os.utime(pathToA, (1e9, 1e9))
				os.utime(pathToB, (1.3e9, 1.3e9))
				rsc.makeData(importDD, connection=self.connection)

				storedStamp = list(self.connection.query(mtimeQuery))[0][0]
				self.assertEqual(storedStamp, originalStamp)

				# set a.kram's times to now and import again
				os.utime(pathToA, None)
				reimported = rsc.makeData(importDD, connection=self.connection)
				# b.kram is not re-processed as it's not been touched;
				# but a.kram is; rowcount, however, is 0 here because
				# an existing row is being overwritten.
				self.assertEqual(reimported.nAffected, 0)
				# but make sure the overwrite has actually taken place
				storedStamp = list(self.connection.query(mtimeQuery))[0][0]
				self.assertNotEqual(storedStamp, originalStamp)
		finally:
			rsc.Data.drop(importDD, connection=self.connection)
			self.connection.commit()


class _TestDump(testhelpers.TestResource):
	"""a test resource giving the bytes of a dump of data/test#csdata
	and data/test#ADQLgeo as written by rsc.createDump with binary=False.
	"""
	resources = [
		("ad", tresc.adqlGeoTable),
		("cs", tresc.csTestTable),]

	def make(self, ignored):
		dumpName = "my.dump"
		tableIds = ["data/test#csdata", "data/test#ADQLgeo"]

		# the dump file is written and read back within utils.sandbox
		with utils.sandbox():
			with open(dumpName, "wb") as dest:
				rsc.createDump(tableIds, dest, binary=False)
			with open(dumpName, "rb") as src:
				dumped = src.read()
		return dumped


class DumpsTest(testhelpers.VerboseTest):
	"""tests for the creation, inspection, and restoring of table dumps.

	The dump is produced once by _TestDump; its bytes are taken from the
	resource's original attribute.
	"""
	resources = [("tarbytes", _TestDump())]

	def testMembersPresent(self):
		# use the tar file as a context manager so it is closed again
		with tarfile.open(
				fileobj=BytesIO(self.tarbytes.original),
				mode="r:gz") as tf:
			memberNames = [i.name for i in tf.getmembers()]
		self.assertEqual(memberNames,
			["table_000.textdump", "table_001.textdump", "index.txt"])

	def testInfo(self):
		ti = list(dumping.iterTableInfos(BytesIO(self.tarbytes.original)))
		self.assertEqual(
			{i[1] for i in ti},
			{"data/test#csdata", "data/test#ADQLgeo"})
		self.assertEqual({i[2] for i in ti}, {True})
		# the dump must have been made within the last 20 seconds
		minDate = min(i[3] for i in ti)
		self.assertTrue(time.time()-20<=minDate<=time.time())
		self.assertTrue(min(i[4] for i in ti)>10)

	def testRestore(self):
		# if this test fails, you may want to just re-create the test environment.
		with base.getWritableAdminConn() as conn:
			# remember the current content, then empty one table and
			# drop the other so the restore has something to do.
			csRows = list(conn.queryToDicts("select * from test.csdata"))
			geoRows = list(conn.queryToDicts("select * from test.adqlgeo"))
			conn.execute("delete from test.csdata")
			conn.execute("drop table test.adqlgeo")
			conn.commit()
		
			self.assertEqual(
				list(conn.queryToDicts("select * from test.csdata")),
				[])

		rsc.restoreDump(BytesIO(self.tarbytes.original))

		with base.getTableConn() as conn:
			self.assertEqual(
				geoRows,
				list(conn.queryToDicts("select * from test.adqlgeo")))
			self.assertEqual(
				csRows,
				list(conn.queryToDicts("select * from test.csdata")))


class ViewStatementTest(testhelpers.VerboseTest):
	"""tests for the handling of viewStatement elements in tables.
	"""
	resources = [("conn", tresc.dbConnection)]

	def testMalformed(self):
		# the view selects column foo, which its source query doesn't have;
		# the resulting database error must be passed on to the user.
		brokenViewRD = base.parseFromString(rscdesc.RD, """<resource schema="test">
			<table onDisk="true" id="deleteme">
				<column name="foo"/>
				<viewStatement>
					CREATE VIEW \\curtable AS (
						SELECT \\colNames FROM (
							SELECT 1 as bar, 2 as quux) AS q)
				</viewStatement>
			</table>
			<data id="import">
				<make table="deleteme"/>
			</data>
		</resource>""")
		expectedMessage = 'View statement of table at IO:\'<resource schema="test"> <table onDisk="true" id="del...\', line 2 bad.  Postgres error message: column "foo" does not exist\nLINE 3:       SELECT foo FROM (\n                     ^\n'

		self.assertRaisesWithMsg(base.ReportableError,
			expectedMessage,
			rsc.makeData,
			(brokenViewRD.getById("import"),),
			connection=self.conn)


def roundtripThroughDB(conn, type, testData):
	"""returns the values in testData after they have been written to
	and read back from a temporary database table.

	The values are stored in a column of the DaCHS type given in type,
	together with their index in testData.  The result is a list of the
	values read back, ordered by that index.
	"""
	rd = base.parseFromString(rscdesc.RD, f"""<resource schema="test">
		<table id="rttest" temporary="True" onDisk="True">
			<column name="seq" type="integer"/>
			<column name="testval" type="{type}"/>
		</table>

			<data id="do_roundtrip">
				<dictlistGrammar/>
				<make table="rttest"/>
			</data>
			</resource>
			""")

	inputRows = []
	for index, value in enumerate(testData):
		inputRows.append({"testval": value, "seq": index})
	rsc.makeData(rd.dds[0], forceSource=inputRows, connection=conn)

	# sorting the (seq, testval) pairs restores the input order
	storedRows = sorted(conn.query("select seq, testval from rttest"))
	return [storedRow[1] for storedRow in storedRows]


class DatabaseRoundtripTest(testhelpers.VerboseTest):
	"""tests for what values of some database types look like after they
	went through roundtripThroughDB.
	"""
	resources = [("conn", tresc.dbConnection)]

	def _assertJsonRoundtrip(self, dbType):
		# sends the JSON test values through a column of type dbType and
		# checks what comes back.
		#
		# it *is* a bit lame that json string literals lose their
		# quotes on roundtripping, but I think that's preferable
		# vs. not admitting in json literals.
		sent = [None, '"Grätsche"', ['eins', 'zwei'], {'key': 'value'}]
		expected = [None, 'Grätsche', ['eins', 'zwei'], {'key': 'value'}]
		self.assertEqual(
			roundtripThroughDB(self.conn, dbType, sent),
			expected)

	def testBytes(self):
		sent = [b'\0', b'a\0b', br'\x40', r'\070\100']
		received = roundtripThroughDB(self.conn, "bytea", sent)
		afterRoundtrip = [bytes(value) for value in received]
		# yes, the last (raw) string *does* interpret escapes.
		# That's (questionable) literals.parseBytes behaviour.
		self.assertEqual(afterRoundtrip,
			[b'\0', b'a\0b', b'\\x40', b'8@'])

	def testJson(self):
		self._assertJsonRoundtrip("json")

	def testJsonb(self):
		self._assertJsonRoundtrip("jsonb")


class _DynTableRD(testhelpers.TestResource):
	"""a test resource giving an RD with two tables a and b and a data
	element making table a.

	The tables differ in the value of their animal param and in the unit of
	their height column; the grammar yields heights from 0 through 19.
	"""
	def make(self, deps):
		rdSource = """<resource schema="test">
			<table id="a">
				<param name="animal" type="text">Gnu</param>
				<column name="height" unit="m"/>
			</table>

			<table id="b">
				<param name="animal" type="text">Bug</param>
				<column name="height" unit="mm"/>
			</table>

			<data id="import">
				<sources item="x"/>
				<embeddedGrammar>
					<iterator>
						<code>
							for i in range(20):
								yield {"height": i}
						</code>
					</iterator>
				</embeddedGrammar>
				
				<make table="a"/>
			</data>
		</resource>"""
		return base.parseFromString(rscdesc.RD, rdSource)


class DynamicMakeTest(testhelpers.VerboseTest):
	"""tests for making data into a table chosen at run time using
	rsc.Data.createWithTable.
	"""
	resources = [("rd", _DynTableRD())]

	def _makeIntoTable(self, tableId):
		# runs the test RD's import with the table tableId as target
		# and returns the resulting primary table.
		importDD = self.rd.getById("import")
		target = rsc.Data.createWithTable(importDD, self.rd.getById(tableId))
		rsc.makeData(importDD, data=target)
		return target.getPrimaryTable()

	def testMakeTableA(self):
		made = self._makeIntoTable("a")
		self.assertEqual(made.getParam("animal"), "Gnu")
		self.assertEqual(made.tableDef.getColumnByName("height").unit, "m")

	def testMakeTableB(self):
		made = self._makeIntoTable("b")
		self.assertEqual(made.getParam("animal"), "Bug")
		self.assertEqual(made.tableDef.getColumnByName("height").unit, "mm")


class OddImportsTest(tresc.TestWithDBConnection):
	"""tests for imports of values the database interface cannot deal with.
	"""
	def testImportStringIntoPoly(self):
		# a union of polygons fed into an spoly column must result
		# in a DBError.
		polyRD = base.parseFromString(rscdesc.RD, """<resource schema="test">
			<table id="a" onDisk="True">
				<column name="p" type="spoly"/>
			</table>

			<data id="import"><dictlistGrammar/><make table="a"/></data>
		</resource>""")
		unionLiteral = ("Union ICRS TOPOCENTER"
			" (Polygon 180.0 45.0 230.0 80.0 300.0 30.0 180.0 45.0"
			" Polygon 180.0 45.0 50.0 80.0 60.0 30.0 180.0 45.0)")

		self.assertRaisesWithMsg(base.DBError,
			"can't adapt type 'GeomExpr'",
			rsc.makeData,
			(polyRD.getById("import"),),
			forceSource=[{"p": unionLiteral}],
			connection=self.conn)


# When executed as a script, hand control to the test helpers' main
# function.
# NOTE(review): DispatchedGrammarTest presumably is the test class run
# when nothing else is selected on the command line -- confirm against
# testhelpers.main.
if __name__=="__main__":
	testhelpers.main(DispatchedGrammarTest)
