"""
Tests for various parts of the server infrastructure, using trial.
"""

#c Copyright 2008-2024, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import contextlib
import os
import re
import tempfile
import time

from twisted.internet import reactor

from gavo.helpers import trialhelpers

from gavo import api
from gavo import base
from gavo import utils
from gavo.web import ifpages
from gavo.web import root

# Switch on the DEBUG flag of gavo.base for everything imported after this.
# NOTE(review): what exactly DEBUG changes is defined in gavo.base, not
# visible from here -- presumably more verbose error output; confirm.
base.DEBUG = True


class ErrorReportingTest(trialhelpers.ArchiveTest):
	"""Tests for what is delivered when producing a response blows up.
	"""
	def testBotchedErrorGeneration(self):
		# test/blowuperror must come back as a 500 that shows both the
		# NameError and the operators' mail address.
		expectedFragments = [
			"NameError: name 'ddt' is not defined",
			'a href="mailto:invalid@wherever.else">'
			]

		def checkIs500(queryResult):
			self.assertEqual(queryResult[1].code, 500)
			return queryResult

		pending = self.assertGETHasStrings(
			"test/blowuperror", {}, expectedFragments)
		return pending.addCallback(checkIs500)
			

class AdminTest(trialhelpers.ArchiveTest):
	"""Tests for the admin page served at /seffe/__system__/adql.
	"""
	def _makeAdmin(self, req):
		# request mogrifier: equip req with the configured admin credentials
		req.user = "gavoadmin"
		adminPassword = api.getConfig("web", "adminpasswd")
		req.password = adminPassword

	def testDefaultDeny(self):
		# without credentials, we expect a 401
		adminURL, expectedCode = "/seffe/__system__/adql", 401
		return self.assertStatus(adminURL, expectedCode)

	def testFormsRendered(self):
		expected = [
			"ADQL Query</a>",
			"Schedule downtime for",
			'action="seffe/__system__/adql']
		return self.assertGETHasStrings(
			"/seffe/__system__/adql", {}, expected,
			rm=self._makeAdmin)

	def testDowntimeScheduled(self):
		# Sequence: schedule a downtime, check it on the RD and in the
		# availability document, post setDowntime without a date, and check
		# the meta item is gone again.
		def assertDowntimeSet(ignored):
			scheduled = api.getRD("__system__/adql").getMeta(
				"_scheduledDowntime")
			self.assertEqual(str(scheduled), '2009-10-13')
			return self.assertGETHasStrings(
				"/__system__/adql/query/availability",
				{},
				["<avl:downAt>2009-10-13<"])

		def postEmptyDowntime(ignored):
			return self.assertPOSTHasStrings(
				"/seffe/__system__/adql",
				{"__nevow_form__": "setDowntime"},
				[],
				rm=self._makeAdmin)

		def assertDowntimeGone(ignored):
			metaGetter = api.getRD("__system__/adql").getMeta
			self.assertRaises(api.NoMetaKey,
				metaGetter, "_scheduledDowntime", raiseOnFail=True)

		pending = trialhelpers.runQuery(
			self.renderer,
			"POST",
			"/seffe/__system__/adql",
			{"__nevow_form__": "setDowntime", "scheduled": "2009-10-13"},
			requestMogrifier=self._makeAdmin)
		pending = pending.addCallback(assertDowntimeSet)
		pending = pending.addCallback(postEmptyDowntime)
		return pending.addCallback(assertDowntimeGone)

	def testBlockAndReload(self):
		# Sequence: block the RD, check the page says so, reload the RD,
		# check the page reports it as not blocked.
		def assertBlocked(ignored):
			pageChecked = self.assertGETHasStrings(
				"/seffe/__system__/adql", {},
				["currently is blocked", "invalid@wherever.else"],
				rm=self._makeAdmin)
			return pageChecked.addCallback(reloadRD)

		def reloadRD(ignored):
			reloaded = trialhelpers.runQuery(
				self.renderer,
				"POST",
				"/seffe/__system__/adql",
				{"__nevow_form__": "adminOps", "submit": "Reload RD"},
				requestMogrifier=self._makeAdmin)
			return reloaded.addCallback(assertNotBlocked)

		def assertNotBlocked(ignored):
			return self.assertGETHasStrings(
				"/seffe/__system__/adql", {},
				["currently is not blocked"],
				rm=self._makeAdmin)

		blocked = trialhelpers.runQuery(
			self.renderer,
			"POST",
			"/seffe/__system__/adql",
			{"__nevow_form__": "adminOps", "block": "Block"},
			requestMogrifier=self._makeAdmin)
		return blocked.addCallback(assertBlocked)


class CustomizationTest(trialhelpers.ArchiveTest):
	"""Tests that site-specific values turn up in rendered documents.

	The expected strings contain the test site's contact address
	(invalid@wherever.else), its authority (x-testing) and its name
	(Unittest Suite).
	"""
	def testSidebarRendered(self):
		expected = [
			'<a href="mailto:invalid@wherever.else">site operators</a>',
			'<div class="exploBody"><span class="plainmeta">ivo://'
				'x-testing/data/test/basicprod</span>']
		return self.assertGETHasStrings(
			"/data/test/basicprod/form", {}, expected)

	def testMacrosExpanded(self):
		expected = [
			"Information on Service 'Unittest Suite Public Tables'",
			"tables available for ADQL querying within the\nUnittest Suite",
			"Unittest Suite Table Infos</a>",]
		return self.assertGETHasStrings(
			"/__system__/dc_tables/list/info", {}, expected)

	def testExpandedInXML(self):
		# an OAI GetRecord for the registry service's own record
		oaiArgs = {
			"verb": "GetRecord",
			"metadataPrefix": "ivo_vor",
			"identifier": "ivo://x-testing/__system__/services/registry"
		}
		expected = [
			"<title>Unittest Suite Registry</title>",
			"<managedAuthority>x-testing</managedAuthority>"]
		return self.assertGETHasStrings("/oai.xml", oaiArgs, expected)


class StaticTest(trialhelpers.ArchiveTest):
	"""Tests for the static renderer on services and for the global
	/static resource.
	"""
	def _assertRedirectsToStaticDir(self, path):
		# GETs path and checks that the response is a 301 to the
		# slash-terminated static URL of data/cores/rds.  Returns the
		# deferred for trial to wait on.
		def assertRedirected(result):
			self.assertEqual(result[1].code, 301)
			# assertEqual, not assertTrue: assertTrue would take the URL as
			# the failure message and never compare the location against it.
			self.assertEqual(result[1].getLocationValue(),
				"http://localhost:8080/data/cores/rds/static/")

		return trialhelpers.runQuery(self.renderer, "GET",
			path, {}
			).addCallback(assertRedirected)

	def testSimple(self):
		# a static directory without index file gives a directory listing
		return self.assertGETHasStrings("/data/cores/rds/static/", {}, [
			'<td><a href="test-gavorc">test-gavorc</a>',
			'<title>Directory listing for data/cores/rds/static/</title>'])

	def testRendererInferred(self):
		# no renderer in the path at all
		return self._assertRedirectsToStaticDir("/data/cores/rds")

	def testSlashAdded(self):
		# renderer given, but trailing slash missing
		return self._assertRedirectsToStaticDir("/data/cores/rds/static")

	def testStaticFile(self):
		return self.assertGETHasStrings("/data/cores/rds/static/ex.fits", {}, [
			"BITPIX  =                   16"])

	def test404(self):
		# ex.fit (rather than ex.fits) does not exist
		return self.assertStatus("/data/cores/rds/static/ex.fit", 404)

	def testWithIndexFile(self):
		return self.assertGETHasStrings("/data/test/basicprod/static/", {}, [
			"alpha: 23 34 33.45"])

	def testSubDir(self):
		return self.assertGETHasStrings("/data/cores/rds/static/bin/", {},
			["Directory listing for data/cores/rds/static/bin/"])

	def testAutoMinification(self):
		# the expected string has no whitespace between the statements
		return self.assertGETHasStrings("/static/js/fancyroot.js", {},
			["ResourceHeader);fetchSubject"])

	def testNoTraversalGlobal(self):
		# .. must not lead out of the static directory
		return self.assertStatus("/static/../src/boosterskel.h", 403)

	def test403WithoutStaticPath(self):
		def assertMessage(res):
			self.assertTrue(b">No static data on this service" in res[0])

		return self.assertStatus("data/cores/coltest/static/bla.dat", 403
			).addCallback(assertMessage)

	def testRedirectForDir(self):
		def assertDestination(res):
			self.assertEqual(res[1].getLocationValue(),
				'http://localhost:8080/data/cores/rds/static/bin/')

		return self.assertStatus("data/cores/rds/static/bin", 301
			).addCallback(assertDestination)

	def testVolatileDoesNotAcceptFixedTemplate(self):
		return self.assertGETHasStrings("data/cores/coltest/volatile", {},
			['class="errmsg">volatile renderer needs a \'volatile\' template'])

	def testMissingIndexFile(self):
		return self.assertStatus("data/cores/cc/static/", 404)


class PathResolutionTest(trialhelpers.ArchiveTest):
	"""Tests for how request paths are mapped to resources: vanity
	redirects, unknown RDs and services, and a configured root page.
	"""
	def testExternalVanityRedirect(self):
		def assertTarget(queryResult):
			location = queryResult[1].getLocationValue()
			self.assertEqual(location, "http://sofo-hd.de")
			return queryResult

		redirected = self.assertStatus("/sofo", 301)
		return redirected.addCallback(assertTarget)

	def testNoRD404(self):
		def assertIs404(res):
			self.assertEqual(res[1].code, 404)

		pending = self.assertGETHasStrings(
			"/data/nord", {}, ["No matching RD"])
		return pending.addCallback(assertIs404)

	def testNoSvc404(self):
		def assertIs404(res):
			self.assertEqual(res[1].code, 404)

		pending = self.assertGETHasStrings(
			"/data/cores/hell", {}, ["No such service"])
		return pending.addCallback(assertIs404)

	def testDifferentRootPage(self):
		# with [web]root overridden, the empty path must give the ADQL form
		rootOverride = ("web", "root", "__system__/adql/query/form")
		with trialhelpers.tempConfig(rootOverride):
			return self.assertGETHasStrings(
				"", {},
				["<title>Unittest Suite ADQL Query</title>"])


class MaintResponsesTest(trialhelpers.ArchiveTest):
	"""Tests for responses given while the site or an RD is taken down.
	"""
	def testMaintFile(self):
		# while the renderer's maintFile exists, requests must yield a 503
		# carrying the file's content; the file is removed again whatever
		# the outcome.
		maintPath = self.renderer.maintFile
		with open(maintPath, "w", encoding="utf-8") as maintNote:
			maintNote.write("Blöde Welt.\n")

		def assertIs503(res):
			self.assertEqual(res[1].code, 503)

		def removeMaintFile(res):
			os.unlink(self.renderer.maintFile)
			return res

		pending = self.assertGETHasStrings(
			"/data/cores/rds/static/ex.fits", {}, [
				"mainterror-to-html.xsl",
				'value="ERROR">Blöde Welt.\n</INFO>'])
		pending = pending.addCallback(assertIs503)
		return pending.addBoth(removeMaintFile)

	def testRDBlocking(self):
		# setting currently_blocked on the RD must produce the "taken down"
		# page; the attribute is deleted again whatever the outcome.
		blockedRD = base.caches.getRD("data/cores")
		blockedRD.currently_blocked = True

		def unblock(res):
			delattr(blockedRD, "currently_blocked")
			return res

		pending = self.assertGETHasStrings(
			"/data/cores/rds/static/ex.fits", {}, [
				"<title>Service temporarily taken down</title>",
				"from a few minutes to a day",
				"invalid@wherever.else"])
		return pending.addBoth(unblock)


class StreamingTest(trialhelpers.ArchiveTest):
	"""Tests for streamed responses, using the /test/stream and
	/test/streamcrash resources.
	"""
	def testStreamingWorks(self):
		streamArgs = {"size": "30"}
		return self.assertGETHasStrings("/test/stream", streamArgs, [
			"123456789012345678901234567890"])

	def testChunksWork(self):
		streamArgs = {"chunksize": "10", "size": "35"}
		return self.assertGETHasStrings("/test/stream", streamArgs, [
			"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx12345"])

	def testCrashStreamDoesNotHang(self):
		expected = [
			"Here is some data.",
			"XXX Internal",
			"raise Exception"]
		return self.assertGETHasStrings("/test/streamcrash", {}, expected)

	def testStopProducing(self):

		class StoppingRequest(trialhelpers.FakeRequest):
			# a request that tells its producer to stop on every write
			def write(self, data):
				trialhelpers.FakeRequest.write(self, data)
				if self.producer:
					self.producer.stopProducing()

		bufferSize = utils.StreamBuffer.chunkSize

		def assertTruncated(queryResult):
			delivered = queryResult[0]
			# at least one chunk must have been delivered
			self.assertTrue(delivered.startswith(b"xxxxxxxxxx"),
				"No data delivered at all")
			# ...but the kill must become active before the whole mess
			# has been written; there are some ways in which this can
			# fail because of concurrency.  If all else fails, add sync
			# between the test and streaming, but that'd suck.
			self.assertFalse(len(delivered)>bufferSize*4,
				"Kill hasn't happened")

		streamArgs = {"chunksize": "10", "size": "%s"%(bufferSize*5)}
		pending = trialhelpers.runQuery(self.renderer, "GET", "/test/stream",
			streamArgs,
			requestClass=StoppingRequest)
		return pending.addCallback(assertTruncated)

	def testClosingConnection(self):

		class ClosingRequest(trialhelpers.FakeRequest):
			# a request on which every write fails
			def write(self, data):
				raise IOError("Connection closed")

		def assertNothingWritten(queryResult):
			self.assertEqual(queryResult[0], b"")

		streamArgs = {"chunksize": "10", "size": "3500"}
		pending = trialhelpers.runQuery(self.renderer, "GET", "/test/stream",
			streamArgs, requestClass=ClosingRequest)
		return pending.addCallback(assertNothingWritten)


# An XHTML/nevow page skeleton with a single %s placeholder inside <body>.
# It is filled using the % operator, so template bodies passed in must not
# contain unescaped percent signs of their own in the skeleton part (the
# body itself is inserted verbatim).
_TEMPLATE_TEMPLATE = """
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
	"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html xmlns:n="http://nevow.com/ns/nevow/0.1"
		xmlns="http://www.w3.org/1999/xhtml">
	<head>
		<title>Template test</title>
		<n:invisible n:render="commonhead"/>
	</head>
	<body>
		%s
	</body>
</html>
"""

class TemplatingTest(trialhelpers.ArchiveTest):
	"""Tests for template rendering, mostly through the dyntemplate
	service in //tests, which gets a freshly written template per test.
	"""

	def _assertTemplateRendersTo(self, templateBody, args, strings,
			render="volatile"):
		# Writes templateBody, wrapped into _TEMPLATE_TEMPLATE, to a
		# temporary file, installs that file as the template for render on
		# //tests#dyntemplate, and asserts that a GET with args on that
		# renderer contains all of strings.
		#
		# Returns the deferred from assertGETHasStrings; the temporary file
		# is removed when it fires, whether the assertion held or not.
		hdl, tplpath = tempfile.mkstemp(
			suffix=".html", dir=api.getConfig("tempDir"))

		# the context manager closes the descriptor even if writing fails
		with os.fdopen(hdl, "w") as f:
			f.write(_TEMPLATE_TEMPLATE%templateBody)

		svc = api.getRD("//tests").getById("dyntemplate")
		# reset the service's _loadedTemplates so the new file is picked up
		# (presumably a cache of parsed templates -- confirm in the service)
		svc._loadedTemplates = {}
		svc.templates[render] = tplpath

		def cleanup(res):
			os.unlink(tplpath)
			return res

		return self.assertGETHasStrings("//tests/dyntemplate/"+render,
			args, strings).addBoth(cleanup)

	def testContentDelivered(self):
		return self._assertTemplateRendersTo(
			'<p>stuff: <n:invisible n:render="string" n:data="parameter foo"/></p>',
			{"foo": "content delivered test"},
			["<p>stuff: content delivered test", 'href="/static/css/gavo_dc.css'])
	
	def testNoParOk(self):
		# a missing parameter renders as nothing rather than failing
		return self._assertTemplateRendersTo(
			'<p>stuff: <n:invisible n:render="string" n:data="parameter foo"/></p>',
			{},
			["stuff: </p>"])
	
	def testEightBitClean(self):
		# the argument is the UTF-8 encoding of Knöllchen
		return self._assertTemplateRendersTo(
			'<p>stuff: <n:invisible n:render="string" n:data="parameter foo"/></p>',
			{"foo": [b'Kn\xC3\xB6llchen']},
			["stuff: Knöllchen</p>"])

	def testMessEscaped(self):
		# markup in parameter values must come out escaped
		return self._assertTemplateRendersTo(
			'<p>stuff: <n:invisible n:render="string" n:data="parameter foo"/></p>',
			{"foo": '<script language="nasty&"/>'},
			['&lt;script language="nasty&amp;"/&gt;'])

	def testRDData(self):
		return self._assertTemplateRendersTo(
			'<p>rd data</p><p n:data="rd //tap">'
			'<n:invisible n:render="string"/></p>',
			{},
			['<p>&lt;resource descriptor for __system__/tap'])

	def testNoRDData(self):
		return self._assertTemplateRendersTo(
			'<div n:data="rd /junky/path/that/leads/nowhere">'
			'<p n:render="ifnodata">No junky weirdness here</p></div>',
			{},
			['<div><p>No junky weirdness'])
	
	def testParamRender(self):
		return self._assertTemplateRendersTo(
			'<div n:data="result">'
			'<p n:render="param a float is equal to %5.2f">aFloat</p>'
			'</div>',
			{"__nevow_form__": "genForm",}, [
				"<p>a float is equal to  1.25</p>"], render="form")

	def testNoParamRender(self):
		# a parameter that does not exist renders as N/A
		return self._assertTemplateRendersTo(
			'<div n:data="result">'
			'<p n:render="param %5.2f">foo</p>'
			'</div>',
			{"__nevow_form__": "genForm",}, [
				"<p>N/A</p>"], render="form")

	def testIfslot(self):
		def assertNoslot(result):
			# this must be the exact text of the ifslot-gibtsnicht paragraph
			# in the template below, or the check cannot ever fail
			self.assertTrue(b"derfnetsei" not in result[0])
			self.assertTrue(b'class="world"' not in result[0])
			return result

		return self._assertTemplateRendersTo(
			'<ul n:render="sequence" n:data="titleList">'
			'<li n:pattern="item" n:render="mapping">'
			'<p n:render="ifslot gibtsnicht">derfnetsei</p>'
			'<p n:render="ifnoslot gibtsnicht">desscho</p>'
			'<p n:render="ifslot title">'
			'<n:attr name="class"><n:slot name="other"/></n:attr>'
			'Title: <n:slot name="title"/></p>'
			'</li></ul>',
			{},
			["<p>desscho</p>", '<p class="ing">Title: on title</p>',
				"desscho</p></li>"]).addCallback(
				assertNoslot)

	def testFlattenFailureIs500(self):
		# an unknown render function must give a 500 naming the culprit
		def assert500(result):
			self.assertEqual(result[1].code, 500)
		return self._assertTemplateRendersTo(
			'<div><p>This is there</p><p n:render="borken">This is not</p></div>',
			{}, [
				"Exception while flattening",
				"Missing render method",
				": borken"
			]).addCallback(assert500)

	def testDefaultConfigSection(self):
		# getconfig with just a key, i.e., without a section
		return self._assertTemplateRendersTo(
			'<p n:render="getconfig">group</p>',
			{},
			['<p>gavo</p>'])

	def testURLEscape(self):
		return self._assertTemplateRendersTo(
			'<a n:data="parameter foo" n:render="string">'
			'<n:attr name="href">http://a.b/c?<n:invisible n:render="urlescape"/>'
			'</n:attr></a>',
			{'foo':'Kröss&http://nourl'},
			['<a href="http://a.b/c?Kr%C3%B6ss%26http%3A//nourl">Kröss&amp;'
			'http://nourl</a>'])

	def testWithFixedQueryCore(self):
		# the template is written to fixtest.html in inputsDir and left
		# there; the service at data/cores/wfq presumably refers to it
		# (confirm in the RD).
		with open(
				os.path.join(base.getConfig("inputsDir"), "fixtest.html"),
				"w") as f:
			f.write("""<html xmlns="http://www.w3.org/1999/xhtml"
				xmlns:n="http://nevow.com/ns/nevow/0.1"><head n:render="commonhead">
				</head><body n:render="withsidebar">
				<div class="result" n:render="ifdata" n:data="result">
				<n:invisible n:render="resulttable"/></div></body></html>""")

		return self.assertGETHasStrings(
			"/data/cores/wfq/fixed", {}, [
				'<div class="result">',
				'title="Num1"',
				'<td>2</td><td>3</td></tr>'])


class PathResoutionTest(trialhelpers.ArchiveTest):
	"""Tests for resolving service paths that lack a renderer or end in
	a slash.

	NOTE(review): the class name is presumably a typo for
	PathResolutionTest; it is left as is because it is part of the test ids.
	"""
	def testDefaultRenderer(self):
		return self.assertGETHasStrings("/data/cores/impgrouptest", {},
			['id="genForm-rV"']) #form rendered
			
	def testNoDefaultRenderer(self):
		def assertInfoMessage(result):
			self.assertTrue(b"No renderer given and service has no default"
				in result[0])

		# the deferred must be returned, or trial will not wait for it and
		# failures in the status check or in assertInfoMessage go unnoticed.
		return self.assertStatus("/data/cores/grouptest", 404
			).addCallback(assertInfoMessage)

	def testSlashAtEndRedirects(self):
		def checkRedirect(result):
			self.assertEqual(result[1].code, 301)
			# the destination of the redirect currently is wrong due to trialhelper
			# restrictions.
		return trialhelpers.runQuery(self.renderer, "GET",
			"/data/cores/convcat/form/", {}
		).addCallback(checkRedirect)
	

def _makeOriginAdder(origin):
	"""returns a function setting the origin header of the request passed
	to it to origin.

	The function returned takes a single argument, the request, and sets
	the header through the request's requestHeaders.setRawHeaders.
	"""
	def addOrigin(req):
		headerValues = [origin]
		req.requestHeaders.setRawHeaders("origin", headerValues)
	return addOrigin


class CORSTest(trialhelpers.ArchiveTest):
	"""Tests that access-control-allow-origin is only sent back for
	accepted origins.
	"""
	def testAuthorizedCORS(self):
		def assertOriginEchoed(queryResult):
			allowed = queryResult[1].responseHeaders.getRawHeaders(
				"access-control-allow-origin")
			self.assertEqual(allowed,
				["https://example.com/corsusing/abc/d"])

		addOrigin = _makeOriginAdder("https://example.com/corsusing/abc/d")
		pending = trialhelpers.runQuery(
			self.renderer, "GET", "/robots.txt", {},
			requestMogrifier=addOrigin)
		return pending.addCallback(assertOriginEchoed)

	def testUnauthorizedCORS(self):
		# note the x in place of the dot in the host name
		def assertNoOriginHeader(queryResult):
			responseHeaders = queryResult[1].responseHeaders
			self.assertFalse(
				responseHeaders.hasHeader("access-control-allow-origin"))

		addOrigin = _makeOriginAdder("https://examplexcom/corsusing/abc/d")
		pending = trialhelpers.runQuery(
			self.renderer, "GET", "/robots.txt", {},
			requestMogrifier=addOrigin)
		return pending.addCallback(assertNoOriginHeader)


class IfPagesTest(trialhelpers.ArchiveTest):
	"""Tests for the resources from gavo.web.ifpages: robots.txt, the
	favicon, clientcount, .well-known, and the global static server.
	"""
	def testRobotsTxt(self):
		return self.assertGETHasStrings("/robots.txt", {},
			['Disallow: /login'])
	
	def testRobotsChild(self):
		return self.assertStatus("/robots.txt/foork", 404)

	def testScaledLogo(self):
		# PNG and IEND are markers from the PNG file format
		return self.assertGETHasStrings("/favicon.png", {},
			["PNG", "IEND"])

	def testCurReaders(self):
		def assertContent(res):
			# the response body must consist of digits only
			self.assertTrue(re.match(rb"\d+$", res[0]))

		return self.assertStatus("clientcount", 200
		).addCallback(assertContent)
	
	def testACMEChallengeNoDir(self):
		return self.assertStatus(".well-known/acme-challenge", 403)

	def testNoOddWellKnownChildren(self):
		return self.assertGETHasStrings(".well-known/unknown", {},
			[">b'unknown' not supported in .well-known"])

	def testChallenge(self):
		# put a randomly named challenge file into the challenge directory
		# (creating that if necessary), fetch it, and remove it again.
		challengeName = utils.getRandomString(20)
		if not os.path.isdir(ifpages.ACMEChallenge.acmeChallengeDir):
			os.mkdir(ifpages.ACMEChallenge.acmeChallengeDir)

		cpath = os.path.join(
			ifpages.ACMEChallenge.acmeChallengeDir, challengeName)
		def cleanup(res):
			os.unlink(cpath)
			return res

		with open(cpath, "w") as f:
			f.write("Challenge!")
		return self.assertGETHasStrings(".well-known/acme-challenge/"+challengeName,
			{}, ["Challenge!"]).addBoth(cleanup)

	def testMissingChallenge404(self):
		# NOTE(review): only the message is checked here, not the status
		# code the test name suggests.
		return self.assertGETHasStrings(".well-known/acme-challenge/not-here",
			{}, ["No such challenge known here"])

	def testMissingStatic(self):
		return self.assertGETHasStrings("static/not-here", {},
			[">No matching file, neither built-in"])

	def testTestUserpathOverrides(self):
		# a file in the static server's userPath must be served under
		# static/; the file is removed again whatever the outcome.
		destPath = os.path.join(
			ifpages.StaticServer().userPath, "help_vizier.shtml")
		with open(destPath, "w") as f:
			f.write("<html>++Overridden++.</html>\n")

		def cleanup(res):
			os.unlink(destPath)
			return res

		return self.assertGETHasStrings("static/help_vizier.shtml", {},
			["++Overridden++.</html>"]).addBoth(cleanup)

	def testUserpathNoSymlink(self):
		# a symlink in userPath pointing to /dev/null must be refused
		destPath = os.path.join(
			ifpages.StaticServer().userPath, "null")
		os.symlink("/dev/null", destPath)

		def cleanup(res):
			os.unlink(destPath)
			return res

		return self.assertGETHasStrings("static/null", {},
			[">/dev/null is not located "]).addBoth(cleanup)

	def testDebianFallback(self):
		# move the jquery file in systemPath out of the way and check that
		# jquery is delivered all the same; the file is moved back when
		# the request is done.
		jqDist = os.path.join(
			ifpages.StaticServer().systemPath, "js", "jquery-gavo.js")
		if not os.path.exists(jqDist):
			# we're not running on a checkout in the first place
			return

		os.rename(jqDist, jqDist+".bak")

		def cleanup(res):
			os.rename(jqDist+".bak", jqDist)
			return res

		return self.assertGETHasStrings("static/js/jquery-gavo.js", {},
			['function(global,factory){"use strict";if(typeof module'],
			).addBoth(cleanup)

	def testShtmlProcessng(self):
		return self.assertGETHasStrings("static/help.shtml", {},
			["<title>Unittest Suite Site Help</title>"])

	def testSecurityTxt(self):
		# the deferred must be returned, or trial will not wait for it and
		# a failing assertion goes unnoticed.
		return self.assertGETHasStrings(".well-known/security.txt", {}, [
			"PGP SIGNATURE",
			"Canonical: https://dc.g-vo.org/.well-known/security.txt"])


class MetaRenderTest(trialhelpers.ArchiveTest):
	"""Tests for pages presenting metadata: browse, tableinfo, and
	service info.
	"""
	def testMacroExpanded(self):
		expected = [
			'<div class="rddesc"><span class="plainmeta"> Unittest'
				" Suite's Table Access"]
		return self.assertGETHasStrings(
			"/browse/__system__/tap", {}, expected)

	def testTableinfoSortOrder(self):
		# with alphaOrder, the column names below must turn up in the
		# document in the sequence given here.
		columnCells = [
			b"<td>description</td>",
			b"<td>schema_name</td>",
			b"<td>sourceRD</td>",
			b"<td>table_index</td>",
			b"<td>table_name</td>"]

		def assertAscending(res):
			positions = [res[0].find(cell) for cell in columnCells]
			self.assertEqual(sorted(positions), positions)
			return res

		pending = self.assertGETHasStrings(
			"/tableinfo/tap_schema.tables",
			{"alphaOrder": ["True"]},
			["<p>Sorted alphabetically. <a href="])
		return pending.addCallback(assertAscending)

	def testTableinfoNotable(self):
		expected = ["You must provide a table name to this renderer."]
		return self.assertGETHasStrings("/tableinfo", {}, expected)

	def testTableinfoDirlike(self):
		# a trailing slash makes for an empty table name
		expected = [
			'<div class="errmsg">table \'\' could not be located in'
			' DaCHS\' table listing.</div>']
		return self.assertGETHasStrings("/tableinfo/", {}, expected)

	def testWithTapinfo(self):
		expected = [
			"available through this site's TAP service",
			"url: <strong>http://localhost:8080/tap"]
		return self.assertGETHasStrings(
			"/tableinfo/tap_schema.tables",
			{"tapinfo": ["t"]},
			expected)

	def testRelationships(self):
		expected = [
			"is derived from</span>", # relationship names translated
			'href="http://localhost:8080/tableinfo/test.abcd"', # ref URL fetch
			'">Some Service', # titles rendered
			'<a href="http://just.any.old.url">DataCite be damned', #_related support
			'<a href="https://dc.g-vo.org/LP/ivo://org', # Landing page URLs
			]
		return self.assertGETHasStrings(
			"/data/cores/pc/info", {}, expected)

	def testRelatedMacroExpanded(self):
		expected = ["http://localhost:8080/foo/bar/baz"]
		return self.assertGETHasStrings(
			"/data/cores/pc/info", {}, expected)


class MetaPagesTest(trialhelpers.ArchiveTest):
	"""Tests for the getRR resource.

	NOTE(review): the two *404 tests only check the message, not the
	status code.
	"""
	def testGetRR404(self):
		expected = ['The resource non#existing is unknown at this site.']
		return self.assertGETHasStrings(
			"/getRR/non/existing", {}, expected)

	def testEmptyGetRR404(self):
		expected = ['What resource record do you want?']
		return self.assertGETHasStrings("/getRR", {}, expected)

	def testGetRRForService(self):
		expected = [
			'<identifier>ivo://x-testing/data/pubtest/moribund</identifier>']
		return self.assertGETHasStrings(
			"/getRR/data/pubtest/moribund", {}, expected)


class FormRenderTest(trialhelpers.ArchiveTest):
	"""Tests for the HTML delivered by the form renderer.
	"""
	def testTableWidget(self):
		# the widgets controlling sort direction, match limit, output
		# format, and sort column must all be there
		expected = [
			'<select name="_DBOPTIONS_DIR" id="genForm-_DBOPTIONS_DIR">'
				'<option value="ASC">ASC</option>',
			'<select name="MAXREC" id="genForm-MAXREC">'
				'<option value="100" selected="selected">100<',
			'id="genForm-_FORMAT"',
			'Sort by <select name="_DBOPTIONS_ORDER" id="genForm-_DBOPTIONS_ORDER">'
				'<option value="" selected="selected">',]
		return self.assertGETHasStrings(
			"/data/test/basicprod/form", {}, expected)

	def testDefaultsRendered(self):
		expected = ['value="1.0"', 'value="1 2 3"']
		return self.assertGETHasStrings(
			"/data/cores/pc/form", {}, expected)

	def testValuesPassedRendered(self):
		# opre is passed in, the other values are as in testDefaultsRendered
		expected = ['value="1.0"', 'value="1 2 3"', 'value="3.0"']
		return self.assertGETHasStrings(
			"/data/cores/pc/form", {"opre": "3"}, expected)

	def testDefaultForFormRendered(self):
		expected = ['value="-30"']
		return self.assertGETHasStrings(
			"/data/cores/impgrouptest/form", {}, expected)

	def testOverflowIndication(self):
		# a MAXREC of 1 must trigger the overflow warning
		formArgs = {"__nevow_form__": "genForm", "MAXREC": "1"}
		return self.assertGETHasStrings(
			"/data/test/basicprod/form", formArgs, [
			"The query limit was reached."])


class ExternalRenderTest(trialhelpers.ArchiveTest):
	"""Tests for the external renderer.
	"""
	def testBasic(self):
		def assertTarget(res):
			location = res[1].getLocationValue()
			self.assertEqual(location, "http://supersmart.edu/singlearg")

		redirected = self.assertStatus("data/cores/singlearg/external", 301)
		return redirected.addCallback(assertTarget)

	def testNoExternal(self):
		def assertMessage(res):
			found = (
				b"No publication for an external service here.</div>" in res[0])
			self.assertTrue(found, "Unexpected: %s"%res[0])

		notFound = self.assertStatus("data/cores/coltest/external", 404)
		return notFound.addCallback(assertMessage)


class CoverageTest(trialhelpers.ArchiveTest):
	"""Tests for the coverage renderer.
	"""
	def test404WithoutCoverage(self):
		def assertExplains(res):
			found = (
				b"No spatial coverage available for this service." in res[0])
			self.assertTrue(found,
				"Improper error message when no coverage info is available.")

		notFound = self.assertStatus("/data/test/basicprod/coverage", 404)
		return notFound.addCallback(assertExplains)

	def testFITSReturned(self):
		# header cards of a FITS binary table, plus four bytes of payload
		expected = [
			"SIMPLE  =",
			"XTENSION= 'BINTABLE'",
			"NAXIS   =                    2",
			"\0\0\0\x1a"]
		return self.assertGETHasStrings(
			"/data/cores/scs/coverage", {}, expected)


class TestExamples(trialhelpers.ArchiveTest):
	"""Tests for the examples renderer and its publication.
	"""
	def testBasic(self):
		def assertNoContinuation(queryResult):
			self.assertTrue(b"continuations" not in queryResult[0])
			return queryResult

		expected = [
			'<title>Examples for Hollow Datalink</title',
			'<h2 property="name">Example 1',
			'property="dl-id"',
			'ivo://org.gavo.dc/~?bla/foo/qua</em>',
			'resource="#Example2"',
			'<p>This is another example for examples.</p>']
		pending = self.assertGETHasStrings(
			"/data/cores/dl/examples", {}, expected)
		return pending.addCallback(assertNoContinuation)

	def testPublicationSelection(self):
		# the capabilities in the resource record, identified by what is
		# behind the hash in their standardIDs
		def assertCapabilities(queryResult):
			tree = trialhelpers.getXMLTree(queryResult[0])
			found = {
				cap.get("standardID").split("#")[-1]
				for cap in tree.xpath("//capability[@standardID]")}
			self.assertEqual(found, {
				'availability', 'capabilities', 'examples',
				'links-1.1', 'tables'})

		pending = trialhelpers.runQuery(
			self.renderer, "GET", "/getRR/data/cores/dl", {})
		return pending.addCallback(assertCapabilities)

	def testNoExamples(self):
		def assertIs404(queryResult):
			self.assertEqual(queryResult[1].code, 404)
			return queryResult

		pending = self.assertGETHasStrings(
			"/__system__/adql/query/examples", {}, [
			"This service does not define examples of its own."])
		return pending.addCallback(assertIs404)

	def testInternalContinuation(self):
		expected = [
			'href="http://localhost:8080/data/cores/convcat/examples">More from conv</a>',
			'href="http://localhost:8080/data/ssatest/s/examples">More<',
			'href="http://ivoa.net/doc/obscore/tap-examples.xhtml">More external</',
			'property="continuation"',
			]
		return self.assertGETHasStrings(
			"/data/cores/basiccat/examples", {}, expected)


class QPTest(trialhelpers.ArchiveTest):
	"""Tests for the qp renderer, where the query value is taken from
	the last path segment.
	"""
	def testBasic(self):
		expected = [
			'<span class="plainmeta">ivo://x-testing/data/cores/singlearg',
			'title="I\'m not value"',
			'<td>3</td><td>abcabcabc</td>',
			'</html>']
		return self.assertGETHasStrings(
			"data/cores/singlearg/qp/abc", {}, expected)

	def testAdditionalParam(self):
		queryArgs = {"otherarg": ["4"]}
		return self.assertGETHasStrings(
			"data/cores/singlearg/qp/x", queryArgs, [
			'<td>4</td><td>xxxx</td>'])

	def testVOTable(self):
		queryArgs = {"_FORMAT": ["VOTable"]}
		return self.assertGETHasStrings(
			"data/cores/singlearg/qp/x", queryArgs, [
			'<VOTABLE', "AAAAAQAAA"])

	def testEmptyMatch(self):
		queryArgs = {"otherarg": ["0"]}
		return self.assertGETHasStrings(
			"data/cores/singlearg/qp/x", queryArgs, [
			'<div class="errmsg">No record matching x.</div>'])

	def testRaisingCore(self):
		def assertIs404(res):
			self.assertEqual(res[1].code, 404)

		pending = self.assertGETHasStrings(
			"data/cores/singlearg/qp/fail hard", {}, [
			"The query initiated by your URL failed, yielding a message"
			" 'name 'ddt' is not defined'."])
		return pending.addCallback(assertIs404)

	def testMissingArg(self):
		def assertIs404(res):
			self.assertEqual(res[1].code, 404)

		pending = self.assertGETHasStrings(
			"data/cores/singlearg/qp/", {}, [
			'<div class="errmsg">This page is a root page for a query-based'
			' service.  You have to give a valid value in the path.</div>'])
		return pending.addCallback(assertIs404)


class VOSITest(trialhelpers.ArchiveTest):
	"""Tests for the capabilities and availability endpoints on broken
	services.
	"""
	def testBrokenCapDoesNotKillCapResponse(self):
		# the document must be complete, just without an SIA capability
		def assertNoSIA(queryResult):
			self.assertTrue(b"std/SIA" not in queryResult[0])
			return queryResult

		expected = [
			'<interface xsi:type="vr:WebBrowser">',
			'</cap:capabilities>']
		pending = self.assertGETHasStrings(
			"data/uwstest/borken/capabilities", {}, expected)
		return pending.addCallback(assertNoSIA)

	def testBadAvailability(self):
		expected = [
			"<h1>Unittest Suite -- Error (500)</h1>",
			'class="errmsg">Invalid constructor func in (\'crash_this\'']
		return self.assertGETHasStrings(
			"data/cores/uploadtest/availability", {}, expected)

def _nukeHostPart(uri):
	"""returns a slash followed by whatever is behind the third slash
	in uri.

	For an absolute http URL, that is the path without scheme and host.
	When uri has fewer than three slashes, the last slash-separated piece
	is used instead.
	"""
	pieces = uri.split("/", 3)
	remainder = pieces[-1]
	return "/"+remainder


class TestUserUWS(trialhelpers.ArchiveTest):
	"""Tests for user UWS (async) services, run through their HTTP
	interface.

	Each test is a chain of requests; every callback checks the response
	of the previous request and returns the deferred of the next one.  The
	callbacks are defined in the order in which they run.  As used here,
	query results are pairs of the response body (bytes) and the request
	object.
	"""
	def testWorkingLifecycle(self):
		# create a job, start it, check parameters are immutable while it
		# executes, wait for completion, fetch both results, delete the job.

		def checkPosted(result):
			request = result[1]
			self.assertEqual(request.code, 303)
			jobURL = _nukeHostPart(
				request.getLocationValue())
			self.assertTrue(jobURL.startswith("/data/cores/pc/async/"))
			return trialhelpers.runQuery(self.renderer, "POST",
				jobURL+"/phase", {"PHASE": "RUN"}
			).addCallback(assertStarted, jobURL)

		def assertStarted(lastRes, jobURL):
			req = lastRes[1]
			self.assertEqual(req.code, 303)
			self.assertTrue(
				req.getLocationValue().endswith(jobURL))
			return trialhelpers.runQuery(self.renderer, "GET",
				jobURL+"/phase", {}
				).addCallback(checkPhaseIsExecuting, jobURL)

		def checkPhaseIsExecuting(result, jobURL):
			self.assertEqual(result[0], b"EXECUTING")
			return trialhelpers.runQuery(self.renderer, "POST",
				jobURL+"/parameters", {"opim": ["4"]}
			).addCallback(checkParametersImmutable, jobURL)

		def checkParametersImmutable(result, jobURL):
			# the failure is reported within a 200 response
			self.assertStringsIn(result, ['<INFO name="QUERY_STATUS" value="ERROR">',
				'Field phase: Parameters cannot be changed in phase'])
			self.assertEqual(result[1].code, 200)
			return trialhelpers.runQuery(self.renderer, "GET", jobURL, {}
			).addCallback(waitForResult, jobURL)

		def waitForResult(result, jobURL, ct=0):
			# polls the job document until the phase is COMPLETED or ERROR,
			# giving up after more than 20 rounds.  time.sleep blocks the
			# calling thread between polls.
			if ct>20:
				raise AssertionError("user UWS job doesn't COMPLETE or ERROR")
			time.sleep(0.5)
			if b"phase>COMPLETED" in result[0] or b"phase>ERROR" in result[0]:
				if b"phase>ERROR" in result[0]:
					raise AssertionError("UWS user test job failed with %s"%result[0])
				return checkFinished(result, jobURL)
			else:
				return trialhelpers.runQuery(self.renderer, "GET", jobURL, {}
				).addCallback(waitForResult, jobURL, ct+1)

		def checkFinished(result, jobURL):
			self.assertTrue(b"phase>COMPLETED" in result[0])
			self.assertTrue(b'xlink:href="http://' in result[0])
			return trialhelpers.runQuery(self.renderer, "GET",
				jobURL+"/results/result", {}
				).addCallback(checkResult, jobURL)

		def checkResult(result, jobURL):
			self.assertTrue(b"<TR><TD>1.0</TD><TD>3.0</TD><TD>1.151292" in result[0])
			return trialhelpers.runQuery(self.renderer, "GET",
				jobURL+"/results/aux.txt", {}
				).addCallback(checkOtherResult, jobURL)

		def checkOtherResult(result, jobURL):
			self.assertEqual(
				result[1].responseHeaders.getRawHeaders("content-type"),
				["text/plain"])
			self.assertEqual(result[0], b"Hello World.\n")
			return delete(jobURL)

		def delete(jobURL):
			return trialhelpers.runQuery(self.renderer, "DELETE", jobURL, {}
			).addCallback(assertDeleted, jobURL)

		def assertDeleted(result, jobURL):
			# the DELETE must redirect to a document that has no jobref for
			# the last path segment of the redirect target.  jobURL is
			# unused here; it is only accepted because delete passes it on.
			self.assertEqual(result[1].code, 303)
			redirectTarget = _nukeHostPart(result[1].getLocationValue())
			jobId = redirectTarget.split("/")[-1]
			return self.assertGETLacksStrings(redirectTarget, {},
				['jobref id="%s"'%jobId]
			).addCallback(lambda res: reactor.disconnectAll())

		# See the same thing in test_tap.  What can I do?
		return trialhelpers.runQuery(self.renderer, "POST",
			"/data/cores/pc/uws.xml", {
				"opre": ["1"], "opim": ["3"], "powers": ["1 2 3"],
				"responseformat": "application/x-votable+xml;serialization=TABLEDATA",
			}
		).addCallback(checkPosted)
	
	def testParameterSetting(self):
		# create a job with an inline upload, check that all parameters
		# show up in the job document, delete the job.

		def checkParameters(result):
			jobURL = _nukeHostPart(
				result[1].getLocationValue())
			return trialhelpers.runQuery(self.renderer, "GET",
				jobURL, {}
			).addCallback(assertParams, jobURL)

		def assertParams(result, jobURL):
			self.assertTrue(
				b'<uws:parameter id="opim">3.0</uws:parameter>' in result[0], "opim")
			self.assertTrue(
				b'<uws:parameter id="powers">1 2 3</uws:parameter>' in result[0],
				"powers")
			self.assertTrue(
				b'<uws:parameter id="responseformat">application/x-votable+xml'
				b'</uws:parameter>' in result[0],
				"responseformat")
			self.assertTrue(
				b'<uws:parameter id="opre">1.0</uws:parameter>' in result[0],
				"opre")
			# the upload is represented by a reference into the job's results
			self.assertTrue(re.search(b'<uws:parameter byReference="True" id="stuff">'
				b'http://localhost:8080/data/cores/pc/async/[^/]*/results/stuff',
				result[0]), "stuff from upload")

			self.assertTrue(b'<uws:quote xsi:nil="true"></uws:quote>'
				in result[0])

			return deleteJob(jobURL)

		def deleteJob(jobURL):
			return trialhelpers.runQuery(self.renderer, "DELETE", jobURL, {}
			).addCallback(lambda res: reactor.disconnectAll())

		return trialhelpers.runQuery(self.renderer, "POST",
			"/data/cores/pc/uws.xml", {
				"opre": ["1"], "opim": ["3"], "powers": ["1 2 3"],
				"upload": ["stuff,param:foo"],
				"foo": trialhelpers.FakeFile(
					"wronsky", b"abc, die Katze lief im Schnee.\n"),
			}
		).addCallback(checkParameters)

	def testMultiUpload(self):
		# create a job with two uploads, check both are referenced, fetch
		# one, overwrite it with a second upload, fetch it again, delete
		# the job.

		def getJobURL(result):
			jobURL = _nukeHostPart(
				result[1].getLocationValue())
			return trialhelpers.runQuery(self.renderer, "GET",
				jobURL, {}
			).addCallback(assertParams, jobURL)

		def assertParams(result, jobURL):
			self.assertTrue(re.search(b'<uws:parameter byReference="True"'
				b' id="stuff">http://localhost:8080/data/cores/uc/async/[^/]*/'
				b'results/stuff</uws:parameter>', result[0]), "stuff")
			self.assertTrue(re.search(b'<uws:parameter byReference="True"'
				b' id="other">http://localhost:8080/data/cores/uc/async/[^/]*/'
				b'results/other</uws:parameter>', result[0]), "other")
			return trialhelpers.runQuery(self.renderer, "GET",
				jobURL+"/results/stuff", {}
			).addCallback(assertFilePresent, jobURL)

		def assertFilePresent(result, jobURL):
			self.assertEqual(b"abc, die Katze lief im Schnee.\n",
				result[0])
			return trialhelpers.runQuery(self.renderer, "POST",
				jobURL+"/parameters", {"UPLOAD": ["stuff,param:overwrite"],
					"overwrite": trialhelpers.FakeFile("bye", b"overwritten")}
			).addCallback(retrieveOverwritten, jobURL)

		def retrieveOverwritten(result, jobURL):
			return trialhelpers.runQuery(self.renderer, "GET",
				jobURL+"/results/stuff", {}
			).addCallback(assertOverwritten, jobURL)

		def assertOverwritten(result, jobURL):
			self.assertEqual(result[1].code, 200)
			self.assertEqual(b"overwritten", result[0])
			return trialhelpers.runQuery(self.renderer, "DELETE", jobURL, {}
				).addCallback(lambda res: reactor.disconnectAll())

		return trialhelpers.runQuery(self.renderer, "POST",
			"/data/cores/uc/uws.xml", {
				"UPLOAD": ["stuff,param:foo", "other,param:bar"],
				"foo": trialhelpers.FakeFile("foo", b"abc, die Katze lief im Schnee.\n"),
				"bar": trialhelpers.FakeFile("bar", b"Other stuff"),
			}
		).addCallback(getJobURL)


def _setUser(username, password=None):
	"""returns a request mogrifier putting credentials on a request.

	The mogrifier sets request.user to the bytified username and
	request.password to password; when password is missing (or otherwise
	false), username is used as the password, too.
	"""
	def addCredentials(request):
		request.user = utils.bytify(username)
		if password:
			request.password = password
		else:
			request.password = username

	return addCredentials


class TestUWSAuth(trialhelpers.ArchiveTest):
	"""Tests for ownership and visibility of UWS jobs created with
	credentials.
	"""
	def testAuthBig(self):
		"""runs one long scenario against the pc service's UWS endpoint.

		A job is posted as user "testing"; the test then checks the job's
		owner, that the owner can read it, that requests without or with
		wrong credentials get a 401, and what the job list shows with and
		without credentials once a second, anonymous job exists.  Both jobs
		are deleted at the end.

		The nested callbacks are defined in reverse order of execution;
		read them bottom-up.
		"""
		def cleanupNext(result, jobURL2):
			# the second job was created anonymously, so no credentials here
			return trialhelpers.runQuery(self.renderer, "DELETE",
				jobURL2, {})

		def assertUnauthenticatedSeesAll(result, jobURL, jobURL2):
			self.assertTrue(utils.bytify(jobURL) in result[0])
			self.assertTrue(utils.bytify(jobURL2) in result[0])
			# jobURL belongs to "testing", and an anonymous GET on it is
			# asserted to yield 401 below; hence, delete it as its owner
			# rather than anonymously so the cleanup presumably actually
			# removes the job.
			return trialhelpers.runQuery(self.renderer, "DELETE",
				jobURL, {},
				requestMogrifier=_setUser("testing")
			).addCallback(cleanupNext, jobURL2)

		def assertOnlyOwnedJobsVisible(result, jobURL, jobURL2):
			self.assertTrue(utils.bytify(jobURL) in result[0])
			self.assertFalse(utils.bytify(jobURL2) in result[0],
				"public job in private job list")
			# same job list, this time without credentials
			return trialhelpers.runQuery(self.renderer, "GET",
				"/".join(jobURL.split("/")[:-1]), {}
			).addCallback(assertUnauthenticatedSeesAll, jobURL, jobURL2)

		def queryJobList(result, jobURL):
			jobURL2 = _nukeHostPart(
				result[1].getLocationValue())
			# the job list URL is the job URL with its last segment removed
			return trialhelpers.runQuery(self.renderer, "GET",
				"/".join(jobURL.split("/")[:-1]), {},
				requestMogrifier=_setUser("testing")
			).addCallback(assertOnlyOwnedJobsVisible, jobURL, jobURL2)

		def assertNoAccessWithBadCreds(result, jobURL):
			self.assertEqual(result[1].code, 401)

			# now create an anonymous job so you can see whether it's visible
			return trialhelpers.runQuery(self.renderer, "POST",
				"/data/cores/pc/uws.xml", {
					"opre": ["5"], "opim": ["7"], "powers": ["8", "9", "10"]}
				).addCallback(queryJobList, jobURL)

		def assertNoGeneralAccess(result, jobURL):
			self.assertEqual(result[1].code, 401)
			# right user, wrong password
			return trialhelpers.runQuery(self.renderer, "GET",
				jobURL, {}, requestMogrifier=_setUser("testing", b"notright")
			).addCallback(assertNoAccessWithBadCreds, jobURL)

		def assertAuthenticatedAccess(result, jobURL):
			self.assertTrue(b"<uws:ownerId>testing</uws:ownerId>" in result[0])
			self.assertTrue(b"1 2 3</uws:parame" in result[0])
			# no credentials at all
			return trialhelpers.runQuery(self.renderer, "GET",
				jobURL, {},
			).addCallback(assertNoGeneralAccess, jobURL)

		def assertOwnerSet(result, jobURL):
			self.assertEqual(result[0], b"testing")
			return trialhelpers.runQuery(self.renderer, "GET",
				jobURL, {}, requestMogrifier=_setUser("testing")
			).addCallback(assertAuthenticatedAccess, jobURL)

		def assertPosted(result):
			request = result[1]
			jobURL = _nukeHostPart(request.getLocationValue())
			return trialhelpers.runQuery(self.renderer, "GET",
				jobURL+"/owner", {}, requestMogrifier=_setUser("testing")
			).addCallback(assertOwnerSet, jobURL)

		return trialhelpers.runQuery(self.renderer, "POST",
			"/data/cores/pc/uws.xml", {
				"opre": ["1"], "opim": ["3"], "powers": ["-20", "1 2 3"]},
			requestMogrifier=_setUser("testing")
		).addCallback(assertPosted)


def _nukeServicePart(url):
	"""returns the last two slash-separated segments of url.

	This removes the service part from a UWS job URL so the tests below
	can check whether a job is visible in the job list of a different
	service.  URLs with fewer than two slashes come back unchanged.
	"""
	trailingSegments = url.rsplit('/', 2)[-2:]
	return '/'.join(trailingSegments)

class UserUWSJoblistTest(trialhelpers.ArchiveTest):
	"""Tests that the job list of one UWS service does not show the jobs
	of another.
	"""

	# visibility of UWS jobs created with auth: pcJob is private,
	# ucJob is public.
	def testJoblist(self):
		"""creates a job on pc (as user "testing") and one on uc
		(anonymously) and checks that each service's anonymous job list
		contains its own job but not the other service's.

		The nested callbacks are defined in reverse order of execution.
		"""
		def assertPCJoblist(result, pcJobURL, ucJobURL):
			self.assertStringsIn(result, [_nukeServicePart(pcJobURL)])
			self.assertStringsIn(result, [_nukeServicePart(ucJobURL)], inverse=True)

			def deletePublicJob(privateDeleteResult):
				# This runs whether or not deleting the private job worked.
				# Handing back that delete's outcome (which may be a Failure)
				# keeps a problem there visible to trial.
				return trialhelpers.runQuery(self.renderer, "DELETE",
					ucJobURL, {}
				).addCallback(lambda ignored: privateDeleteResult)

			# Chain the two deletes rather than dropping the first Deferred,
			# so the test only finishes once both jobs have been dealt with.
			return trialhelpers.runQuery(self.renderer, "DELETE",
				pcJobURL, {},
				requestMogrifier=_setUser("testing")
			).addBoth(deletePublicJob)

		def assertUCJoblist(result, pcJobURL, ucJobURL):
			self.assertStringsIn(result, [_nukeServicePart(ucJobURL)])
			self.assertStringsIn(result, [_nukeServicePart(pcJobURL)], inverse=True)
			return trialhelpers.runQuery(self.renderer, "GET",
				"/data/cores/pc/uws.xml", {}
			).addCallback(assertPCJoblist, pcJobURL, ucJobURL)

		def getJoblist(result, pcJobURL):
			ucJobURL = _nukeHostPart(result[1].getLocationValue())
			return trialhelpers.runQuery(self.renderer, "GET",
				"/data/cores/uc/uws.xml", {},
			).addCallback(assertUCJoblist, pcJobURL, ucJobURL)

		def postOther(result):
			pcJobURL = _nukeHostPart(result[1].getLocationValue())
			return trialhelpers.runQuery(self.renderer, "POST",
				"/data/cores/uc/uws.xml", {
					"UPLOAD": ["stuff,param:foo", "other,param:bar"],
					"foo": trialhelpers.FakeFile(
						"knöterich", b"abc, die Katze lief im Schnee.\n"),
					"bar": trialhelpers.FakeFile("/schnalle/", b"Other stuff"),
				},
			).addCallback(getJoblist, pcJobURL)

		return trialhelpers.runQuery(self.renderer, "POST",
			"/data/cores/pc/uws.xml", {
				"opre": ["1"], "opim": ["3"], "powers": ["1", "2", "3"]},
			requestMogrifier=_setUser("testing")
		).addCallback(postOther)


def _requestCompression(request):
	"""A request mogrifier adding an Accept-Encoding: gzip header to
	request's request headers.
	"""
	headerName, coding = "Accept-Encoding", "gzip"
	request.requestHeaders.addRawHeader(headerName, coding)


class LogInOutTest(trialhelpers.ArchiveTest):
	"""Tests for /login with and without credentials, with and without
	nextURL, and with the relog flag.
	"""
	def testLoginRequest(self):
		"""/login without credentials yields a 401 asking for them."""
		def assertStatus(res):
			self.assertEqual(res[1].code, 401)

		return self.assertGETHasStrings("/login", {}, [
			"<p>Please enter your credentials (by reloading this page)"]
		).addCallback(assertStatus)

	def testLoginSuccessful(self):
		"""/login with credentials redirects (303) to nextURL."""
		def assertNextURL(res):
			self.assertTrue(b'<a href="http://localhost:8080/rompp">' in res[0])
			self.assertEqual(
				res[1].getLocationValue(),
				"http://localhost:8080/rompp")
			return res

		return self.assertStatus("/login", 303, {"nextURL": ["/rompp"]},
			rm=trialhelpers.addCreds
		).addCallback(assertNextURL)

	def testLogoutAuth(self):
		"""relog with credentials yields a 401 again."""
		def assertStatus(res):
			self.assertEqual(res[1].code, 401)

		return self.assertGETHasStrings("/login", {"nextURL": ["/rompp"],
				"relog": ["True"]}, [
			"<title>Unittest Suite -- Authentication Required</title>"],
			rm=trialhelpers.addCreds,
		).addCallback(assertStatus)

	def testLogoutNonauth(self):
		"""relog without credentials redirects (303) to nextURL."""
		def assertNextURL(res):
			self.assertEqual(
				res[1].getLocationValue(),
				"http://localhost:8080/rompp")
			return res

		# request arguments are lists of values, as in the other tests here;
		# a bare string would hand on a sequence of single characters.
		return self.assertStatus("/login", 303, {"nextURL": ["/rompp"],
			"relog": ["True"]},
		).addCallback(assertNextURL)

	def testLogoutNoNextURL(self):
		"""relog with credentials but without nextURL shows the
		protected-resource message.
		"""
		return self.assertGETHasStrings("/login", {"relog": ["True"]}, [
			"<p>The resource you are trying to access is protected, and "],
			rm=trialhelpers.addCreds)


class AuthcheckTest(trialhelpers.ArchiveTest):
	"""Tests for status code, www-authenticate challenge and
	x-vo-authenticated header on a protected service (keepout), an open
	one (coltest), and TAP, for various credentials.
	"""
	_protectedCaps = "/data/cores/keepout/capabilities"
	_openCaps = "/data/cores/coltest/capabilities"

	@staticmethod
	def _rawHeaders(res, name):
		# the raw values of the response header name, or None if unset
		return res[1].responseHeaders.getRawHeaders(name)

	@staticmethod
	def _creds(**overrides):
		# a request mogrifier calling addCreds with the given overrides
		def mogrify(req):
			return trialhelpers.addCreds(req, **overrides)
		return mogrify

	@staticmethod
	def _check(d, *checks):
		# hooks checks to d as callbacks, in the order given
		for check in checks:
			d = d.addCallback(check)
		return d

	def _assertCode(self, res, expected):
		self.assertEqual(res[1].code, expected)
		return res

	def _assert401(self, res):
		return self._assertCode(res, 401)

	def _assert200(self, res):
		return self._assertCode(res, 200)

	def _assertHasChallenge(self, res):
		challenge = self._rawHeaders(res, "www-authenticate")[0]
		self.assertEqual(challenge, 'Basic realm="dachsunit"')
		return res

	def _assertHasNoChallenge(self, res):
		challenges = self._rawHeaders(res, "www-authenticate")
		self.assertEqual(challenges, None)
		return res

	def _assertAuthUser(self, user):
		def assertUser(res):
			reported = self._rawHeaders(res, "x-vo-authenticated")[0]
			self.assertEqual(reported, user)
			return res
		return assertUser

	def _assertNoAuthUser(self, res):
		reported = self._rawHeaders(res, "x-vo-authenticated")
		self.assertEqual(reported, None)
		return res

	def testLimitToOk(self):
		d = self.assertGETHasStrings(self._protectedCaps, {},
			["<accessURL"],
			rm=trialhelpers.addCreds)
		return self._check(d,
			self._assert200,
			self._assertHasNoChallenge,
			self._assertAuthUser("testing"))

	def testLimitToBadPass(self):
		d = self.assertGETHasStrings(self._protectedCaps, {},
			["access is protected"],
			rm=self._creds(password="wrong"))
		return self._check(d,
			self._assert401,
			self._assertHasChallenge,
			self._assertNoAuthUser)

	def testLimitToBadUser(self):
		d = self.assertGETHasStrings(self._protectedCaps, {},
			["access is protected"],
			rm=self._creds(user="other", password="other"))
		return self._check(d,
			self._assert401,
			self._assertHasChallenge,
			self._assertNoAuthUser)

	def testNocredClosed(self):
		d = self.assertGETHasStrings(self._protectedCaps, {},
			["access is protected"])
		return self._check(d,
			self._assert401,
			self._assertHasChallenge,
			self._assertNoAuthUser)

	def testOpenBadPass(self):
		d = self.assertGETHasStrings(self._openCaps, {},
			["<accessURL"],
			rm=self._creds(password="wrong"))
		return self._check(d,
			self._assert401,
			self._assertHasChallenge,
			self._assertNoAuthUser)

	def testOpenGoodPass(self):
		d = self.assertGETHasStrings(self._openCaps, {},
			["<accessURL"],
			rm=self._creds(user="other", password="other"))
		return self._check(d,
			self._assert200,
			self._assertHasNoChallenge,
			self._assertAuthUser("other"))

	def testOnTAP(self):
		d = self.assertGETHasStrings("/tap/capabilities", {},
			["<accessURL"],
			rm=self._creds(user="other", password="other"))
		return self._check(d,
			self._assert200,
			self._assertHasNoChallenge,
			self._assertAuthUser("other"))

	def testNocredOpen(self):
		d = self.assertGETHasStrings(self._openCaps, {},
			["<accessURL"])
		return self._check(d,
			self._assert200,
			self._assertHasChallenge,
			self._assertNoAuthUser)

	def testHeaderAdded(self):
		d = self.assertGETHasStrings("/data/cores/keepout/form", {},
			['<form id="genForm" action="data/cores/keepout/form"'],
			rm=trialhelpers.addCreds)
		return self._check(d, self._assertAuthUser("testing"))


@contextlib.contextmanager
def listeningToHTTPS(listening):
	"""A context manager setting base.LISTENING_TO_HTTPS to listening
	for the controlled block.

	The value found on entry is put back on exit, also when the block
	raises.
	"""
	previous = base.LISTENING_TO_HTTPS
	try:
		base.LISTENING_TO_HTTPS = listening
		yield
	finally:
		base.LISTENING_TO_HTTPS = previous


class UpgradeInsecureRequestsTest(trialhelpers.ArchiveTest):
	"""Tests for the reaction to Upgrade-Insecure-Requests headers,
	depending on base.LISTENING_TO_HTTPS.
	"""
	@staticmethod
	def _addUIR(request):
		"""A request mogrifier adding Upgrade-Insecure-Requests: 1."""
		request.requestHeaders.setRawHeaders(
			"Upgrade-Insecure-Requests", ["1"])

	def _whileListening(self, listening, startRequest):
		"""returns the Deferred made by startRequest(), keeping
		base.LISTENING_TO_HTTPS at listening until that Deferred has fired.

		A plain "with listeningToHTTPS(...): return deferred" would put the
		flag back as soon as the return statement runs, i.e., possibly
		before the request has been processed.  So, the context manager is
		entered by hand here and only left from an addBoth callback.
		"""
		ctx = listeningToHTTPS(listening)
		ctx.__enter__()
		try:
			d = startRequest()
		except BaseException:
			# no Deferred to hang the cleanup on: restore right away
			ctx.__exit__(None, None, None)
			raise
		# "or res" passes the result (or Failure) through unchanged
		return d.addBoth(lambda res: ctx.__exit__(None, None, None) or res)

	def testNoUpgradesWithoutHTTPS(self):
		return self._whileListening(False,
			lambda: self.assertGETHasStrings("/", {},
				["<h2>Services available here</h2>"],
				rm=self._addUIR))

	def testBasicUpgrade(self):
		def assertRedirection(res):
			self.assertEqual(res[1].getLocationValue(),
				"https://localhost/__system__/adql/query/form")
			self.assertEqual(res[1].code, 307)

		return self._whileListening(True,
			lambda: self.assertGETHasStrings("/__system__/adql/query/form", {},
				["redirected to an https version\r\n"],
				rm=self._addUIR
			).addCallback(assertRedirection))

	def testNoUpgradeOnPOST(self):
		return self._whileListening(True,
			lambda: self.assertPOSTHasStrings("/__system__/adql/query/form",
				{"__nevow_form__": "genForm"},
				["ADQL query : </strong>Required</li>"],
				rm=self._addUIR))

	def testNoUpgradeWhenSecure(self):
		def makeSecure(req):
			self._addUIR(req)
			req.secure = True

		return self._whileListening(True,
			lambda: self.assertGETHasStrings("/", {},
				["<h2>Services available here</h2>"],
				rm=makeSecure))


class CachingTest(trialhelpers.ArchiveTest):
	"""Tests for the page cache, exercised through the info page of
	data/seeded/hooray.
	"""
	def assertWarmComesFromCache(self, res):
		"""asserts res has an x-cache-creation header less than a second
		away from now.
		"""
		creationStamp = utils.debytify(
			res[1].responseHeaders.getRawHeaders("x-cache-creation")[0])
		self.assertTrue(abs(float(creationStamp)-time.time())<1)
		return res

	def assertColdNotFromCache(self, res):
		"""asserts res has no x-cache-creation header and then fetches the
		page again, expecting that second response to come from the cache.
		"""
		self.assertFalse(res[1].responseHeaders.hasHeader("x-cache-creation"))
		return self.assertGETHasStrings("data/seeded/hooray/info",
			{}, [
				"<title>Information on Service 'Injection testing'</title>"]
		).addCallback(self.assertWarmComesFromCache)

	def testBasicCaching(self):
		base.caches.clearForName("data/seeded")
		return self.assertGETHasStrings("data/seeded/hooray/info",
			{}, [
				"<title>Information on Service 'Injection testing'</title>"]
		).addCallback(self.assertColdNotFromCache)

	def assertWithUserNotFromCache(self, res, again):
		"""asserts res has no x-cache-creation header; if again is true,
		the page is fetched with credentials once more and checked in the
		same way.
		"""
		self.assertFalse(res[1].responseHeaders.hasHeader("x-cache-creation"))
		if again:
			return self.assertGETHasStrings("data/seeded/hooray/info",
				{}, [
					"<title>Information on Service 'Injection testing'</title>"],
				rm=trialhelpers.addCreds
			).addCallback(self.assertWithUserNotFromCache, 0)
		return res

	def testNoCachingWithUser(self):
		return self.assertGETHasStrings("data/seeded/hooray/info",
			{}, [
				"<title>Information on Service 'Injection testing'</title>"],
			rm=trialhelpers.addCreds
		).addCallback(self.assertWithUserNotFromCache, 1)

	def _restoreCacheLimit(self, res=None):
		"""puts the maxSize of data/seeded's page cache back to 1000000
		(the value this test has always restored) and passes res through.
		"""
		base.caches.getPageCache("data/seeded").maxSize = 1000000
		return res

	def assertOverfullCacheRejects(self, res):
		"""asserts the page cache of data/seeded is empty and resets its
		maxSize.

		The reset happens in a finally clause so a failing assertion does
		not leave the tiny limit behind for later tests.
		"""
		pc = base.caches.getPageCache("data/seeded")
		try:
			self.assertEqual(pc.cacheDict, {})
		finally:
			self._restoreCacheLimit()
		return res

	def testCacheLimit(self):
		base.caches.clearForName("data/seeded")
		# called for its side effect only; the return value is not needed
		_ = base.caches.getRD("data/seeded")
		base.caches.getPageCache("data/seeded").maxSize = 10

		# addBoth: also undo the limit when the request itself fails and
		# assertOverfullCacheRejects hence never runs.
		return self.assertGETHasStrings("data/seeded/hooray/info",
			{}, [
				"<title>Information on Service 'Injection testing'</title>"]
		).addCallback(self.assertOverfullCacheRejects
		).addBoth(self._restoreCacheLimit)


class ThirdPartyCacheTest(trialhelpers.ArchiveTest):
	"""Tests for the 3rdparty/ pages, using a local DataServer as the
	remote side.
	"""
	def setUp(self):
		# drop what an earlier run may have left in the cache directory
		staleCopy = os.path.join(
			base.getConfig("cacheDir"), "3rdparty", "testing")
		if not os.path.exists(staleCopy):
			return
		os.unlink(staleCopy)

	@staticmethod
	def _startServer():
		# returns a running DataServer and what its __enter__ returned
		server = trialhelpers.DataServer("Something.\n")
		return server, server.__enter__()

	@staticmethod
	def _mapTesting(target):
		# makes 3rdparty/testing point to target
		ifpages.ThirdPartyCachePage.urlMapping[b"testing"] = utils.bytify(
			target)

	@staticmethod
	def _stopping(server):
		# returns a callback leaving server's context and passing on its
		# argument
		def stop(res):
			return server.__exit__(None, None, None) or res
		return stop

	def testRemoteFailure(self):
		server, url = self._startServer()
		self._mapTesting(url+"/does-not-exist")
		d = self.assertGETHasStrings("3rdparty/testing", {}, [
			'<div class="errmsg">404 Not Found</div>'])
		return d.addBoth(self._stopping(server))

	def testRemoteSuccess(self):
		server, url = self._startServer()
		self._mapTesting(url)
		d = self.assertGETHasStrings("3rdparty/testing", {}, [
			"Something.\n"])
		return d.addBoth(self._stopping(server))

	def test404(self):
		return self.assertStatus("3rdparty/does_not_exist", 404)


# Source of an RD carrying a "superseded" meta item written in RST, with a
# link to http://blog.g-vo.org; SupersededTest.testRDSuperseded writes it
# to withsup.rd in inputsDir.
_SUPERSEDED_RD_LITERAL = """<resource schema="data">
<meta name="superseded" format="rst">
	This stuff is stuffy.  Go here_ instead.

	.. _here: http://blog.g-vo.org
</meta>
<table id="dust"><column name="in_the_wind"/></table></resource>"""


class SupersededTest(trialhelpers.ArchiveTest):
	"""Tests that requests into superseded RDs and services yield a 404
	carrying the superseded message.
	"""
	def _assert404(self, res):
		self.assertEqual(res[1].code, 404)
		return res

	def testRDSuperseded(self):
		rdFile = trialhelpers.testFile("withsup.rd", _SUPERSEDED_RD_LITERAL,
			inDir=api.getConfig("inputsDir"))
		rdFile.__enter__()

		def removeRD(res):
			# leave the testFile context and pass the result through
			return rdFile.__exit__(None, None, None) or res

		expected = [
			'class="superseded-message"',
			"<p>This stuff is stuffy",
			'href="http://blog.g-vo.org"']
		d = self.assertGETHasStrings("withsup/s/wop", {}, expected)
		d.addCallback(self._assert404)
		return d.addBoth(removeRD)

	def testServiceSuperseded(self):
		expected = [
			'href="https://en.wikipedia.org/wiki/George_A._Romero"',
			"<p>Brains!  Brains! <a "]
		d = self.assertGETHasStrings("data/cores/zombie/scs.xml", {}, expected)
		return d.addCallback(self._assert404)


class SchemaTest(trialhelpers.ArchiveTest):
	"""Tests for the pages below schemata/."""
	_voResourceURL = (
		"http://vo.ari.uni-heidelberg.de/docs/schemata/VOResource.xsd")

	def _assertRedirectsToVOResource(self, result):
		self.assertEqual(result[1].getLocationValue(), self._voResourceURL)
		return result

	def _assertVOResourceRedirect(self, path):
		# path must both announce and perform the redirect to VOResource
		d = self.assertGETHasStrings(path, {}, [
			"This is supposed to redirect to "+self._voResourceURL])
		return d.addCallback(self._assertRedirectsToVOResource)

	def test404(self):
		message = ('div class="errmsg">'
			"'..' is neither a prefix nor a schema name known here</div>")
		return self.assertGETHasStrings("schemata/../../", {}, [message])

	def testByPrefix(self):
		return self._assertVOResourceRedirect("schemata/vr")

	def testBySchemaName(self):
		return self._assertVOResourceRedirect("schemata/VOResource.xsd")

	def testSchemaIndex(self):
		oaiLink = ('<a href="http://vo.ari.uni-heidelberg.de/docs/schemata/'
			'OAI-PMH.xsd">oai</a>')
		return self.assertGETHasStrings("schemata", {}, [oaiLink])


class BlockingTest(trialhelpers.ArchiveTest):
	"""Tests for what hosts in root.BLOCKED_HOSTS get to see, with and
	without a block-message.txt in stateDir.
	"""
	# the host name the tests put on the block list
	_blockedHost = "this.is.no.ip.address"

	def _unblock(self, res):
		"""removes the test host from the block list and passes res on.

		This uses discard with the explicit host name: pop() on a set
		removes an arbitrary element, which need not be the one added here.
		"""
		root.BLOCKED_HOSTS.discard(self._blockedHost)
		return res

	def testNoMessageBlocking(self):
		root.BLOCKED_HOSTS.add(self._blockedHost)

		return self.assertGETHasStrings("", {}, [
			"Sorry, you're blocked.\nThere should"]).addBoth(self._unblock)

	def testWithMessageBlocking(self):
		msgFile = api.getConfig("stateDir") / "block-message.txt"
		# write the message before blocking, so a failure to write does
		# not leave the host on the block list
		with open(msgFile, "w") as f:
			f.write("Call us for good advice: ++49 6221 54 1837\n")
		root.BLOCKED_HOSTS.add(self._blockedHost)

		def cleanup(res):
			self._unblock(res)
			os.unlink(msgFile)
			return res

		return self.assertGETHasStrings("", {}, [
			"all us for good advice: ++49 6221 54 1837\n"]).addBoth(cleanup)
