# -*- coding: utf-8 -*-
"""
Helper classes for the DaCHS' unit tests.

WARNING: This messes up some global state.  DO NOT import into modules
doing regular work.  testtricks is the module for that kind of stuff.
"""

#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import http.server
import contextlib
import gc
import io
import os
import pickle
import re
import subprocess
import sys
import threading
import time
import traceback
import unittest
import warnings

try:
	import coverage
except ImportError:
	# we only need this if someone set COVERAGE_FILE; don't do that
	# if you don't have coverage.py installed.
	pass


# This sets up a test environment of the DaCHS software.
#
# To make this work, the current user must be allowed to run
# createdb (in practice, you should have done something like
#
# sudo -u postgres createuser -s `id -nu`
#
# ). You should be able to tear both ~/_gavo_test and the database
# down, and this should automatically recreate everything.  That's
# an integration test for DaCHS, too.
#
# This must be run before anything else from gavo is imported because
# it manipulates the config stuff; this, in turn, runs as soon as
# base is imported.

# The following forces tests to be run from the tests directory.  
# Reasonable, I'd say.
#
# All the custom setup can be suppressed by setting a GAVO_OOTTEST
# env var before importing this.  That's for "out of tree test"
# and is used by the relational registry "unit" tests (and possibly
# others later).

def ensureResources():
	"""Makes sure the resources the tests need exist; this default is a no-op.

	When the test environment has to be created from scratch, the setup
	code below rebinds this name to a function that creates and fills
	the test database.
	"""
	# overridden below if necessary
	pass

if "GAVO_OOTTEST" in os.environ:
	# Out-of-tree test: no custom setup, just make base available.
	from gavo import base

else:
	# Remember where the tests run and what the environment looked like
	# before it gets manipulated.
	TEST_BASE = os.getcwd()
	originalEnvironment = os.environ.copy()
	# Point the configuration at the test settings.  This has to happen
	# before base is imported (see the comment block above).
	os.environ["GAVOCUSTOM"] = "/invalid"
	os.environ["GAVOSETTINGS"] = os.path.join(TEST_BASE, 
		"test_data", "test-gavorc")
	if not os.path.exists(os.environ["GAVOSETTINGS"]):
		warnings.warn("testhelpers imported from non-test directory.  This"
			" is almost certainly not what you want (or set GAVO_OOTTEST).")

	from gavo import base #noflake: import above is conditional

	if not os.path.exists(base.getConfig("rootDir")):
		# The configured root directory is missing: create the file system
		# hierarchy right away and define an ensureResources that sets up
		# the database later.
		from gavo.user import initdachs
		dbname = "dachstest"
		dsn = initdachs.DSN(dbname)
		initdachs.createFSHierarchy(dsn, "test")

		# Append test-specific entries to defaultmeta.txt (note the
		# non-ASCII character, hence the explicit encoding).
		with open(os.path.join(
				base.getConfig("configDir"), "defaultmeta.txt"), "a", 
				encoding="utf-8") as f:
			f.write("!organization.description: Mein wüster Club\n")
			f.write("!contact.email: invalid@whereever.else\n")

		# Write (overwriting anything present) the vanity names used by tests.
		with open(os.path.join(
				base.getConfig("configDir"), "vanitynames.txt"), "w") as f:
			f.write('http://sofo-hd.de sofo !redirect\n'
				'data/cores/scs/scs.xml fromvanitynames\n')

		from gavo.base import config
		config.makeFallbackMeta(reload=True)
		# Make test_data available as "data" in inputsDir, and replace the
		# __system directory created above by a link to the one in test_data.
		os.symlink(os.path.join(TEST_BASE, "test_data"),
			os.path.join(base.getConfig("inputsDir"), "data"))
		os.rmdir(os.path.join(base.getConfig("inputsDir"), "__system"))
		os.symlink(os.path.join(TEST_BASE, "test_data", "__system"),
			os.path.join(base.getConfig("inputsDir"), "__system"))
		os.mkdir(os.path.join(base.getConfig("inputsDir"), "test"))

		def ensureResources(): #noflake: deliberate override
			"""Creates the test database and imports what the tests expect in it.

			On any failure, this prints the traceback and a hint to remove the
			root directory, then exits the process with status 1.
			"""
			# delay everything that needs DB connectivity until
			# after the import; otherwise, we may create a conn pool
			# and that, as used in DaCHS, may create a thread.  That means
			# risking a deadlock while python is importing.
			try:
				subprocess.check_call(["createdb", "--template=template0", 
					"--encoding=UTF-8", "--locale=C", dbname])

				initdachs.initDB(dsn)

				from gavo.registry import publication
				from gavo import rsc
				from gavo import rscdesc #noflake: caches registration
				from gavo.protocols import creds
				from gavo.protocols import tap

				publication.updateServiceList([base.caches.getRD("//rds")])
				publication.updateServiceList([base.caches.getRD("//services")])
				publication.updateServiceList([base.caches.getRD("//tap")])

				# Import some resources necessary in trial tests
				with base.getWritableAdminConn() as conn:
					rsc.makeData(base.resolveCrossId("//obscore#makeSources"),
						connection=conn)
					rsc.makeData(base.resolveCrossId("//obscore#create"),
						connection=conn)
					tap.publishToTAP(base.resolveCrossId("//obscore"),
						conn)
					rsc.makeData(base.resolveCrossId("//uws#enable_useruws"),
						connection=conn)
					creds.addUser(conn, "testing", "testing", "powerless test user")
					creds.addUser(conn, "other", "other", "another test user")
			except:
				# NOTE(review): the bare except also catches SystemExit and
				# KeyboardInterrupt; presumably that is intended so the removal
				# hint is shown however the setup got interrupted.
				traceback.print_exc()
				sys.stderr.write("Creation of test environment failed.  Remove %s\n"
					" before trying again.\n"%(base.getConfig("rootDir")))
				sys.exit(1)


# We really don't want to send mail from the test suite, so replace
# osinter.sendMail with a no-op that accepts and ignores any arguments.
from gavo.base import osinter
osinter.sendMail = lambda *args, **kwargs: None

from gavo import utils
from gavo import svcs
from gavo.base import config


class FakeSimbad(object):
	"""A canned replacement for a Simbad resolver.

	This is monkeypatched into simbadinterface so tests never query Simbad.
	Answers come from the simbadData class attribute; nothing is persisted,
	so the caching of Simbad responses sadly remains unexercised.
	"""
	simbadData = {
		'Aldebaran': {
			'RA': 68.98016279,
			'dec': 16.50930235,
			'oname': 'Aldebaran',
			'otype': 'LP?'},
		'M1': {
			'RA': 83.63308333,
			'dec': 22.0145,
			'oname': 'M1',
			'otype': 'SNR'},
		'Wozzlfoo7xx': None}

	def __init__(self, *args, **kwargs):
		# whatever construction arguments are passed, they are ignored.
		pass

	def query(self, ident):
		"""returns the canned record for ident, or None if there is none.
		"""
		try:
			return self.simbadData[ident]
		except KeyError:
			return None


# Here's the deal on TestResource: When setting up complicated stuff for
# tests (like, a DB table), define a TestResource for it.  Override
# the make(dependents) method to return whatever you set up and the
# clean(res) method to destroy whatever you created in make().
#
# Then, in VerboseTests, have a class attribute
# resources = [(name1, res1), (name2, res2)]
# giving attribute names and resource *instances*.
# There's an example in adqltest.py
#
# If you use this and you have a setUp of your own, you *must* call
# the superclass's setUp method.

import testresources

class ResourceInstance(object):
	"""A helper class for TestResource.

	This wraps an object that cannot take arbitrary attributes so that
	attributes can be set on the wrapper instead.  Attribute access,
	indexing, iteration, str(), len() and the in operator are forwarded to
	the wrapped object.  In case you encounter one of these but need the
	real thing, just pull .original.
	"""
	def __init__(self, original):
		self.original = original
	
	def __getattr__(self, name):
		# __getattr__ is only called when normal lookup failed.  If that
		# happens for "original" itself (e.g., on an instance created
		# without running __init__, as copy and pickle do), delegating would
		# recurse until RecursionError; hence, fail properly here.
		if name=="original":
			raise AttributeError(name)
		return getattr(self.original, name)
	
	def __getitem__(self, index):
		return self.original[index]

	def __iter__(self):
		return iter(self.original)

	def __str__(self):
		return str(self.original)

	def __len__(self):
		return len(self.original)

	def __contains__(self, other):
		return other in self.original


class TestResource(testresources.TestResource):
	"""A testresources.TestResource with some backward compatibility built in.

	testresources 2.0.1 sets attributes on whatever make returns.  To
	avoid major surgery on existing code, this class wraps make results
	that do not accept arbitrary attributes into a ResourceInstance.

	That requires overriding code from testresources itself, which is
	a pity.  If this breaks: take all testresources methods calling .make
	and have them call _make_and_wrap instead.

	Caution: if you implement reset(), you have to wrap the result
	with testhelpers.ResourceInstance manually; but then you would
	also have to copy the dependencies manually, which is crazy, so
	manual reset should be considered broken at this point.
	"""
	def _make_and_wrap(self, deps):
		"""returns the result of make(deps), wrapped if necessary.

		The result is returned as-is if an attribute can be set on (and
		removed from) it, and as a ResourceInstance otherwise.
		"""
		made = self.make(deps)
		try:
			# probe whether the object takes arbitrary attributes
			made.improbably_named_attribute = None
			del made.improbably_named_attribute
		except AttributeError:
			return ResourceInstance(made)
		return made

	##### Methods adapted from testresources

	def _make_all(self, result):
		"""Makes this resource after obtaining all of its dependencies.

		The dependencies are passed to make and are also set as attributes
		on the returned resource.
		"""
		self._call_result_method_if_exists(result, "startMakeResource", self)
		deps = dict(
			(depName, depManager.getResource())
			for depName, depManager in self.resources)
		made = self._make_and_wrap(deps)
		for depName in deps:
			setattr(made, depName, deps[depName])
		self._call_result_method_if_exists(result, "stopMakeResource", self)
		return made

	def _reset(self, resource, dependency_resources):
		"""Resets resource by cleaning it and making a fresh one.

		:param resource: The resource to reset.
		:param dependency_resources: A dict mapping name -> resource instance
				for the resources specified as dependencies.
		:returns: the freshly made (and possibly wrapped) resource.
		"""
		self.clean(resource)
		fresh = self._make_and_wrap(dependency_resources)
		return fresh


from gavo.helpers.testtricks import (  #noflake: exported names
	XSDTestMixin, testFile, getMemDiffer, getXMLTree, collectedEvents,
	FakeRequest) 


class ForkingSubprocess(subprocess.Popen):
	"""A subprocess that doesn't exec but fork.

	Pass a zero-argument python callable as executable.  The forked child
	sets up its standard streams from the file descriptors Popen prepared,
	calls the callable, and terminates through os._exit.
	"""
	def _execute_child(self, args, executable, preexec_fn, close_fds,
			pass_fds, cwd, env,
			startupinfo, creationflags, shell,
			p2cread, p2cwrite,
			c2pread, c2pwrite,
			errread, errwrite,
			restore_signals,
			# post-3.7, some additional parameters were added, which
			# fortunately we don't need.
			*ignored_args):
		"""forks and calls executable() in the child rather than exec-ing.

		The child's exit status is what executable passed to sys.exit (None
		counts as 0; anything that is not an integer is written to stderr
		and gives status 1), 1 if executable raised some other exception,
		and 0 if it simply returned.

		If setting up the child fails (this includes preexec_fn), the
		exception is pickled, sent to the parent and re-raised there; the
		child then exits with status 255.

		Note that sys.argv is set to args before forking, so the parent's
		sys.argv is changed as well.
		"""
		# stolen largely from 2.7 subprocess.  Unfortunately, I can't just
		# override the exec action; so, I'm replacing the whole exec shebang
		# with a simple call to executable().
		#
		# Also, I'm doing some extra hack to collect coverage info if it looks
		# like we're collecting coverage.
		#
		# The signature (and a few inside parts) is from 3.7 subprocess.  We're
		# ignoring everything that 2.7 hasn't known under the bold assumption
		# that for our simple testing forks that doesn't matter (which ought
		# to be true for start_new_session at least:-).

		sys.argv = args
		if executable is None:
			executable = args[0]

		# For transferring a possible setup failure from child to parent.
		# The child sends a pickled exception through this pipe.
		errpipe_read, errpipe_write = os.pipe()

		try:
			gc_was_enabled = gc.isenabled()
			# Disable gc to avoid bug where gc -> file_dealloc ->
			# write to stderr -> hang.  http://bugs.python.org/issue1336
			gc.disable()
			try:
				self.pid = os.fork()
			except:
				if gc_was_enabled:
					gc.enable()
				# nobody else will close the write end if there is no child
				os.close(errpipe_write)
				raise
			self._child_created = True

			if self.pid == 0:
				# Child
				try:
					# Close parent's pipe ends
					if p2cwrite!=-1:
						os.close(p2cwrite)
					if c2pread!=-1:
						os.close(c2pread)
					if errread!=-1:
						os.close(errread)
					os.close(errpipe_read)

					# When duping fds, if there arises a situation
					# where one of the fds is either 0, 1 or 2, it
					# is possible that it is overwritten (#12607).
					if c2pwrite == 0:
						c2pwrite = os.dup(c2pwrite)
					if errwrite == 0 or errwrite == 1:
						errwrite = os.dup(errwrite)

					# Dup fds for child
					def _dup2(a, b):
						# dup2() makes the target inheritable, but we
						# must do that ourselves if dup2() would be
						# a no-op (issue #10806).
						if a == b:
							os.set_inheritable(a, True)
						elif a!=-1:
							os.dup2(a, b)
					_dup2(p2cread, 0)
					_dup2(c2pwrite, 1)
					_dup2(errwrite, 2)

					# Close pipe fds.  Make sure we don't close the
					# same fd more than once, or standard fds.
					closed = set([-1])
					for fd in [p2cread, c2pwrite, errwrite]:
						if fd not in closed and fd > 2:
							os.close(fd)
							closed.add(fd)

					if cwd is not None:
						os.chdir(cwd)

					if "COVERAGE_FILE" in os.environ:
						cov = coverage.Coverage(data_file="forked.cov",
							source=["gavo"],
							auto_data=True)
						cov.config.disable_warnings = ["module-not-measured"]
						cov.start()

					if preexec_fn:
						preexec_fn()

					exitcode = 0

				except:
					# whatever happens while reporting (e.g., an unpicklable
					# exception), the child must not return to the caller.
					try:
						exc_type, exc_value, tb = sys.exc_info()
						# Save the traceback and attach it to the exception object
						exc_lines = traceback.format_exception(
							exc_type, exc_value, tb)
						exc_value.child_traceback = ''.join(exc_lines)
						os.write(errpipe_write, pickle.dumps(exc_value))
						os.close(errpipe_write)
					finally:
						os._exit(255)

				# From here on, the child must terminate through os._exit no
				# matter what; otherwise it would go on running the parent's
				# code (i.e., the test suite) a second time.
				try:
					# closing the error pipe tells the parent we're up
					os.close(errpipe_write)
					try:
						executable()
					except SystemExit as ex:
						if ex.code is None:
							exitcode = 0
						elif isinstance(ex.code, int):
							# os._exit only takes C ints; the OS would cut
							# the status down to 8 bits anyway.
							exitcode = ex.code & 0xff
						else:
							# do what the interpreter does for sys.exit("message")
							sys.stderr.write("%s\n"%(ex.code,))
							exitcode = 1
					except BaseException:
						traceback.print_exc()
						exitcode = 1

					if "COVERAGE_FILE" in os.environ:
						cov.stop()
						cov.save()
					sys.stderr.close()
					sys.stdout.close()
				finally:
					os._exit(exitcode)

			# Parent
			os.close(errpipe_write)
			if gc_was_enabled:
				gc.enable()

			# Now wait for the child to come up (at which point it will
			# close its error pipe); collect whatever it sends until then.
			errpipe_data = bytearray()
			while True:
				part = os.read(errpipe_read, 50000)
				errpipe_data += part
				if not part or len(errpipe_data)>50000:
					break

			if errpipe_data:
				# this is data from our own fork, so unpickling it is ok.
				child_exception = pickle.loads(errpipe_data)
				raise child_exception

		finally:
			# close the FDs used by the child if they were opened
			if p2cread!=-1 and p2cwrite!=-1:
				os.close(p2cread)
			if c2pwrite!=-1 and c2pread!=-1:
				os.close(c2pwrite)
			if errwrite!=-1 and errread!=-1:
				os.close(errwrite)

			# be sure the error pipe FD is eventually closed no matter what
			os.close(errpipe_read)


class VerboseTest(testresources.ResourcedTestCase):
	"""A TestCase with a couple of convenient assert methods.
	"""
	def assertEqualForArgs(self, callable, result, *args):
		"""asserts that callable(*args) equals result.

		callable is only called once, so the value shown in the failure
		message is the one that was actually compared.
		"""
		found = callable(*args)
		self.assertEqual(found, result, 
			"Failed for arguments %s.  Expected result is: %s, result found"
			" was: %s"%(str(args), repr(result), repr(found)))

	def assertRaisesVerbose(self, exception, callable, args, msg):
		"""asserts that callable(*args) raises exception, failing with msg if
		nothing is raised.

		Other exceptions are passed through.
		"""
		try:
			callable(*args)
		except exception:
			return
		raise self.failureException(msg)

	def assertRaisesWithMsg(self, exception, errMsg, callable, args, msg=None,
			**kwargs):
		"""asserts that callable(*args, **kwargs) raises exception with a string
		representation equal to errMsg.

		The exception caught is returned for further inspection.  msg, if
		given, is the failure message used when nothing is raised.
		"""
		try:
			value = callable(*args, **kwargs)
		except exception as ex:
			# keep errMsg as the left operand: this makes sure we decide
			# on its __eq__ for EqualingRE's sake
			if errMsg==str(ex):
				return ex
			raise self.failureException(
				"Expected %r, got %r as exception message"%(errMsg, str(ex)))
		raise self.failureException(msg or 
			"%s not raised (function returned %s)"%(
				str(exception), repr(value)))

	def assertRuns(self, callable, args, msg=None):
		"""asserts that callable(*args) does not raise an Exception.

		msg, if given, replaces the default failure message.
		"""
		try:
			callable(*args)
		except Exception as ex:
			raise self.failureException(msg or
				"Should run, but raises %s (%s) exception"%(
					ex.__class__.__name__, str(ex)))

	def assertAlmostEqualVector(self, first, second, places=7, msg=None):
		"""asserts that first and second are component-wise equal to places
		decimal places.

		The components are paired with zip, so if the sequences differ in
		length, the excess components of the longer one are not compared.
		"""
		try:
			for f, s in zip(first, second):
				self.assertAlmostEqual(f, s, places)
		except AssertionError:
			if msg:
				raise AssertionError(msg)
			else:
				raise AssertionError("%s != %s within %d places"%(
					first, second, places))

	def assertEqualToWithin(self, a, b, ratio=1e-7, msg=None):
		"""asserts that abs(a-b)/abs(a+b)<ratio.
		
		If a+b is zero, the division error is simply passed on.
		"""
		if msg is None:
			msg = "%s != %s to within %s of the sum"%(a, b, ratio)
		denom = abs(a+b)
		self.assertTrue(abs(a-b)/denom<ratio, msg)

	def assertOutput(self, toExec, argList, expectedStdout=None, 
			expectedStderr="", expectedRetcode=0, input=None,
			stdoutStrings=None):
		"""checks that toExec called with argList has the given output and return
		value.

		expectedStdout and expectedStderr can be strings or bytes (the
		output must then match exactly), lists (each item must be contained
		in the output) or functions.  Functions are passed the output
		as bytes, and an AssertionError is raised if they do not return true.
		A None expectedStdout disables checking stdout.

		The 0th argument in argList is automatically added, only pass "real"
		command line arguments.

		toExec may also be a zero-argument python function.  In that case, the
		process is forked and the function is called, with sys.argv according to
		argList.  This helps to save startup time for python main functions.

		On failure, what the process wrote is left in the files output.stdout
		and output.stderr in the current directory (only output.stdout if
		only stdout is off).

		stdoutStrings is accepted but not used.
		"""
		# remove output files left by previous failures
		for name in ["output.stderr", "output.stdout"]:
			try:
				os.unlink(name)
			except OSError:
				pass

		if isinstance(toExec, str):
			p = subprocess.Popen([toExec]+argList, executable=toExec, 
				stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
		else:
			p = ForkingSubprocess(["test harness"]+argList, executable=toExec, 
				stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
		out, err = p.communicate(input=utils.bytify(input))
		retcode = p.wait()

		try:
			if isinstance(expectedStderr, (bytes, str)):
				self.assertEqual(err, utils.bytify(expectedStderr))
			elif isinstance(expectedStderr, list):
				for sample in expectedStderr:
					self.assertTrue(utils.bytify(sample) in err,
						"%s missing in stderr"%repr(sample))
			else:
				# the message is built whether or not the assertion fails, so
				# decoding must never raise: use a valid error handler.
				self.assertTrue(expectedStderr(err), "Stderr didn't match functional"
					" expectation: %s"%err.decode("ascii", "replace"))

			self.assertEqual(expectedRetcode, retcode)
		except AssertionError:
			with open("output.stdout", "wb") as f:
				f.write(out)
			with open("output.stderr", "wb") as f:
				f.write(err)
			raise

		try:
			if isinstance(expectedStdout, (bytes, str)):
				self.assertEqual(out, utils.bytify(expectedStdout))
			elif isinstance(expectedStdout, list):
				for sample in expectedStdout:
					self.assertTrue(utils.bytify(sample) in out,
						"%s missing in stdout"%repr(sample))
			elif expectedStdout is not None:
				self.assertTrue(expectedStdout(out))
		except AssertionError:
			with open("output.stdout", "wb") as f:
				f.write(out)
			raise

	def assertEqualIgnoringAliases(self, result, expectation):
		"""asserts that result equals expectation, where each ASWHATEVER in
		expectation stands for AS followed by a space and some lowercase
		ASCII letters.
		"""
		pat = re.escape(expectation).replace("ASWHATEVER", "AS [a-z]+")+"$"
		if not re.match(pat, result):
			raise AssertionError("%r != %r"%(result, expectation))


# Matches what cleanXML removes: namespace declarations and
# xsi:schemaLocation attributes (each with trailing whitespace) as well as
# id, frame_id and coord_system_id attributes.
# Raw strings keep \s from being an invalid string escape.
_xmlJunkPat = re.compile("|".join([
	r'(xmlns(:[a-z0-9]+)?="[^"]*"\s*)',
	r'((frame_|coord_system_)?id="[^"]*")',
	r'(xsi:schemaLocation="[^"]*"\s*)']))


def cleanXML(aString):
	"""removes IDs and some other detritus from XML literals.

	In addition to what _xmlJunkPat matches, this collapses any run of
	whitespace into a single blank, strips the result, and removes the
	blank before "/>" and ">".

	The result will be invalid XML, and all this assumes the fixed-prefix
	logic of the DC software.

	For new tests, you should just getXMLTree and XPath tests.

	aString can be bytes, in which case it will be interpreted as ASCII.
	cleanXML returns a string in any case.
	"""
	withoutJunk = _xmlJunkPat.sub('', utils.debytify(aString))
	# raw string: \s is not a valid string escape
	normalised = re.sub(r"\s+", " ", withoutJunk).strip()
	return normalised.replace(" />", "/>").replace(" >", ">")


class SamplesBasedAutoTest(type):
	"""A metaclass turning a samples class attribute into test methods.

	Classes using this should define samples (a sequence of anything)
	and a method _runTest(sample) that is passed one item of samples.

	For each sample, a method test<nn> (nn being the two-digit index of
	the sample) is added to the class; it calls _runTest with that sample.
	"""
	def __new__(cls, name, bases, dict):
		allSamples = dict.get("samples", ())
		for index, item in enumerate(allSamples):
			# bind the current item through a default argument; a plain
			# closure would see the last item in all generated methods.
			def testFun(self, sample=item):
				self._runTest(sample)
			methodName = "test%02d"%index
			dict[methodName] = testFun
		return type.__new__(cls, name, bases, dict)


class SimpleSampleComparisonTest(VerboseTest, metaclass=SamplesBasedAutoTest):
	"""A base class for tests that feed samples to a function and compare
	what comes back with an expectation.

	Put the function into the functionToRun attribute (wrap
	it in a staticmethod).

	The samples are pairs of (input, expected output).  When the function
	raises an exception, the sample still passes if the exception's string
	form equals the string form of the expected output; so, the expected
	output may be an exception or just its serialised form.
	"""

	def _runTest(self, sample):
		argument, wanted = sample
		try:
			actual = self.functionToRun(argument)
			self.assertEqual(actual, wanted)
		except AssertionError:
			raise
		except Exception as exc:
			if str(exc)==str(wanted):
				return
			raise


def computeWCSKeys(pos, size, cutCrap=False):
	"""returns a dictionary with 2D WCS keys for a 1000x1000 pixel image
	centered at pos and having the angular size size.

	pos and size are both 2-tuples in degrees.  Unless cutCrap is true,
	a couple of further keys (imageTitle, dateObs, bandpassLo, accref,
	accsize, etc.) are added, some of them derived from pos and size.
	"""
	nPixX, nPixY = 1000., 1000.

	wcsKeys = {}
	wcsKeys["CRVAL1"] = pos[0]
	wcsKeys["CRVAL2"] = pos[1]
	wcsKeys["CRPIX1"] = nPixX/2.
	wcsKeys["CRPIX2"] = nPixY/2.
	wcsKeys["CUNIT1"] = "deg"
	wcsKeys["CUNIT2"] = "deg"
	wcsKeys["CD1_1"] = size[0]/nPixX
	wcsKeys["CD1_2"] = 0
	wcsKeys["CD2_2"] = size[1]/nPixY
	wcsKeys["CD2_1"] = 0
	wcsKeys["NAXIS1"] = nPixX
	wcsKeys["NAXIS2"] = nPixY
	wcsKeys["NAXIS"] = 2
	wcsKeys["CTYPE1"] = 'RA---TAN-SIP'
	wcsKeys["CTYPE2"] = 'DEC--TAN-SIP'
	wcsKeys["LONPOLE"] = 180.

	if cutCrap:
		return wcsKeys

	# keys without a value here are still present and set to None
	for emptyKey in ["instId", "refFrame", "wcs_equinox", "bandpassId",
			"bandpassUnit", "bandpassRefval", "pixflags", "embargo", "owner"]:
		wcsKeys[emptyKey] = None

	extraKeys = {
		"imageTitle": "test image at %s"%repr(pos),
		"instId": None,
		"dateObs": 55300+pos[0],
		"refFrame": None,
		"wcs_equinox": None,
		"bandpassId": None,
		"bandpassUnit": None,
		"bandpassRefval": None,
		"bandpassLo": pos[0],
		"bandpassHi": pos[0]+size[0],
		"pixflags": None,
		"accref": "image/%s/%s"%(pos, size),
		"accsize": (30+int(pos[0]+pos[1]+size[0]+size[1]))*1024,
		"embargo": None,
		"owner": None,
	}
	# re-insert in the documented order so iteration order is stable
	for emptyKey in ["instId", "refFrame", "wcs_equinox", "bandpassId",
			"bandpassUnit", "bandpassRefval", "pixflags", "embargo", "owner"]:
		del wcsKeys[emptyKey]
	wcsKeys.update(extraKeys)
	return wcsKeys


class StandIn(object):
	"""An object that has the keyword arguments passed to the constructor
	as its attributes.
	"""
	def __init__(self, **kwargs):
		for attName in kwargs:
			setattr(self, attName, kwargs[attName])


def getTestRD(id="test.rd"):
	"""returns the RD that base.caches.getRD yields for data/<id>.
	"""
	from gavo import rscdesc  #noflake: import above is conditional
	from gavo import base
	rdId = "data/%s"%id
	return base.caches.getRD(rdId)


def getTestTable(tableName, id="test.rd"):
	"""returns the table definition tableName from the test RD id.
	"""
	rd = getTestRD(id)
	return rd.getTableDefById(tableName)


def getTestData(dataId):
	"""returns the data item dataId from the default test RD.
	"""
	rd = getTestRD()
	return rd.getDataById(dataId)


def captureOutput(callable, args=(), kwargs=None):
	"""runs ``callable(*args, **kwargs)`` and captures the output.

	The function returns a tuple of return value, stdout output, stderr output.

	If callable raises SystemExit, that is swallowed, and the return value
	reported is 2.  Other exceptions are passed on; sys.stdout and
	sys.stderr are restored in any case.
	"""
	if kwargs is None:
		kwargs = {}

	realOut, realErr = sys.stdout, sys.stderr
	# keep our own references to the buffers so we can read them even if
	# the callable rebinds sys.stdout or sys.stderr
	outBuffer, errBuffer = io.StringIO(), io.StringIO()
	sys.stdout, sys.stderr = outBuffer, errBuffer
	try:
		retVal = 2 # in case the callable sys.exits
		try:
			retVal = callable(*args, **kwargs)
		except SystemExit:
			# don't terminate just because someone thinks it's a good idea
			pass
	finally:
		sys.stdout, sys.stderr = realOut, realErr
		outCont, errCont = outBuffer.getvalue(), errBuffer.getvalue()
	return retVal, outCont, errCont


class CatchallUI(object):
	"""A stand-in for base.ui that just collects the messages sent.

	This is for tests checking what UI events are produced.  Use it
	through the messageCollector context manager below.

	Any attribute starting with notify yields a function appending
	(event type, args, kwargs) to events, where the event type is the
	attribute name without the notify prefix.  Any other unknown attribute
	is None.
	"""
	def __init__(self):
		self.events = []

	def record(self, evType, args, kwargs):
		entry = (evType, args, kwargs)
		self.events.append(entry)

	def __getattr__(self, attName):
		if not attName.startswith("notify"):
			return None

		evType = attName[len("notify"):]
		def notifier(*args, **kwargs):
			return self.record(evType, args, kwargs)
		return notifier


@contextlib.contextmanager
def messageCollector():
	"""A context manager collecting the UI events produced in its block.

	What the context manager yields is a CatchallUI that replaces base.ui
	while the block runs; the events collected are in its events attribute.
	The previous base.ui is put back when the block is left.
	"""
	collector = CatchallUI()
	previousUI = base.ui
	try:
		base.ui = collector
		yield collector
	finally:
		base.ui = previousUI


@contextlib.contextmanager
def fakeTime(t):
	"""lets time.time() return t while the controlled block runs.

	The original time.time is put back when the block is left, whether
	or not it raised.
	"""
	originalTime = time.time

	def frozenTime(*args):
		return t

	try:
		time.time = frozenTime
		yield
	finally:
		time.time = originalTime


def getServerInThread(data, onlyOnce=False, contentType="text/plain",
		port=34000):
	"""runs a server in a thread and returns the server, the thread and the
	base url.

	The server answers each GET and POST with data (strings are utf-8
	encoded) as contentType.

	With onlyOnce, the server handles a single request, after which its
	thread terminates.  The thread would still need to be joined.

	The server listens on port; pass 0 to have the operating system pick
	a free port (the URL returned always has the port actually used).

	So, better use the DataServer context manager.
	"""
	if isinstance(data, str):
		data = data.encode("utf-8")

	class Handler(http.server.BaseHTTPRequestHandler):
		def log_request(self, *args):
			# keep the test output free of request logs
			pass
		def do_GET(self):
			self.send_response(200, "Ok")
			self.send_header("Content-Type", contentType)
			self.end_headers()
			self.wfile.write(data)
		do_POST = do_GET
	
	httpd = http.server.HTTPServer(('', port), Handler)

	if onlyOnce:
		serve = httpd.handle_request
	else:
		serve = httpd.serve_forever

	t = threading.Thread(target=serve)
	# daemon thread: a forgotten server must not keep the interpreter alive
	t.daemon = True
	t.start()
	return httpd, t, "http://localhost:%s"%httpd.server_port


@contextlib.contextmanager
def DataServer(data):
	"""a context manager for briefly running a web server returning data.

	This yields the base URL the server is listening on.

	The server is shut down and its socket closed when the controlled
	block is left, also when it raises; otherwise, the port would remain
	taken for whatever comes next.
	"""
	httpd, t, baseURL = getServerInThread(data)
	try:
		yield baseURL
	finally:
		httpd.shutdown()
		t.join(10)
		# shutdown only ends the serve loop; give back the listening socket
		httpd.server_close()


@contextlib.contextmanager
def userconfigContent(content):
	"""a context manager to temporarily set some content to userconfig.

	This cleans up after itself (also when the controlled block raises)
	and clears any userconfig cache before it sets to work.

	content are RD elements without the root (resource) tag.
	"""
	userConfigPath = os.path.join(
		base.getConfig("configDir"), "userconfig.rd")
	# the caches are cleared by the path without the .rd extension
	rdName = userConfigPath[:-3]
	base.caches.clearForName(rdName)
	# there is no XML declaration in what we write, so this must be UTF-8
	with open(userConfigPath, "w", encoding="utf-8") as f:
		f.write('<resource schema="__system">\n'
			+content
			+'\n</resource>\n')
	try:
		yield
	finally:
		try:
			os.unlink(userConfigPath)
		finally:
			# whatever was loaded from the temporary file must not survive
			base.caches.clearForName(rdName)


@contextlib.contextmanager
def tempConfig(*items):
	"""sets (sect, key, value) triples in the config temporarily and re-sets
	the original values after the controlled block.

	The original values are restored in reverse order, so the right value
	comes back even if a key is given more than once.  If setting one
	of the items fails, the items already set are restored, too.
	"""
	origValues = []
	try:
		for sect, key, val in items:
			origValues.append((sect, key, config.get(sect, key)))
			config.set(sect, key, val)
		yield
	finally:
		for sect, key, val in reversed(origValues):
			config.set(sect, key, val)


def makeRequestArgs(aDict=None, **kwargs):
	"""returns a request.args compatible dict from aDict.

	In particular, simple values will be put into lists; values that
	already are lists are passed through.  Keyword arguments are added to
	(and override) what is in aDict.

	aDict itself is not changed.
	"""
	merged = {} if aDict is None else dict(aDict)
	merged.update(kwargs)
	return dict((k, v if isinstance(v, list) else [v])
		for k, v in merged.items())


def runSvcWith(service, renderer, args):
	"""runs service with renderer, taking the parameters from the dict args.

	The values in args are single values, *not* lists as in t.w (unless,
	of course, you actually have lists to begin with); they are turned into
	request arguments through makeRequestArgs.
	"""
	requestArgs = makeRequestArgs(args)
	return service.run(
		renderer,
		requestArgs,
		svcs.QueryMeta.fromRequestArgs(requestArgs))


def main(testClass, methodPrefix=None):
	"""runs tests as requested on the command line and exits.

	The command line is interpreted like this:

	* no arguments: all tests from the __main__ module are run.
	* one argument: the methods of testClass having names starting with the
	  argument are run.
	* two arguments: the first is the name of a test class in the __main__
	  module (anything up to the last dot is ignored), the second a
	  method prefix as above.

	A non-empty methodPrefix overrides the method prefix from the command
	line; it does not have an effect if there are no arguments.

	Before running the tests, this calls ensureResources and, unless the
	GAVO_LOG environment variable is "no", turns on debugging and logging.
	The TEST_VERBOSITY environment variable sets the verbosity of the
	test runner (default: 1).

	The process exits with status 0 if there were no errors or failures,
	and with status 1 otherwise.
	"""
	ensureResources()
	
	if os.environ.get("GAVO_LOG")!="no":
		base.DEBUG = True
		from gavo.user import logui
		logui.LoggingUI(base.ui)
	
	if "GAVO_OOTTEST" not in os.environ:
		# run any pending upgrades (that's a test for them, too... of sorts)
		from gavo.user import upgrade
		upgrade.upgrade()

	try:
		# two args: first one is class name, locate it in caller's globals
		# and ignore anything before any dot for cut'n'paste convenience
		if len(sys.argv)>2:
			className = sys.argv[-2].split(".")[-1]
			testClass = getattr(sys.modules["__main__"], className)
		
		# one arg: test method prefix on testClass
		if len(sys.argv)>1:
			# unittest.makeSuite is deprecated and gone in python 3.13;
			# this is what it did internally.
			loader = unittest.TestLoader()
			loader.testMethodPrefix = methodPrefix or sys.argv[-1]
			loader.suiteClass = testresources.OptimisingTestSuite
			suite = loader.loadTestsFromTestCase(testClass)
		else:  # Zero args, emulate unittest.run behaviour
			suite = testresources.TestLoader().loadTestsFromModule(
				sys.modules["__main__"])

		runner = unittest.TextTestRunner(
			verbosity=int(os.environ.get("TEST_VERBOSITY", 1)))
		result = runner.run(suite)
		if result.errors or result.failures:
			sys.exit(1)
		else:
			sys.exit(0)
	except (SystemExit, KeyboardInterrupt):
		raise
	except:
		# something went wrong outside of the tests proper: show what
		# we know and signal failure.
		base.showHints = True
		from gavo.user import errhandle
		traceback.print_exc()
		errhandle.raiseAndCatch(base)
		sys.exit(1)
