File: testhelpers.py

package info (click to toggle)
gavodachs 2.3%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 7,260 kB
  • sloc: python: 58,359; xml: 8,882; javascript: 3,453; ansic: 661; sh: 158; makefile: 22
file content (928 lines) | stat: -rw-r--r-- 27,863 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
# -*- coding: utf-8 -*-
"""
Helper classes for the DaCHS' unit tests.

WARNING: This messes up some global state.  DO NOT import into modules
doing regular work.  testtricks is the module for that kind of stuff.
"""

#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import http.server
import contextlib
import gc
import io
import os
import pickle
import re
import subprocess
import sys
import threading
import time
import traceback
import unittest
import warnings

try:
	import coverage
except ImportError:
	# we only need this if someone set COVERAGE_FILE; don't do that
	# if you don't have coverage.py installed.
	pass


# This sets up a test environment of the DaCHS software.
#
# To make this work, the current user must be allowed to run
# createdb (in practice, you should have done something like
#
# sudo -u postgres -s `id -nu`
#
# ). You should be able to tear both ~/_gavo_test and the database 
# down, and this should automatically recreate everything.  That's 
# an integration test for DaCHS, too.
#
# This must be run before anything else from gavo is imported because
# it manipulates the config stuff; this, in turn, runs as soon as
# base is imported.

# The following forces tests to be run from the tests directory.  
# Reasonable, I'd say.
#
# All the custom setup can be suppressed by setting a GAVO_OOTTEST
# env var before importing this.  That's for "out of tree test"
# and is used by the relational registry "unit" tests (and possibly
# others later).

def ensureResources():
	"""Does nothing.

	This is the default; the module-level setup code below rebinds the
	name to a function doing actual work when a test environment has
	to be created first.
	"""
	return None

# Module-level setup of the test environment.  With GAVO_OOTTEST set,
# gavo.base is imported with whatever configuration is in effect;
# otherwise, the environment is pointed to the test configuration first
# and a file system hierarchy is created if it does not exist yet.
if "GAVO_OOTTEST" in os.environ:
	from gavo import base

else:
	# the tests are expected to be run from the tests directory
	TEST_BASE = os.getcwd()
	# keep a copy of the environment as it was before we changed it
	originalEnvironment = os.environ.copy()
	# presumably this keeps a user's own configuration from being read --
	# TODO confirm against gavo.base.config
	os.environ["GAVOCUSTOM"] = "/invalid"
	os.environ["GAVOSETTINGS"] = os.path.join(TEST_BASE, 
		"test_data", "test-gavorc")
	if not os.path.exists(os.environ["GAVOSETTINGS"]):
		warnings.warn("testhelpers imported from non-test directory.  This"
			" is almost certainly not what you want (or set GAVO_OOTTEST).")

	# this must happen after the environment manipulation above (see the
	# comment at the top of this section)
	from gavo import base #noflake: import above is conditional

	# no rootDir yet: build the file system part of the test environment
	# now; the database part is deferred to ensureResources.
	if not os.path.exists(base.getConfig("rootDir")):
		from gavo.user import initdachs
		dbname = "dachstest"
		dsn = initdachs.DSN(dbname)
		initdachs.createFSHierarchy(dsn, "test")

		# append two meta items to defaultmeta.txt
		with open(os.path.join(
				base.getConfig("configDir"), "defaultmeta.txt"), "a", 
				encoding="utf-8") as f:
			f.write("!organization.description: Mein wüster Club\n")
			f.write("!contact.email: invalid@whereever.else\n")

		# (over)write vanitynames.txt with two entries
		with open(os.path.join(
				base.getConfig("configDir"), "vanitynames.txt"), "w") as f:
			f.write('http://sofo-hd.de sofo !redirect\n'
				'data/cores/scs/scs.xml fromvanitynames\n')

		from gavo.base import config
		# presumably re-reads the defaultmeta.txt changed above -- TODO confirm
		config.makeFallbackMeta(reload=True)
		# make test_data available as inputsDir/data
		os.symlink(os.path.join(TEST_BASE, "test_data"),
			os.path.join(base.getConfig("inputsDir"), "data"))
		# replace inputsDir/__system by a symlink to test_data/__system;
		# os.rmdir only succeeds if the directory is empty.
		os.rmdir(os.path.join(base.getConfig("inputsDir"), "__system"))
		os.symlink(os.path.join(TEST_BASE, "test_data", "__system"),
			os.path.join(base.getConfig("inputsDir"), "__system"))
		os.mkdir(os.path.join(base.getConfig("inputsDir"), "test"))

		def ensureResources(): #noflake: deliberate override
			# Creates the test database and fills it with what the tests
			# need.  On any failure, this prints a traceback and a hint to
			# stderr and exits the process with status 1.
			#
			# delay everything that needs DB connectivity until
			# after the import; otherwise, we may create a conn pool
			# and that, as used in DaCHS, may create a thread.  That means
			# risking a deadlock while python is importing.
			try:
				subprocess.check_call(["createdb", "--template=template0", 
					"--encoding=UTF-8", "--locale=C", dbname])

				initdachs.initDB(dsn)

				from gavo.registry import publication
				from gavo import rsc
				from gavo import rscdesc #noflake: caches registration
				from gavo.protocols import creds
				from gavo.protocols import tap

				publication.updateServiceList([base.caches.getRD("//rds")])
				publication.updateServiceList([base.caches.getRD("//services")])
				publication.updateServiceList([base.caches.getRD("//tap")])

				# Import some resources necessary in trial tests
				with base.getWritableAdminConn() as conn:
					rsc.makeData(base.resolveCrossId("//obscore#makeSources"),
						connection=conn)
					rsc.makeData(base.resolveCrossId("//obscore#create"),
						connection=conn)
					tap.publishToTAP(base.resolveCrossId("//obscore"),
						conn)
					rsc.makeData(base.resolveCrossId("//uws#enable_useruws"),
						connection=conn)
					# two accounts for the tests; the arguments presumably are
					# user name, password, and a remark -- TODO confirm
					creds.addUser(conn, "testing", "testing", "powerless test user")
					creds.addUser(conn, "other", "other", "another test user")
			except:
				# the bare except is deliberate here: whatever went wrong,
				# report it and terminate.
				traceback.print_exc()
				sys.stderr.write("Creation of test environment failed.  Remove %s\n"
					" before trying again.\n"%(base.getConfig("rootDir")))
				sys.exit(1)


# we really don't want to send mail from the test suite, so override
# osinter.sendMail with a no-op
from gavo.base import osinter
osinter.sendMail = lambda *args, **kwargs: None

from gavo import utils
from gavo import svcs
from gavo.base import config


class FakeSimbad(object):
	"""A canned-response replacement for the Simbad resolver.

	simbadinterface is monkeypatched with this so that no test ever
	queries Simbad.  Cached Simbad responses are not persisted either,
	which unfortunately means that functionality is not exercised by
	the tests.

	Constructor arguments are accepted and ignored.
	"""
	simbadData = {
		'Aldebaran': {
			'RA': 68.98016279,
			'dec': 16.50930235,
			'oname': 'Aldebaran',
			'otype': 'LP?'},
		'M1': {
			'RA': 83.63308333,
			'dec': 22.0145,
			'oname': 'M1',
			'otype': 'SNR'},
		'Wozzlfoo7xx': None}

	def __init__(self, *args, **kwargs):
		pass

	def query(self, ident):
		"""returns the canned record for ident, or None if there is none.
		"""
		try:
			return self.simbadData[ident]
		except KeyError:
			return None


# Here's the deal on TestResource: When setting up complicated stuff for
# tests (like, a DB table), define a TestResource for it.  Override
# the make(dependents) method returning something and the clean(res) 
# method to destroy whatever you created in make().
#
# Then, in VerboseTests, have a class attribute
# resources = [(name1, res1), (name2, res2)]
# giving attribute names and resource *instances*.
# There's an example in adqltest.py
# 
# If you use this and you have a setUp of your own, you *must* call 
# the superclass's setUp method.

import testresources

class ResourceInstance(object):
	"""A helper class for TestResource.

	This is a thin proxy around an object (kept in .original) that
	cannot take arbitrary attributes itself.  Attribute access, indexing,
	iteration, str(), len(), and the in operator are forwarded to the
	wrapped object; attributes set on the proxy stay on the proxy.

	See TestResource's docstring for info on what this is about; in case
	you encounter one of these but need the real thing, just pull
	.original.
	"""
	def __init__(self, original):
		self.original = original
	
	def __getattr__(self, name):
		# __getattr__ is only called when normal lookup fails.  If that
		# happens for "original" itself, the instance has not been
		# initialised (copy and pickle create such instances and then probe
		# them for __setstate__).  Forwarding in that situation would recurse
		# endlessly, so fail in the regular way instead.
		if name=="original":
			raise AttributeError(name)
		return getattr(self.original, name)
	
	def __getitem__(self, index):
		return self.original[index]

	def __iter__(self):
		return iter(self.original)

	def __str__(self):
		return str(self.original)

	def __len__(self):
		return len(self.original)

	def __contains__(self, other):
		return other in self.original


class TestResource(testresources.TestResource):
	"""A testresources.TestResource keeping our old code working.

	testresources 2.0.1 sets attributes on whatever make returns.
	Rather than rewriting all the make methods, this class checks whether
	the object returned accepts arbitrary attributes and, if it does not,
	wraps it into a ResourceInstance.

	For that to work, code from testresources itself has to be overridden
	here, which is a pity.  Should this break with a new testresources
	version: take every method calling .make from there and have it call
	_make_and_wrap instead.

	Caution: when you implement reset(), you'll have to wrap the
	result with testhelpers.ResourceInstance manually; but then you'd
	have to copy dependencies manually, which is crazy, and so I think
	manual reset currently really is broken.
	"""
	def _make_and_wrap(self, deps):
		"""returns what self.make(deps) returns, wrapped in a ResourceInstance
		if it cannot take new attributes.
		"""
		made = self.make(deps)
		try:
			# probe whether attributes can be set (and removed again)
			made.improbably_named_attribute = None
			del made.improbably_named_attribute
		except AttributeError:
			# they cannot, so hand out a proxy that takes them
			return ResourceInstance(made)
		return made

	##### Methods adapted from testresources

	def _make_all(self, result):
		"""Make this resource after obtaining all its dependencies.

		The dependencies are passed to make (via _make_and_wrap) as a
		dictionary and are also set as attributes on what is returned.
		"""
		self._call_result_method_if_exists(result, "startMakeResource", self)
		dependencies = dict(
			(depName, depManager.getResource())
			for depName, depManager in self.resources)
		made = self._make_and_wrap(dependencies)
		for depName in list(dependencies):
			setattr(made, depName, dependencies[depName])
		self._call_result_method_if_exists(result, "stopMakeResource", self)
		return made

	def _reset(self, resource, dependency_resources):
		"""Reset a resource by cleaning it and making a new one.

		Override this to reset resources in some other way.  An override
		should reset the self._dirty flag (assuming the manager can
		ever be clean) and return either the old resource cleaned or a fresh
		one.

		:param resource: The resource to reset.
		:param dependency_resources: A dict mapping name -> resource instance
				for the resources specified as dependencies.
		"""
		self.clean(resource)
		fresh = self._make_and_wrap(dependency_resources)
		return fresh


from gavo.helpers.testtricks import (  #noflake: exported names
	XSDTestMixin, testFile, getMemDiffer, getXMLTree, collectedEvents,
	FakeRequest) 


class ForkingSubprocess(subprocess.Popen):
	"""A subprocess that doesn't exec but fork.

	Instead of executing a program, the forked child calls executable
	(a python callable taking no arguments) with sys.argv set to args.
	The child's exit status is 0 unless the callable raises SystemExit,
	in which case it is the code of that exception.
	"""
	def _execute_child(self, args, executable, preexec_fn, close_fds,
							 pass_fds, cwd, env,
							 startupinfo, creationflags, shell,
							 p2cread, p2cwrite,
							 c2pread, c2pwrite,
							 errread, errwrite,
							 restore_signals, 
               # post-3.7, some additional parameters were added, which
               # fortunately we don't need.
							 *ignored_args):
# stolen largely from 2.7 subprocess.  Unfortunately, I can't just override the
# exec action; so, I'm replacing the whole exec shebang with a simple
# call to executable().
#
# Also, I'm doing some extra hack to collect coverage info if it looks
# like we're collecting coverage.
#
# The signature (and a few inside parts) is from 3.7 subprocess.  We're
# ignoring everything that 2.7 hasn't known under the bold assumption that our
# simple testing forks that doesn't matter (which ought to be true for
# start_new_session at least:-).

		# this is set before the fork, so it changes sys.argv in the
		# calling process, too.
		sys.argv = args
		if executable is None:
				executable = args[0]

		def _close_in_parent(fd):
				os.close(fd)

		# For transferring possible setup failures from child to parent.
		# The child writes the pickled exception object into this pipe,
		# the parent unpickles and raises it.
		errpipe_read, errpipe_write = os.pipe()

		try:
						gc_was_enabled = gc.isenabled()
						# Disable gc to avoid bug where gc -> file_dealloc ->
						# write to stderr -> hang.  http://bugs.python.org/issue1336
						gc.disable()
						try:
								self.pid = os.fork()
						except:
								if gc_was_enabled:
										gc.enable()
								raise
						self._child_created = True
						if self.pid == 0:
								# Child
								try:
										# Close parent's pipe ends
										if p2cwrite!=-1:
												os.close(p2cwrite)
										if c2pread!=-1:
												os.close(c2pread)
										if errread!=-1:
												os.close(errread)
										os.close(errpipe_read)

										# When duping fds, if there arises a situation
										# where one of the fds is either 0, 1 or 2, it
										# is possible that it is overwritten (#12607).
										if c2pwrite == 0:
												c2pwrite = os.dup(c2pwrite)
										if errwrite == 0 or errwrite == 1:
												errwrite = os.dup(errwrite)

										# Dup fds for child
										def _dup2(a, b):
												# dup2() removes the CLOEXEC flag but
												# we must do it ourselves if dup2()
												# would be a no-op (issue #10806).
												if a == b:
														# NOTE(review): _set_cloexec_flag is not defined in
														# this class; confirm the Popen of the python
														# in use still provides it.
														self._set_cloexec_flag(a, False)
												elif a!=-1:
														os.dup2(a, b)
										_dup2(p2cread, 0)
										_dup2(c2pwrite, 1)
										_dup2(errwrite, 2)

										# Close pipe fds.  Make sure we don't close the
										# same fd more than once, or standard fds.
										closed = set([-1])
										for fd in [p2cread, c2pwrite, errwrite]:
												if fd not in closed and fd > 2:
														os.close(fd)
														closed.add(fd)

										if cwd is not None:
												os.chdir(cwd)

										# coverage is only importable if coverage.py is installed
										# (see the guarded import at the top of the module).
										if "COVERAGE_FILE" in os.environ:
											cov = coverage.Coverage(data_file="forked.cov",
												source=["gavo"],
												auto_data=True)
											cov.config.disable_warnings = ["module-not-measured"]
											cov.start()

										if preexec_fn:
												preexec_fn()

										exitcode = 0

								except:
										# something went wrong while setting up the child: send
										# the pickled exception to the parent and bail out.
										exc_type, exc_value, tb = sys.exc_info()
										# Save the traceback and attach it to the exception object
										exc_lines = traceback.format_exception(exc_type,
																													 exc_value,
																													 tb)
										exc_value.child_traceback = ''.join(exc_lines)
										os.write(errpipe_write, pickle.dumps(exc_value))
										os.close(errpipe_write)
										os._exit(255)
								
								# closing the error pipe without having written anything
								# tells the parent that the setup went fine.
								os.close(errpipe_write)
								try:
									executable()
								except SystemExit as ex:
									# NOTE(review): ex.code can be None or a string, which
									# os._exit below will not accept -- confirm the
									# callables used here always exit with an integer.
									exitcode = ex.code

								if "COVERAGE_FILE" in os.environ:
									cov.stop()
									cov.save()
								sys.stderr.close()
								sys.stdout.close()
								# leave without running the parent's cleanup handlers
								os._exit(exitcode)


						# Parent
						os.close(errpipe_write)
						if gc_was_enabled:
								gc.enable()

						# Now wait for the child to come up (at which point it will
						# close its error pipe)
						errpipe_data = bytearray()
						while True:
								part = os.read(errpipe_read, 50000)
								errpipe_data += part
								if not part or len(errpipe_data)>50000:
									break
								# NOTE(review): this check is inside the read loop, so
								# whatever arrived with the first read is unpickled right
								# away -- confirm that is intended (a pickle arriving in
								# several parts would not load).
								if errpipe_data:
									child_exception = pickle.loads(errpipe_data)
									raise child_exception

		finally:
				# close the FDs used by the child if they were opened
				if p2cread!=-1 and p2cwrite!=-1:
						_close_in_parent(p2cread)
				if c2pwrite!=-1 and c2pread!=-1:
						_close_in_parent(c2pwrite)
				if errwrite!=-1 and errread!=-1:
						_close_in_parent(errwrite)

				# be sure the error pipe FD is eventually closed no matter what
				os.close(errpipe_read)


class VerboseTest(testresources.ResourcedTestCase):
	"""A TestCase with a couple of convenient assert methods.
	"""
	def assertEqualForArgs(self, callable, result, *args):
		"""asserts that callable(*args) is equal to result.

		callable is called exactly once; what it returned then is also what
		is shown in the failure message.
		"""
		found = callable(*args)
		self.assertEqual(found, result, 
			"Failed for arguments %s.  Expected result is: %s, result found"
			" was: %s"%(str(args), repr(result), repr(found)))

	def assertRaisesVerbose(self, exception, callable, args, msg):
		"""asserts that callable(*args) raises exception, failing with msg
		if nothing is raised.

		Other exceptions are propagated unchanged.
		"""
		try:
			callable(*args)
		except exception:
			return
		raise self.failureException(msg)

	def assertRaisesWithMsg(self, exception, errMsg, callable, args, msg=None,
			**kwargs):
		"""asserts that callable(*args, **kwargs) raises exception and that
		the exception stringifies to errMsg.

		The exception caught is returned so callers can inspect it further.
		If nothing is raised, the test fails with msg (or a default message
		containing what the callable returned).  Other exceptions are
		propagated unchanged.
		"""
		try:
			value = callable(*args, **kwargs)
		except exception as ex:
			if errMsg==str(ex):
				pass # make sure we decide on __eq__ for EqualingRE's sake
			else:
				raise self.failureException(
					"Expected %r, got %r as exception message"%(errMsg, str(ex)))
			return ex
		else:
			raise self.failureException(msg or 
				"%s not raised (function returned %s)"%(
					str(exception), repr(value)))

	def assertRuns(self, callable, args, msg=None):
		"""asserts that callable(*args) does not raise an Exception.

		On failure, msg is used as the failure message if given; otherwise,
		the message names the exception raised.
		"""
		try:
			callable(*args)
		except Exception as ex:
			raise self.failureException(msg or
				"Should run, but raises %s (%s) exception"%(
				ex.__class__.__name__, str(ex)))

	def assertAlmostEqualVector(self, first, second, places=7, msg=None):
		"""asserts that first and second are almost equal (in the sense of
		assertAlmostEqual with places) element by element.

		The comparison uses zip and hence stops at the end of the shorter
		sequence; surplus elements are not looked at.
		"""
		try:
			for f, s in zip(first, second):
				self.assertAlmostEqual(f, s, places)
		except AssertionError:
			if msg:
				raise AssertionError(msg)
			else:
				raise AssertionError("%s != %s within %d places"%(
					first, second, places))

	def assertEqualToWithin(self, a, b, ratio=1e-7, msg=None):
		"""asserts that abs(a-b)/abs(a+b)<ratio.

		No precautions are taken against a+b being zero; for numbers,
		that results in a ZeroDivisionError rather than a test failure.
		"""
		if msg is None:
			msg = "%s != %s to within %s of the sum"%(a, b, ratio)
		denom = abs(a+b)
		self.assertTrue(abs(a-b)/denom<ratio, msg)

	def assertOutput(self, toExec, argList, expectedStdout=None, 
			expectedStderr="", expectedRetcode=0, input=None,
			stdoutStrings=None):
		"""checks that toExec called with argList has the given output and return
		value.

		expectedStdout and expectedStderr can be strings or bytes (which must
		be matched exactly), lists (each item must occur in the output),
		or functions.  In the last case, the output (as bytes) is passed to
		the function, and an AssertionError is raised if the function does not
		return true.  An expectedStdout of None disables the check of stdout.

		The 0th argument in argList is automatically added, only pass "real"
		command line arguments.

		toExec may also be a zero-argument python function.  In that case, the
		process is forked and the function is called, with sys.argv according to
		argList.  This helps to save startup time for python main functions.

		When an expectation is not met, what was actually received is left in
		the files output.stdout and/or output.stderr in the current directory.

		stdoutStrings is accepted but not used.
		"""
		# get rid of the debris of previous failures
		for name in ["output.stderr", "output.stdout"]:
			try:
				os.unlink(name)
			except OSError:
				pass

		if isinstance(toExec, str):
			p = subprocess.Popen([toExec]+argList, executable=toExec, 
				stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
		else:
			p = ForkingSubprocess(["test harness"]+argList, executable=toExec, 
				stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
		out, err = p.communicate(input=utils.bytify(input))
		retcode = p.wait()

		try:
			if isinstance(expectedStderr, (bytes, str)):
				self.assertEqual(err, utils.bytify(expectedStderr))
			elif isinstance(expectedStderr, list):
				for sample in expectedStderr:
					self.assertTrue(utils.bytify(sample) in err,
						"%s missing in stderr"%repr(sample))
			else:
				# The message is built regardless of whether the assertion
				# holds, so the decoding must never fail; "replace" is a
				# registered error handler that sees to that.
				self.assertTrue(expectedStderr(err), "Stderr didn't match functional"
					" expectation: %s"%err.decode("ascii", "replace"))

			self.assertEqual(expectedRetcode, retcode)
		except AssertionError:
			with open("output.stdout", "wb") as f:
				f.write(out)
			with open("output.stderr", "wb") as f:
				f.write(err)
			raise

		try:
			if isinstance(expectedStdout, (bytes, str)):
				self.assertEqual(out, utils.bytify(expectedStdout))
			elif isinstance(expectedStdout, list):
				for sample in expectedStdout:
					self.assertTrue(utils.bytify(sample) in out,
						"%s missing in stdout"%repr(sample))
			elif expectedStdout is not None:
				self.assertTrue(expectedStdout(out))
		except AssertionError:
			with open("output.stdout", "wb") as f:
				f.write(out)
			raise

	def assertEqualIgnoringAliases(self, result, expectation):
		"""asserts that result equals expectation, where each ASWHATEVER in
		expectation stands for "AS " followed by one or more lowercase letters
		in result.
		"""
		pat = re.escape(expectation).replace("ASWHATEVER", "AS [a-z]+")+"$"
		if not re.match(pat, result):
			raise AssertionError("%r != %r"%(result, expectation))


# What cleanXML removes from XML literals: namespace declarations,
# id attributes (including frame_id and coord_system_id), and
# xsi:schemaLocation attributes.  These are raw strings so that \s is
# passed on to the regular expression engine as written.
_xmlJunkPat = re.compile("|".join([
	r'(xmlns(:[a-z0-9]+)?="[^"]*"\s*)',
	r'((frame_|coord_system_)?id="[^"]*")',
	r'(xsi:schemaLocation="[^"]*"\s*)']))


def cleanXML(aString):
	"""removes IDs and some other detritus from XML literals.

	What is removed is defined by _xmlJunkPat.  After that, each run of
	whitespace is replaced by a single blank, leading and trailing
	whitespace is stripped, and blanks in front of /> and > are removed.

	The result will be invalid XML, and all this assumes the fixed-prefix
	logic of the DC software.

	For new tests, you should just getXMLTree and XPath tests.

	aString can be bytes, in which case it will be interpreted as ASCII.
	cleanXML returns a string in any case.
	"""
	withoutJunk = _xmlJunkPat.sub('', utils.debytify(aString))
	# a raw string, as \s is not an escape sequence python knows
	return re.sub(r"\s+", " ", withoutJunk).strip(
		).replace(" />", "/>").replace(" >", ">")


class SamplesBasedAutoTest(type):
	"""A metaclass turning a class' samples attribute into test methods.

	Classes using this define samples, a sequence of arbitrary items,
	and a method _runTest(sample) that is passed one of these items.

	For each sample, the metaclass adds a method test<nn> (nn being the
	zero-padded index of the sample) calling _runTest with that sample.
	Classes without samples are left alone.
	"""
	def __new__(cls, name, bases, dict):
		def makeTest(boundSample):
			# a factory, so each test method has a sample of its own
			def testFun(self, sample=boundSample):
				self._runTest(sample)
			return testFun

		samples = dict.get("samples", ())
		for index, item in enumerate(samples):
			dict["test%02d"%index] = makeTest(item)
		return type.__new__(cls, name, bases, dict)


class SimpleSampleComparisonTest(VerboseTest, metaclass=SamplesBasedAutoTest):
	"""A base class for tests that call a function on an input and
	compare what comes back with an expected value.

	Put the function into the functionToRun attribute (wrap
	it in a staticmethod) and the (input, output) pairs into samples.

	When the function raises an exception, the test passes if the
	stringified exception equals the stringified output of the sample;
	hence, output may be an exception or just its serialised form.
	"""

	def _runTest(self, sample):
		arg, wanted = sample
		try:
			self.assertEqual(self.functionToRun(arg), wanted)
		except Exception as ex:
			# test failures always propagate; anything else only if it
			# is not what the sample announced.
			if isinstance(ex, AssertionError) or str(ex)!=str(wanted):
				raise


def computeWCSKeys(pos, size, cutCrap=False):
	"""returns a dictionary with 2D WCS header keys for a 1000x1000 pixel
	image centered at pos and having the angular size size.

	Both pos and size are 2-tuples in degrees.

	Unless cutCrap is true, the dictionary additionally contains a set
	of further keys (imageTitle, accref, accsize, bandpassLo, etc.) that
	are computed from pos and size or are None.
	"""
	nPixX, nPixY = 1000., 1000.
	header = dict([
		("CRVAL1", pos[0]),
		("CRVAL2", pos[1]),
		("CRPIX1", nPixX/2.),
		("CRPIX2", nPixY/2.),
		("CUNIT1", "deg"),
		("CUNIT2", "deg"),
		("CD1_1", size[0]/nPixX),
		("CD1_2", 0),
		("CD2_2", size[1]/nPixY),
		("CD2_1", 0),
		("NAXIS1", nPixX),
		("NAXIS2", nPixY),
		("NAXIS", 2),
		("CTYPE1", 'RA---TAN-SIP'),
		("CTYPE2", 'DEC--TAN-SIP'),
		("LONPOLE", 180.),
	])
	if cutCrap:
		return header

	header["imageTitle"] = "test image at %s"%repr(pos)
	header["instId"] = None
	header["dateObs"] = 55300+pos[0]
	header["refFrame"] = None
	header["wcs_equinox"] = None
	header["bandpassId"] = None
	header["bandpassUnit"] = None
	header["bandpassRefval"] = None
	header["bandpassLo"] = pos[0]
	header["bandpassHi"] = pos[0]+size[0]
	header["pixflags"] = None
	header["accref"] = "image/%s/%s"%(pos, size)
	header["accsize"] = (30+int(pos[0]+pos[1]+size[0]+size[1]))*1024
	header["embargo"] = None
	header["owner"] = None
	return header


class StandIn(object):
	"""An object exposing the keyword arguments of its constructor as
	attributes.
	"""
	def __init__(self, **kwargs):
		for attName in list(kwargs):
			setattr(self, attName, kwargs[attName])


def getTestRD(id="test.rd"):
	"""returns the RD data/<id> as handed out by base.caches.getRD.
	"""
	from gavo import rscdesc  #noflake: import above is conditional
	from gavo import base
	rdId = "data/%s"%id
	return base.caches.getRD(rdId)


def getTestTable(tableName, id="test.rd"):
	"""returns the definition of the table tableName from the test RD id.
	"""
	rd = getTestRD(id)
	return rd.getTableDefById(tableName)


def getTestData(dataId):
	"""returns the data element dataId from the default test RD.
	"""
	rd = getTestRD()
	return rd.getDataById(dataId)


def captureOutput(callable, args=(), kwargs=None):
	"""runs ``callable(*args, **kwargs)`` and captures the output.

	The function returns a tuple of return value, stdout output, stderr output.

	If the callable raises SystemExit, that is swallowed, and the return
	value reported is 2.  Other exceptions are propagated after sys.stdout
	and sys.stderr have been restored.

	kwargs defaults to no keyword arguments.
	"""
	if kwargs is None:
		kwargs = {}

	realOut, realErr = sys.stdout, sys.stderr
	# keep references to our buffers so we still get at what was captured
	# if the callable rebinds sys.stdout or sys.stderr itself.
	fakeOut, fakeErr = io.StringIO(), io.StringIO()
	sys.stdout, sys.stderr = fakeOut, fakeErr
	retVal = 2 # in case the callable sys.exits
	try:
		try:
			retVal = callable(*args, **kwargs)
		except SystemExit:
			# don't terminate just because someone thinks it's a good idea
			pass
	finally:
		sys.stdout, sys.stderr = realOut, realErr
	return retVal, fakeOut.getvalue(), fakeErr.getvalue()


class CatchallUI(object):
	"""A stand-in for base.ui that just stores the events it is notified of.

	Use this (through the messageCollector context manager below) to
	write tests on the UI events some code produces.

	Each call to a notify<Type> method appends a tuple (<Type>, args,
	kwargs) to the events attribute.  Attributes not starting with notify
	(and not otherwise defined) are None.
	"""
	def __init__(self):
		self.events = []

	def record(self, evType, args, kwargs):
		"""appends an event to events.
		"""
		event = (evType, args, kwargs)
		self.events.append(event)

	def __getattr__(self, attName):
		if not attName.startswith("notify"):
			return None

		def notifier(*args, **kwargs):
			# the event type is the attribute name without the notify prefix
			return self.record(attName[6:], args, kwargs)
		return notifier


@contextlib.contextmanager
def messageCollector():
	"""A context manager replacing base.ui with a CatchallUI for the
	duration of the controlled block.

	The CatchallUI is what the context manager returns; the events
	that occurred while it was in place are in its events attribute.
	The previous base.ui is put back when the block is left.
	"""
	collector = CatchallUI()
	previousUI = base.ui
	try:
		base.ui = collector
		yield collector
	finally:
		base.ui = previousUI


@contextlib.contextmanager
def fakeTime(t):
	"""replaces time.time by a function always returning t while the
	controlled block runs.

	The previous time.time is restored when the block is left.
	"""
	def frozenClock(*args):
		return t

	originalClock = time.time
	try:
		time.time = frozenClock
		yield
	finally:
		time.time = originalClock


def getServerInThread(data, onlyOnce=False, contentType="text/plain",
		port=34000):
	"""runs a server in a thread and returns the server, the thread, and
	the base url.

	The server answers all GET and POST requests with a 200 response
	having data as payload (strings are encoded to UTF-8) and contentType
	as media type.  Requests are not logged.

	onlyOnce will configure the server such that it stops serving
	after having handled one request.  The thread would still need
	to be joined.

	port is the TCP port to listen on; pass 0 to have the operating
	system choose a free one.  The URL returned always contains the
	port actually bound.

	The thread is a daemon thread, so a server left running will not
	keep the interpreter from exiting.

	So, better use the DataServer context manager.
	"""
	if isinstance(data, str):
		data = data.encode("utf-8")

	class Handler(http.server.BaseHTTPRequestHandler):
		def log_request(self, *args):
			# keep the test output clean
			pass
		def do_GET(self):
			self.send_response(200, "Ok")
			self.send_header("Content-Type", contentType)
			self.end_headers()
			self.wfile.write(data)
		do_POST = do_GET
	
	httpd = http.server.HTTPServer(('', port), Handler)

	if onlyOnce:
		serve = httpd.handle_request
	else:
		serve = httpd.serve_forever

	# daemon= replaces the deprecated Thread.setDaemon
	t = threading.Thread(target=serve, daemon=True)
	t.start()
	return httpd, t, "http://localhost:%s"%httpd.server_port


@contextlib.contextmanager
def DataServer(data):
	"""a context manager for briefly running a web server returning data.

	This yields the base URL the server is listening on.

	The server is shut down and its listening socket is closed when the
	controlled block is left, whether or not it raised an exception.
	"""
	httpd, t, baseURL = getServerInThread(data)

	try:
		yield baseURL
	finally:
		httpd.shutdown()
		t.join(10)
		# release the port now rather than whenever httpd is collected
		httpd.server_close()


@contextlib.contextmanager
def userconfigContent(content):
	"""a context manager to temporarily set some content to userconfig.

	This writes userconfig.rd into configDir, replacing whatever is there.
	It clears any userconfig cache before it sets to work and cleans
	up after itself (i.e., removes the file and clears the cache again),
	also when the controlled block raises an exception.

	content are RD elements without the root (resource) tag.
	"""
	userConfigPath = os.path.join(
		base.getConfig("configDir"), "userconfig.rd")
	# the caches are keyed without the .rd extension
	cacheKey = userConfigPath[:-3]
	base.caches.clearForName(cacheKey)
	# XML without an encoding declaration is UTF-8, so write just that
	# regardless of the locale.
	with open(userConfigPath, "w", encoding="utf-8") as f:
		f.write('<resource schema="__system">\n'
			+content
			+'\n</resource>\n')
	try:
		yield
	finally:
		try:
			os.unlink(userConfigPath)
		finally:
			# don't let later tests see the temporary userconfig
			base.caches.clearForName(cacheKey)


@contextlib.contextmanager
def tempConfig(*items):
	"""sets (sect, key, value) triples in the config temporarily and re-sets
	the original values after the controlled block.

	The original values are restored in reverse order, so a key given
	more than once ends up with the value it had before tempConfig
	touched it.  Restoring also happens if setting one of the items
	fails or if the controlled block raises.
	"""
	origValues = []
	try:
		for sect, key, val in items:
			origValues.append((sect, key, config.get(sect, key)))
			config.set(sect, key, val)

		yield
	finally:
		for sect, key, val in reversed(origValues):
			config.set(sect, key, val)


def makeRequestArgs(aDict=None, **kwargs):
	"""returns a request.args compatible dict from aDict.

	In particular, simple values will be put into lists; values that
	already are lists are kept as they are.

	Keyword arguments are treated like additional items of aDict and
	override items of the same name.  aDict itself is not changed.
	"""
	merged = {} if aDict is None else dict(aDict)
	merged.update(kwargs)
	return dict((k, v if isinstance(v, list) else [v])
		for k, v in merged.items())


def runSvcWith(service, renderer, args):
	"""runs service through renderer, passing the dict args.

	The values in args are single values, *not* lists as in t.w.
	(unless, of course, you actually have lists to begin with);
	makeRequestArgs does the necessary wrapping.

	This returns whatever service.run returns.
	"""
	requestArgs = makeRequestArgs(args)
	meta = svcs.QueryMeta.fromRequestArgs(requestArgs)
	return service.run(renderer, requestArgs, meta)


def main(testClass, methodPrefix=None):
	"""runs tests from the calling module and exits the interpreter.

	What is run depends on the command line:

	* without arguments, all tests in the __main__ module are run
	* with one argument, the methods of testClass with names starting with
	  methodPrefix (or, if that is not given, the argument) are run
	* with two or more arguments, the last-but-one names the test
	  class in __main__ (anything up to the last dot is ignored), the last
	  is the method prefix as above.

	The exit status is 0 if there were no errors or failures, 1 otherwise.

	Before running anything, this creates the test resources, sets up
	logging unless the environment variable GAVO_LOG is "no", and runs
	pending upgrades unless GAVO_OOTTEST is set.  The test runner's
	verbosity is taken from the environment variable TEST_VERBOSITY
	(default 1).
	"""
	ensureResources()
	
	if os.environ.get("GAVO_LOG")!="no":
		base.DEBUG = True
		from gavo.user import logui
		logui.LoggingUI(base.ui)
	
	if "GAVO_OOTTEST" not in os.environ:
		# run any pending upgrades (that's a test for them, too... of sorts)
		from gavo.user import upgrade
		upgrade.upgrade()

	try:
		# two args: first one is class name, locate it in caller's globals
		# and ignore anything before any dot for cut'n'paste convenience
		if len(sys.argv)>2:
			className = sys.argv[-2].split(".")[-1]
			testClass = getattr(sys.modules["__main__"], className)
		
		# one arg: test method prefix on testClass
		if len(sys.argv)>1:
			# this is what unittest.makeSuite used to do; that function
			# is deprecated and gone from recent pythons.
			loader = unittest.TestLoader()
			loader.testMethodPrefix = methodPrefix or sys.argv[-1]
			loader.suiteClass = testresources.OptimisingTestSuite
			suite = loader.loadTestsFromTestCase(testClass)
		else:  # Zero args, emulate unittest.run behaviour
			suite = testresources.TestLoader().loadTestsFromModule(
				sys.modules["__main__"])

		runner = unittest.TextTestRunner(
			verbosity=int(os.environ.get("TEST_VERBOSITY", 1)))
		result = runner.run(suite)
		if result.errors or result.failures:
			sys.exit(1)
		else:
			sys.exit(0)
	except (SystemExit, KeyboardInterrupt):
		raise
	except:
		# last line of defense: report whatever went wrong and exit
		# with an error status
		base.showHints = True
		from gavo.user import errhandle
		traceback.print_exc()
		errhandle.raiseAndCatch(base)
		sys.exit(1)