"""
Tests for event propagation and user interaction.
"""

#c Copyright 2008-2024, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import datetime
import os
import re
import shutil
import subprocess

from gavo.helpers import testhelpers

from gavo import base
from gavo import api
from gavo import rsc
from gavo import rscdesc
from gavo import utils
from gavo.helpers import testtricks
from gavo.user import cli

import tresc


def extractoutput(owd):
	# Extractor callback for utils.sandbox: rescues the output files of
	# tests run in a sandbox by moving them from the current directory
	# into owd.  A copy already present in owd is removed first, even
	# when the current directory holds no replacement for it.
	for captured in ("output.stderr", "output.stdout"):
		target = os.path.join(owd, captured)
		if os.path.exists(target):
			os.unlink(target)
		if not os.path.exists(captured):
			continue
		shutil.move(captured, target)


class MiscCLITest(testhelpers.VerboseTest):
	"""Smoke tests for assorted dachs subcommands.

	Each test hands an argument list to an entry point (cli.main in all
	but one case) through assertOutput and states expectations on the
	return code, stdout, and stderr.  Expectations used here are plain
	strings, lists of strings, or predicates receiving the raw output as
	bytes; see testhelpers.VerboseTest.assertOutput for their exact
	semantics.
	"""
	resources = [("conn", tresc.dbConnection)]

	# an exception escaping a subcommand must be reported and yield status 1
	def testUnhandledException(self):
		self.assertOutput(cli.main, argList=["raise"],
			expectedStderr=lambda msg: b"Unhandled exception" in msg,
			expectedRetcode=1)

	def testTopLevelHelp(self):
		self.assertOutput(cli.main, argList=["--help"],
			expectedStdout=["<func> is a unique"])

	# "a" is a prefix of more than one subcommand name
	def testNonUniqueMatch(self):
		self.assertOutput(cli.main, argList=["a"],
			expectedRetcode=1,
			expectedStderr=
				lambda msg: msg.startswith(b"Multiple matches for function a"))

	def testNoMatch(self):
		self.assertOutput(cli.main, argList=["xyloph"],
			expectedRetcode=1,
			expectedStderr=
				lambda msg: msg.startswith(b"No match for function xyloph"))

	def testSubCmdHelp(self):
		self.assertOutput(cli.main, argList=["publish", "--help"],
			expectedStdout=lambda msg:
				b"usage: dachs publish [-h] [-k] [-u] rd [rd ...]" in msg)

	def testAProg(self):
		self.assertOutput(cli.main, argList=["stc", "utypes", "Position ICRS"],
			expectedStdout=["AstroCoordSystem.SpaceFrame.CoordRefFrame"])

	def testCleanTAP(self):
		# NOTE(review): the commit presumably ends the transaction held by the
		# shared test connection before the command runs -- confirm.
		self.conn.commit()
		self.assertOutput(cli.main, argList=["admin", "cleantap"],
			expectedRetcode=0)

	def testExecute(self):
		self.assertOutput(cli.main, argList=["admin", "exec",
			"data/regtest#silly"],
			expectedStdout="Silly\n",
			expectedStderr="Spawning thread for cron job Do silly things\n")

	def testCleanTAPPending(self):
		self.conn.commit()
		self.assertOutput(cli.main, argList=["admin", "cleantap", "-p"],
			expectedRetcode=0, expectedStderr="")

	def testDumpDFSuccess(self):
		self.assertOutput(cli.main, argList=["adm", "dumpDF", "data/filters.txt"],
			expectedRetcode=0,
			expectedStdout=["# Bandpass names are typically common"])

	def testDumpDFNotFound(self):
		self.assertOutput(cli.main, argList=["adm", "dumpDF", "missing.dat"],
			expectedRetcode=1,
			expectedStderr="*** Error: No such distribution file: missing.dat\n\n")

	# dumpDF also accepts an RD id
	def testDumpDFRD(self):
		self.assertOutput(cli.main, argList=["adm", "dumpDF", "//scs"],
			expectedRetcode=0,
			expectedStdout=['<outputField id="distCol"'])

	def testXSDValInvalid(self):
		with testhelpers.testFile("bad.xml", "<ramsch/>") as p:
			self.assertOutput(cli.main, argList=["adm", "xsd", p],
				expectedRetcode=1,
				expectedStdout=lambda txt: re.match(b".*/tmp/bad.xml --"
				b' <string>:1:0:ERROR:SCHEMASV:SCHEMAV_CVC_ELT_1:'
				b" Element 'ramsch': No matching global declaration available"
				b' for the validation root.\n$', txt))

	def testXSDValValid(self):
		# the file keeps the name bad.xml although its content is valid;
		# the expected output below contains that name.
		with testhelpers.testFile("bad.xml", '<VOTABLE'
				' xmlns="http://www.ivoa.net/xml/VOTable/v1.3"><RESOURCE/>'
				'</VOTABLE>') as p:
			self.assertOutput(cli.main, argList=["adm", "xsd", p],
				expectedRetcode=0,
				expectedStdout=lambda txt:
					re.match(b'.*/tmp/bad.xml -- valid\n$', txt))

	def testShowDDGood(self):
		self.assertOutput(cli.main, argList=["show", "dds", "//tap"],
			expectedStdout="importTablesFromRD\nimportDMsFromRD\nimportColumnsFromRD\nimportGroupsFromRD\nimportFkeysFromRD\ncreateSchema*\nrefresh\ncreateJobTable*\n")

	def testShowDDBad(self):
		self.assertOutput(cli.main, argList=["show", "dds", "//nonex"],
			expectedRetcode=1,
			expectedStderr=lambda msg:
				b"Resource descriptor '__system__/nonex' could not be located" in msg)

	def testVersion(self):
		self.assertOutput(cli.main, argList=["--version"],
			expectedRetcode=0,
			expectedStdout=lambda msg: re.match(
				rb"Software \(\d+\.\d+(\.\d+)?\)"
				rb" Schema \(\d+/-?\d+\)" b"\n", msg))

	# the only test here that does not go through cli.main
	def testDachsInitJustRuns(self):
		from gavo.user import initdachs

		self.assertOutput(initdachs.main, argList=["--nodb"],
			expectedRetcode=0,
			expectedStdout="",
			expectedStderr="")

	def testMkboost(self):
		self.assertOutput(cli.main, argList=["mkboost", "data/dgs#col"],
			expectedStdout=["#include <stdio.h>",
				"fi_l,            /* L, smallint */",
				"void createDumpfile(int argc, char **argv)"])

	def testADQL(self):
		self.assertOutput(cli.main, argList=["adql", "select top 1 * from"
			" tap_schema.tables"],
			expectedStdout=[
				'<DESCRIPTION>Tables available for ADQL querying.</DESCRIPTION>',
				'<STREAM encoding="base64">'],
			expectedStderr=["Sending to postgres:", "LIMIT 1"])

	def testIndexStatements(self):
		self.assertOutput(cli.main, argList=["adm", "indexS",
			"//products#products"],
			expectedStdout="CREATE INDEX products_sourceTable ON"
				" dc.products (sourceTable)\n")

	def testUpdateTAPSchema(self):
		# The predicate has to look at the output it is handed; returning a
		# bare non-empty literal would be truthy whatever ended up on stderr.
		self.assertOutput(cli.main, argList=["adm", "updateTAP"],
			expectedStdout="",
			expectedStderr=lambda output:
				b"Starting refresh\nDone refresh, read 0\n" in output)


class _CatalogTestResource(tresc.FileResource):
	"""A file resource carrying the RD somecat/q used by ImportTest.

	The RD defines an on-disk, published table cat with position and time
	columns, a coverage updater fed from that table, and a data element
	import.  The embedded grammar yields one row per source token; of the
	three tokens, "x" cannot be turned into a float, so only the tokens 1
	and 4 can produce rows.

	NOTE(review): presumably tresc.FileResource writes content to path
	for the lifetime of the resource -- confirm against tresc.
	"""
	path = "inputs/somecat/q.rd"
	content = """<resource schema="somecat">
			<meta name="creationDate">2000-01-01T00:00:00</meta>
			<meta name="title">A test resource</meta>
			<meta name="description">(for DaCHS)</meta>
			<meta name="subject">Objectivity</meta>
			<table onDisk="True" id="cat">
				<column name="ra" ucd="pos.eq.ra;meta.main"/>
				<column name="dec" ucd="pos.eq.dec;meta.main"/>
				<column name="t_min" type="timestamp"/>
				<column name="t_max" type="timestamp"/>
				<publish/>
			</table>
			<coverage>
				<updater sourceTable="cat"/>
			</coverage>
			<data id="import">
				<sources><item>1</item><item>4</item><item>x</item></sources>
				<embeddedGrammar>
					<iterator><code>
						seed = float(self.sourceToken)
						yield {
							"ra": seed+4,
							"dec": 2-seed,
							"t_min": datetime.datetime(int(2010+seed), int(seed)%12, 1, 6),
							"t_max": datetime.datetime(int(2010+seed), int(seed)%12, 1, 12),
						}
					</code></iterator>
				</embeddedGrammar>
				<make table="cat"/>
			</data>
		</resource>"""


class ImportTest(testhelpers.VerboseTest):
	"""Tests for dachs imp and the commands operating on imported data
	(publish, limits, drop, purge, cacheprev).

	All commands are run through cli.main via assertOutput; the database
	is inspected afterwards where the effect matters.
	"""
	resources = [("conn", tresc.dbConnection),
		("res", _CatalogTestResource()),
		("obscorePublished", tresc.ssaTestTable),
		("afits", tresc.fitsTable)]

	# import, publish, obtain limits, and drop somecat/q, checking the
	# database after the steps that change it.
	def testLifecycle(self):
		self.conn.commit()
		with base.AdhocQuerier() as querier:
			# -c: one of the three sources is broken, so two rows arrive and
			# the command is expected to exit with status 101.
			self.assertOutput(cli.main,
				argList=["--suppress-log",
					"imp", "-c", "somecat/q"],
				expectedStderr=["Rows affected: 2"],
				expectedRetcode=101)

			self.assertFalse(querier.getTableType("somecat.cat") is None)
			self.assertTrue(querier.getTableType("test.typesTable") is None)

			self.assertOutput(cli.main,
				argList=["--disable-spew", "--suppress-log", "publish", "somecat/q"],
				expectedRetcode=0, expectedStdout="")
			
			self.assertTrue(list(querier.connection.query("SELECT * FROM dc.subjects"
				" WHERE subject=%(s)s", {'s': "Objectivity"})))

			self.assertOutput(cli.main,
				argList=["--disable-spew", "--suppress-log", "limits", "somecat/q"],
				expectedRetcode=0,
				expectedStderr=lambda o: o.startswith(
					b"Obtaining metadata for rd somecat"))
			
			# NOTE(review): the "or True" makes this predicate pass whatever
			# assertHasStrings returns; it can only fail by raising.
			self.assertOutput(cli.main,
				argList=["limits", "-d", "somecat/q"],
				expectedStdout=lambda output: testtricks.assertHasStrings(output, [
					"spatial converage 6/18064 18098",
					"temporal converage [55562.25, 56748.5]",
					"spectral converage <Unknown>",
					"Statistics for somecat.cat",
					"0.9", "…"]) or True)

			# re-read the RD to see the coverage the limits run obtained
			base.caches.clearForName("somecat/q")
			changedRd = base.caches.getRD("somecat/q")
			self.assertEqual(changedRd.coverage.spatial, "6/18064 18098")
			self.assertEqual(changedRd.coverage.spectral, [])
			self.assertEqual(changedRd.coverage.temporal, [(55562.25, 56748.5)])

			# drop it all, make sure all traces are gone
			self.assertOutput(cli.main,
				argList=["--ui", "semistingy",  "--suppress-log",
					"drop", "somecat/q"], expectedStdout="", expectedStderr="")
			self.assertFalse(list(querier.connection.query("SELECT * FROM dc.subjects"
				" WHERE subject=%(s)s", {'s': "Objectivity"})))
			# this must name the table imported above; a misspelled name
			# would trivially have no table type and prove nothing.
			self.assertTrue(querier.getTableType("somecat.cat") is None)

	# RDs outside of inputsDir must be refused
	def testImportDeniedForOffInputs(self):
		destName = os.path.expanduser("~/foobar.rd")
		with testhelpers.testFile(destName, '<resource schema="junk"/>'):
			self.assertOutput(cli.main,
				argList=["imp", destName],
				expectedRetcode=1, expectedStderr=lambda tx:
				re.match(rb"\*\*\* Error: .*/foobar.rd: Only RDs below inputsDir"
					b"\n.*/inputs. are allowed.\n", tx) is not None)

	# imp -m must update the metadata without touching table content
	def testMetaImportAndPurge(self):
		self.assertOutput(cli.main, argList=["purge", "test.adql"])
		# make sure the test schema exists before running the test
		self.conn.commit()
		self.assertOutput(cli.main,
			argList=["imp", "data/test", "productimport-skip"],
			expectedStderr=["Making data data/test#pro"])
		try:
			with base.getWritableAdminConn() as conn:
				conn.execute("CREATE TABLE test.adql (erratic INTEGER)")
				conn.execute("INSERT INTO test.adql VALUES (1)")

			with base.getTableConn() as conn:
				self.assertOutput(cli.main, argList=
					["imp", "-m", "data/test", "ADQLTest"],
					expectedStderr=[
						"Updating meta for ADQLTest\n"])
				self.assertEqual(list(conn.query(
					"select tablename, sourcerd, adql"
					"  from dc.tablemeta where tablename='test.adql'")),
					[('test.adql', 'data/test', True)])
				
				# make sure gavo imp didn't touch the table
				self.assertEqual(list(conn.query("SELECT * FROM test.adql")),
					[(1,)])
		finally:
			self.assertOutput(cli.main, argList=["purge", "test.adql"])

	# naming a data element the RD does not have
	def testNonExistingDataImpError(self):
		with testtricks.testFile(
				os.path.join(base.getConfig("inputsDir"), "empty.rd"),
				"""<resource schema="test"><table id="foo"/></resource>"""):
			self.assertOutput(cli.main, argList=["--hints", "imp", "empty", "foo"],
				expectedRetcode=1, expectedStderr="*** Error: The DD 'foo'"
					" you are trying to import is not defined within\nthe RD"
					" 'empty'.\nHint: Data elements available in empty include (None)\n")

	# an RD with only auto="False" data and no data named on the command line
	def testNoAutoDataImpError(self):
		with testtricks.testFile(
				os.path.join(base.getConfig("inputsDir"), "empty.rd"),
				"""<resource schema="test"><table id="foo"/>
				<data auto="False" id="x"><make table="foo"/></data>
				<data auto="False" id="y"><make table="foo"/></data>
				</resource>"""):
			self.assertOutput(cli.main, argList=["--hints", "imp", "empty"],
				expectedRetcode=1, expectedStderr='*** Error: Neither automatic'
					' not manual data selected from RD empty\nHint: All data'
					' elements have auto=False.  You have to explicitly name\none'
					' or more data to import (names available: x, y)\n')

	def testMetaNotPropagatedMessage(self):
		self.assertOutput(cli.main,
			argList=["imp", "-m", "data/test", "recaftertest"],
			expectedRetcode=0,
			expectedStderr=["not rebuilding dependencies.",
				": data/test#import_pythonscript"])

	# publish, then publish -u: the dc.resources row must end up
	# flagged deleted
	def testPubUnpub(self):
		self.assertOutput(cli.main,
			argList=["--disable-spew", "--suppress-log", "publish", "somecat/q"],
			expectedRetcode=0, expectedStdout="")
		self.assertEqual(
			list(self.conn.query("SELECT resid, deleted FROM dc.resources"
			" WHERE sourcerd='somecat/q'")),
			[('cat', False)])
		self.assertOutput(cli.main,
			argList=["--disable-spew", "--suppress-log", "publish", "-u",
				"somecat/q"], expectedRetcode=0, expectedStdout="")
		self.assertEqual(
			list(self.conn.query("SELECT resid, deleted FROM dc.resources"
			" WHERE sourcerd='somecat/q'")),
			[('cat', True)])

	def testCacheprev(self):
		# we only see if this thing crashes; actually deleting and creating
		# previews is too much hassle at this point.
		self.assertOutput(cli.main,
			argList=["adm", "cacheprev", "data/test#prodtest"],
				expectedStdout="")

	# the obscore row for the ssa test data must still be there after
	# an import -I run
	def testObscoreSurvivesReindex(self):
		with base.getTableConn() as conn:
			res = list(conn.query("SELECT obs_id FROM ivoa.obscore WHERE"
				" source_table='test.hcdtest' and obs_id='data/spec1.ssatest'"))
		self.assertEqual(len(res), 1)

		self.assertOutput(cli.main,
			argList=["--disable-spew", "--suppress-log", "import", "-I",
				"data/ssatest", "test_import"],
			expectedRetcode=0,
			expectedStderr=lambda t: b"Create index hcdtest_ssa_pubDID" in t)

		with base.getTableConn() as conn:
			res = list(conn.query("SELECT obs_id FROM ivoa.obscore WHERE"
				" source_table='test.hcdtest' and obs_id='data/spec1.ssatest'"))
		self.assertEqual(len(res), 1)

	def testObscoreLimits(self):
		self.assertOutput(cli.main,
			argList=["limits", "//obscore"],
			expectedRetcode=0,
			expectedStderr=lambda res:
				b"Obtaining metadata for rd __system__/obs" in res)


class _SysRDResource(tresc.FileResource):
	"""A file resource carrying the RD sysrd used by SystemImportTest.

	The RD has a single on-disk table fromclitest marked system="True",
	with an integer column x, and a data element y making that table
	without any sources.
	"""
	path = "inputs/sysrd.rd"
	content = """<resource schema="test">
			<table onDisk="True" id="fromclitest" system="True">
				<column name="x" type="integer"><values nullLiteral="0"/></column>
			</table>
			<data id="y"><make table="fromclitest"/></data>
		</resource>"""


class SystemImportTest(testhelpers.VerboseTest):
	"""Tests for how imp and drop treat the system table of the RD sysrd,
	with and without the --system flag.
	"""
	resources = [("conn", tresc.dbConnection),
		("sysrdFile", _SysRDResource())]

	def _fillTable(self):
		# put a single row with x=2 into test.fromclitest and commit it
		tableDef = api.getRD("sysrd").getById("fromclitest")
		table = rsc.TableForDef(tableDef, connection=self.conn)
		table.addRow({'x': 2})
		self.conn.commit()

	def _getRows(self):
		# everything currently in test.fromclitest, as seen by self.conn
		return list(self.conn.query("select * from test.fromclitest"))

	def testNoSystemImportDefault(self):
		self.assertOutput(cli.main,
			argList=["imp", "--system", "sysrd"],
			expectedStderr=["Making data sysrd#y", "Rows affected: 0"],
			expectedRetcode=0)
		# Write a 2 into the table that must survive the next imp
		self._fillTable()

		# no --system this time
		self.assertOutput(cli.main,
			argList=["imp", "sysrd"],
			expectedStderr=["Making data sysrd#y", "Rows affected: 0"],
			expectedRetcode=0)
		self.assertEqual(self._getRows(), [(2,)])

	def testSystemImport(self):
		systemImport = ["imp", "--system", "sysrd"]

		self.assertOutput(cli.main, argList=list(systemImport),
			expectedStderr=["Making data sysrd#y"], expectedRetcode=0)
		# this row is expected to be gone after the second --system import
		self._fillTable()

		self.assertOutput(cli.main, argList=list(systemImport),
			expectedStderr=["Making data sysrd#y"], expectedRetcode=0)
		self.assertEqual(self._getRows(), [])

	def testSystemDropDrops(self):
		self.assertOutput(cli.main,
			argList=["--ui", "stingy", "drop", "--system", "sysrd"],
			expectedStdout="", expectedStderr="", expectedRetcode=0)
		with base.AdhocQuerier() as querier:
			tableType = querier.getTableType("test.fromclitest")
			self.assertTrue(tableType is None)


class _FITSGeneratedRD(testhelpers.TestResource):
	"""A test resource holding what dachs mkrd writes to stdout for
	test_data/ex.fits.
	"""
	def make(self, ignored):
		# Runs mkrd through testhelpers.ForkingSubprocess with empty stdin
		# and returns its stdout; a non-zero return code aborts with the
		# captured stderr in the message.
		resdir = str(os.path.join(base.getConfig("inputsDir"), "data"))
		argv = ["test harness", "--debug", "--ui", "stingy", "mkrd", "-r",
			resdir, "test_data/ex.fits"]
		proc = testhelpers.ForkingSubprocess(
			argv,
			executable=cli.main,
			stdin=subprocess.PIPE,
			stdout=subprocess.PIPE,
			stderr=subprocess.PIPE)
		rdSource, errors = proc.communicate(input=b"")

		if proc.wait():
			raise AssertionError("panic: generating RD failed, bailing out.\n%s\n"%
				errors)
		return rdSource


class MkRDTest(testhelpers.VerboseTest, testhelpers.XSDTestMixin):
	"""Tests for RD generation: dachs mkrd (through the fitsrd resource)
	and the templates of dachs start.
	"""
	resources = [("fitsrd", _FITSGeneratedRD()), ('oc', tresc.obscoreTable)]

	def _assertStartCreates(self, templateName, expectedFragments):
		# Runs "dachs start templateName" in a sandbox below inputsDir and
		# checks that nothing is written to stdout and that the q.rd left
		# behind is well-formed, uses the sandbox directory's name as its
		# schema, and contains each of expectedFragments.
		with utils.sandbox(
				tmpdir=base.getConfig("inputsDir"),
				extractfunc=extractoutput):
			resdir = os.path.split(os.getcwd())[-1]

			def assertThings(output):
				self.assertEqual(output, b"")
				# always decode as utf-8 rather than with the locale's default
				with open("q.rd", "r", encoding="utf-8") as f:
					generated = f.read()

				self.assertWellformed(generated)
				self.assertTrue(
					'schema="%s"'%resdir in generated)
				for frag in expectedFragments:
					self.assertTrue(frag in generated, "%s missing"%frag)
				return True

			self.assertOutput(cli.main, ["start", templateName],
				expectedStdout=assertThings)

	def testFITSRDLooksOk(self):
		for frag in [
				'<column name="color" type="text"',
				'description="Effective exposure time [seconds]"',
				'<map key="exptime">EXPTIME</map>']:
			self.assertTrue(utils.bytify(frag) in self.fitsrd, "%s missing"%frag)
	
	# the generated RD must be importable (and droppable again)
	def testRunImp(self):
		# NOTE(review): the other tests treat self.fitsrd as bytes, which
		# has no attribute original -- confirm what the resource machinery
		# actually hands out here.
		with testhelpers.testFile(
				os.path.join(base.getConfig("inputsDir"), "gen.rd"),
				self.fitsrd.original):
			self.assertOutput(cli.main, ["imp", "gen"],
				expectedStderr=["Rows affected: 2"], expectedRetcode=0)
			self.assertOutput(cli.main, ["--ui", "stingy", "drop", "gen"])

	# the second line of the generated RD must open a meta element
	def testFITSRDMetaFirst(self):
		self.assertTrue(self.fitsrd.split(b"\n")[1].strip().startswith(b"<meta"))

	def testStartList(self):
		self.assertOutput(cli.main, ["start", "list"],
			expectedStdout=["scs -- Catalogs via SCS and TAP"])

	def testStartBadCall(self):
		self.assertOutput(cli.main, ["start", "knakurtt"],
			expectedRetcode=1,
			expectedStderr="*X*X* No template for knakurtt.\n")

	def testStartSCS(self):
		# the template is expected to mention the current (UTC) year
		thisYear = datetime.datetime.now(datetime.timezone.utc).year
		self._assertStartCreates("scs", [
			"%s-"%thisYear,
			'<meta name="creator">%authors'])

	def testStartSIAP(self):
		self._assertStartCreates("siap",
			['//obscore#publishObscoreLike'])

	def testStartSSAP(self):
		self._assertStartCreates("ssap+datalink",
			['//obscore#publishSSAPMIXC'])

	def testStartEPNTAP(self):
		self._assertStartCreates("epntap",
			['>//epntap2#localfile-2_0'])


class _MyRDResource(tresc.FileResource):
	"""A file resource carrying the RD myrd used by DropTest.

	The RD has two on-disk tables: autotable, made by the data element y,
	and noautotable, made by the data element z, which is declared
	auto="False".
	"""
	path = "inputs/myrd.rd"
	content = """<resource schema="test">
			<table onDisk="True" id="autotable">
				<column name="x" type="integer" required="True"/></table>
			<table onDisk="True" id="noautotable">
				<column name="x" type="integer" required="True"/></table>
			<data id="y"><make table="autotable"/></data>
			<data id="z" auto="False"><make table="noautotable"/></data>
		</resource>"""


class DropTest(testhelpers.VerboseTest):
	"""Tests for dachs drop in its various modes and for adm makedelrec.
	"""
	resources = [("myrdFile", _MyRDResource())]

	def _importBoth(self, **moreExpectations):
		# imports both data elements of myrd, accepting anything on stderr
		self.assertOutput(cli.main, argList=["imp", "myrd", "y", "z"],
			expectedRetcode=0, expectedStderr=lambda _: True,
			**moreExpectations)

	def _assertTablesThere(self, autoThere, noautoThere):
		# checks which of the two tables of myrd exist in the database
		with base.AdhocQuerier() as querier:
			self.assertEqual(
				querier.getTableType("test.autotable") is not None,
				autoThere)
			self.assertEqual(
				querier.getTableType("test.noautotable") is not None,
				noautoThere)

	def testAutoDropping(self):
		# a plain drop leaves the table of the auto="False" data alone
		self._importBoth()
		self.assertOutput(cli.main,
			argList=["--ui", "stingy", "drop", "myrd"],
			expectedStderr="", expectedRetcode=0)
		self._assertTablesThere(False, True)

	def testAllDropping(self):
		self._importBoth()
		self.assertOutput(cli.main,
			argList=["--ui", "stingy", "drop", "myrd", "--all"],
			expectedStderr="", expectedRetcode=0)
		self._assertTablesThere(False, False)

	def testNamedDropping(self):
		# naming z drops only the table z makes
		self._importBoth(expectedStdout="")
		self.assertOutput(cli.main,
			argList=["--ui", "stingy", "drop", "myrd", "z"],
			expectedStdout="", expectedStderr="", expectedRetcode=0)
		self._assertTablesThere(True, False)

	def testForcedDropping(self):
		# drop -f must clear tablemeta rows of an RD that does not exist
		with base.getWritableAdminConn() as adminConn:
			adminConn.execute("INSERT INTO dc.tablemeta (tablename, sourcerd, adql)"
				" VALUES ('grabauski', 'no/nada/never', false)")

		self.assertOutput(cli.main,
			argList=["--ui", "semistingy", "drop", "-f", "no/nada/never"],
			expectedRetcode=0, expectedStdout="")

		with base.getTableConn() as tableConn:
			leftovers = list(tableConn.query(
				"SELECT * from dc.tablemeta WHERE sourcerd='no/nada/never'"))
			self.assertEqual(leftovers, [])

	def testMakeDelRecWorking(self):
		deadId = "ivo://x-testing/art/deadsvc"
		try:
			self.assertOutput(cli.main,
				argList=["adm", "makedelrec", deadId],
				expectedStdout="")
			with base.getTableConn() as tableConn:
				found = list(tableConn.query(
					"SELECT title, deleted FROM dc.resources"
					" WHERE resId='deadsvc'"))
				self.assertEqual(found,
					[('Ex ivo://x-testing/art/deadsvc', True)])
		finally:
			# remove whatever the deleted record left in the database
			with base.getWritableAdminConn() as adminConn:
				adminConn.execute("DELETE FROM dc.resources WHERE"
					" resId='deadsvc'")
				adminConn.execute("DELETE FROM dc.sets WHERE"
					" resId='deadsvc'")

	def testMakeDelRecRefusing(self):
		foreignId = "ivo://org.gavo.dc/art/deadsvc"
		self.assertOutput(cli.main,
			argList=["adm", "makedelrec", foreignId],
			expectedRetcode=1,
			expectedStderr="*** Error: You can only declare ivo ids"
				" from your own authority as\ndeleted.\n")


class ValidationTest(testhelpers.VerboseTest):
	"""Tests for dachs val.

	The RD literals below must stay exactly as they are, whitespace
	included: the expected message of testInvalidUserconfig contains a
	line and column position.
	"""
	def testValidUserconfig(self):
		base.caches.clearForName(str(rscdesc.USERCONFIG_RD_PATH))
		userconfigSource = rscdesc.USERCONFIG_RD_PATH.with_suffix(".rd")
		with testtricks.testFile(userconfigSource,
				"""<resource schema="test"><STREAM id="foo"><column name="abc"/>
				</STREAM></resource>"""):
			self.assertOutput(cli.main, argList=["val", "%"],
				expectedStdout='% -- OK\n',
				expectedStderr='',
				expectedRetcode=0)

	def testInvalidUserconfig(self):
		def reportsMalformedRD(text):
			# the column element is never closed in the RD written below
			return re.match(
				rb'% -- \[ERROR\] %: Malformed RD input, message follows\s+'
				rb"\*\*\* Error: .*etc/userconfig.rd"
				rb"\s+mismatched\s+tag:\s+line\s+2,\s+column 6", text)

		base.caches.clearForName(str(rscdesc.USERCONFIG_RD_PATH))
		userconfigSource = rscdesc.USERCONFIG_RD_PATH.with_suffix(".rd")
		with testtricks.testFile(userconfigSource,
				"""<resource schema="test"><STREAM id="foo"><column name="abc">
				</STREAM></resource>"""):
			self.assertOutput(cli.main, argList=["val", "%"],
				expectedStdout=reportsMalformedRD,
				expectedStderr='',
				expectedRetcode=0)

	def testMiscValidation(self):
		# an RD with a schema that has no resource directory and with a
		# duplicate table id: two warnings, but still OK in the end
		with testhelpers.testFile(name=None, inDir=base.getConfig("inputsDir"),
				content="""<resource schema="notexistingXXX">
					<meta>
						title: Test resource
						creationDate: 2000-01-01
						description: no.
						subject: even less
					</meta>
					<table id="t1"/>
					<table id="t1"/>
					</resource>""") as path:
			rdId = path.rsplit("/", 1)[-1]

			def doAssertions(output):
				missingDirWarning = utils.bytify(
					"[WARNING] %s: RD %s: resource directory"
					" '%s/inputs/notexistingXXX' does not exist"%(
					rdId, rdId, base.getConfig("rootDir")))
				overwrittenWarning = utils.bytify(
					"%s -- [WARNING] %s: Element with id t1 overwritten."%(
					rdId, rdId))

				for expected in (missingDirWarning, overwrittenWarning):
					self.assertTrue(expected in output)
				self.assertTrue(output.endswith(b"OK\n"))
				return True

			self.assertOutput(cli.main, argList=["val", rdId],
				expectedStdout=doAssertions,
				expectedStderr='',
				expectedRetcode=0)

	def testFileValidation(self):
		self.assertOutput(cli.main,
			argList=["val", "-c", "data/test"],
			expectedStdout="data/test -- OK\n",
			expectedStderr='',
			expectedRetcode=0)


class CLIReferenceTest(testhelpers.VerboseTest):
	"""Tests for the errors api.getReferencedElement raises for
	references that cannot be resolved, directly or through an RD
	referring to something missing.
	"""
	def _assertReferenceFails(self, excClass, message, reference):
		# resolving reference must raise excClass with exactly message
		self.assertRaisesWithMsg(excClass,
			message,
			api.getReferencedElement,
			(reference,))

	def testNonExRD(self):
		self._assertReferenceFails(base.RDNotFound,
			"Resource descriptor 'i/do/not/exist' could not be located"
			" in file system",
			"i/do/not/exist")

	def testNoRDReference(self):
		rdPath = os.path.join(base.getConfig("inputsDir"), "nordref.rd")
		with testhelpers.testFile(rdPath,
				"""<resource schema="__test"><table original="i/do/not#exist"/>
				</resource>"""):
			self._assertReferenceFails(base.RDNotFound,
				"Resource descriptor 'i/do/not' could not be located in file system",
				"nordref")

	def testNoElDirectReference(self):
		self._assertReferenceFails(base.NotFoundError,
			"Element with id 'justrandomjunk' could not be located in RD data/test",
			"data/test#justrandomjunk")

	def testNoElIndirectReference(self):
		rdPath = os.path.join(base.getConfig("inputsDir"), "badref.rd")
		with testhelpers.testFile(rdPath,
				"""<resource schema="__test">
					<table original="data/test#justrandomjunk"/>
				</resource>"""):
			self._assertReferenceFails(base.NotFoundError,
				"Element with id 'justrandomjunk' could not be located"
				" in RD data/test",
				"badref")

	def testNoElIndirectReferenceInDir(self):
		# as before, but the RD is referenced from within its directory
		baseDir = os.path.join(base.getConfig("inputsDir"), "test")
		rdPath = os.path.join(baseDir, "locbad.rd")
		with testhelpers.testFile(rdPath,
				"""<resource schema="__test">
					<table original="data/test#justrandomjunk"/>
				</resource>"""):
			with utils.in_dir(baseDir):
				self._assertReferenceFails(base.NotFoundError,
					"Element with id 'justrandomjunk' could not be located"
					" in RD data/test",
					"locbad")


class DumpingTest(testhelpers.VerboseTest):
	"""Tests for dachs dump (create, load, ls), run in sandboxes.
	"""
	resources = [("table", tresc.csTestTable)]

	def _runDump(self, *dumpArgs):
		# runs "dachs dump <dumpArgs>", expecting success and empty stdout
		self.assertOutput(cli.main,
			argList=["dump"]+list(dumpArgs),
			expectedStdout="",
			expectedRetcode=0)

	def _countTestRows(self):
		# number of rows in the test table, as seen by its own connection
		countResult = list(self.table.connection.query(
			"select count(*) from %s"%self.table.tableDef.getQName()))
		return countResult[0][0]

	def testRestoreMultiOverwriting(self):
		groupsBefore = list(self.table.connection.query(
			"select count(*) from dc.groups"))
		self.table.connection.commit()

		with utils.sandbox(extractfunc=extractoutput):
			self._runDump("create", "tmp.ddump",
				self.table.tableDef.getFullId(),
				"//users#groups")
			self._runDump("load", "tmp.ddump")

		# loading over existing tables must not duplicate their rows
		self.assertEqual(self._countTestRows(), 1)
		groupsAfter = list(self.table.connection.query(
			"select count(*) from dc.groups"))
		self.assertEqual(groupsAfter, groupsBefore)

	def testRestoreDeleted(self):
		tableId = self.table.tableDef.getFullId()

		with utils.sandbox(extractfunc=extractoutput):
			self._runDump("create", "tmp.dump", tableId)

			# remove the table so that the load has to bring it back
			self.assertOutput(cli.main,
				argList=["purge", self.table.tableDef.getQName()])
			self.assertEqual(self.table.exists(), False)

			self._runDump("load", "tmp.dump")

		self.assertEqual(self._countTestRows(), 1)

	def testLsDump(self):
		with utils.sandbox(extractfunc=extractoutput):
			self._runDump("create", "tmp.ddump",
				self.table.tableDef.getFullId(),
				"//tap")
			self.assertOutput(cli.main,
				argList=["dump", "ls", "tmp.ddump"],
				expectedStdout=[
					"data/test#csdata",
					"__system__/tap#supportedmodels",
					" probably ",
					" 59"],
				expectedRetcode=0)


class ServerCallsTest(testhelpers.VerboseTest):
	"""Tests for taprun and uwsrun, the subcommands that execute a
	previously prepared job given its id.
	"""
	def testRunTAP(self):
		from gavo.protocols import tap

		workerSystem = tap.WORKER_SYSTEM
		jobId = workerSystem.getNewJobId()
		with workerSystem.changeableJob(jobId) as newJob:
			newJob.setPar("LANG", "ADQL")
			newJob.setPar("QUERY",
				"SELECT TOP 1 * FROM TAP_SCHEMA.tables")

		try:
			self.assertOutput(cli.main,
				argList=["taprun", jobId],
				expectedRetcode=0,
				expectedStdout="",
				expectedStderr=["taprunner for %s finished"%jobId])
		finally:
			workerSystem.destroy(jobId)

	def testRunUWS(self):
		worker = base.resolveCrossId("data/cores#pc").getUWS()
		jobId = worker.getNewJobId()
		rawParameters = {"opre": ["2.5"], "powers": ["1 2 3"]}

		with worker.changeableJob(jobId) as newJob:
			newJob.setParamsFromRawDict(rawParameters)
			# this would normally happen when queuing, which we don't do here
			# since we want to manually call uwsrun
			newJob.completeParams()

		try:
			self.assertOutput(cli.main,
				argList=["uwsrun", jobId],
				expectedRetcode=0,
				expectedStdout="",
				expectedStderr="")

			# the job's result file must have been written
			resultPath, _ = worker.getJob(jobId).getResult("result")
			with open(resultPath, "r") as f:
				resultText = f.read()
			self.assertTrue("imaginary part" in resultText)
		finally:
			worker.destroy(jobId)


class UserManagementTest(testhelpers.VerboseTest):
	"""Tests for the user and group management subcommands of dachs admin.
	"""
	def _assertUnknownUser(self, subcommand, *args):
		# the subcommand must fail because the user xxyz does not exist
		self.assertOutput(cli.main,
			argList=["admin", subcommand]+list(args),
			expectedStderr="*** Error: User xxyz does not exist.\n",
			expectedRetcode=1)

	def testAddtoGroupFailing(self):
		self._assertUnknownUser("addtogroup", "xxyz", "nogroup")

	def testChangeUserFailing(self):
		self._assertUnknownUser("changeuser", "xxyz", "newpw")

	def testDelFromGroupAndList(self):
		# add a user, put it into a group and take it out again, looking
		# at listusers in between; each step is (argList, expectedStdout).
		steps = [
			(["admin", "adduser", "henk", "platt", "soll nicht da sein"],
				""),
			(["adm", "addtogroup", "henk", "honk"],
				""),
			(["adm", "listusers"],
				["henk (soll nicht da sein) -- henk honk \n"]),
			(["adm", "delfromgroup", "henk", "honk"],
				""),
			(["adm", "listusers"],
				["henk (soll nicht da sein) -- henk \n"]),
		]
		try:
			for argList, expectedStdout in steps:
				self.assertOutput(cli.main, argList=argList,
					expectedStdout=expectedStdout)
		finally:
			# remove the test user whether or not the steps succeeded
			with base.getWritableAdminConn() as adminConn:
				adminConn.execute(
					"delete from dc.users where username='henk'")
				adminConn.execute(
					"delete from dc.groups where username='henk'")

	def testHashPassword(self):
		def looksLikeScryptHash(output):
			return re.match(rb"scrypt:[+/A-Za-z0-9]+=*\s*$", output)

		self.assertOutput(cli.main,
			argList=["admin", "hashP", "gronk"],
			expectedStdout=looksLikeScryptHash,
			expectedStderr="",
			expectedRetcode=0)


# When this module is executed directly, hand DropTest to testhelpers.main.
# NOTE(review): presumably that is the test class run when none is named
# on the command line -- confirm against testhelpers.
if __name__=="__main__":
	testhelpers.main(DropTest)
