"""
Tests for the various utils.*tricks modules.
"""
#c Copyright 2008-2024, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL. See the
#c COPYING file in the source distribution.
import glob
import os
import time
from gavo.helpers import testhelpers
from gavo import base
from gavo import utils
class RemoteURLTest(testhelpers.VerboseTest):
    """checks that utils.urlopenRemote raises IOError for URLs it
    should not (or cannot) open.
    """

    def testNoFile(self):
        """file: URLs must be refused with an IOError.
        """
        localURL = "file:///etc/passwd"
        self.assertRaises(IOError, utils.urlopenRemote, localURL)

    def testHTTPConnect(self):
        """a refused connection is reported as an IOError naming the URL.
        """
        # relies on port 57388 on localhost being closed
        deadURL = "http://localhost:57388"
        self.assertRaisesWithMsg(
            IOError,
            "Could not open URL http://localhost:57388: Connection refused",
            utils.urlopenRemote,
            (deadURL,))

    def testMalformedURL(self):
        """a bare path without a scheme is reported as an unknown URL type.
        """
        barePath = "/etc/passwd"
        self.assertRaisesWithMsg(
            IOError,
            "Could not open URL /etc/passwd: unknown url type: '/etc/passwd'",
            utils.urlopenRemote,
            (barePath,))
class SafeReplacedTest(testhelpers.VerboseTest):
    """tests for utils.safeReplaced, run against a scratch file in the
    configured tempDir.
    """
    # scratch file every test in here writes to; removed again in tearDown
    testName = os.path.join(base.getConfig("tempDir"), "someFile")

    def tearDown(self):
        """removes the scratch file; it is fine if it is already gone.
        """
        try:
            os.unlink(self.testName)
        except OSError:
            pass

    def testDelayedOverwrite(self):
        """the file can be read while its replacement is being written,
        and has the new content once the safeReplaced block is left.
        """
        original = "\n".join(["line%03d"%i for i in range(50)])
        with open(self.testName, "w") as sink:
            sink.write(original)

        with utils.safeReplaced(self.testName, binary=False) as destF:
            with open(self.testName, "r") as src:
                for row in src:
                    destF.write("proc"+row)

        with open(self.testName) as result:
            rows = result.read().split("\n")
        self.assertEqual(rows[48], "procline048")

    def testNoCrapOnError(self):
        """an exception inside the safeReplaced block leaves the original
        content in place and no *.temp files in tempDir.
        """
        original = utils.bytify(
            "\n".join(["line%03d"%i for i in range(50)]))
        with open(self.testName, "wb") as sink:
            sink.write(original)

        try:
            with utils.safeReplaced(self.testName) as destF:
                with open(self.testName, "rb") as src:
                    for row in src:
                        destF.write(b"proc"+row)
                raise ValueError()
        except ValueError:
            # it's expected, I'm raising it myself
            pass

        with open(self.testName, "rb") as result:
            rows = result.read().split(b"\n")
        self.assertEqual(rows[48], b"line048")

        tempDir = base.getConfig("tempDir")
        leftovers = glob.glob(os.path.join(tempDir, "*.temp"))
        self.assertFalse(
            leftovers,
            "There's still a *.temp file left in tempDir; this could be"
            " because of earlier failed tests. Just remove all the stuff"
            " in %s"%(tempDir))
class RateLimiterTest(testhelpers.VerboseTest):
    """tests for utils.RateLimiter.
    """

    def testBasic(self):
        """inDeadtime is False the first time a key is seen, True when the
        key is asked for again right away, and False again after
        sleeping for the 0.01 passed to the constructor.
        """
        limiter = utils.RateLimiter(0.01)
        keys = ("test1", "test2")

        def assertRound():
            # first pass over the keys must be outside the dead time,
            # the immediate second pass inside it
            for expected in (False, True):
                for key in keys:
                    self.assertEqual(limiter.inDeadtime(key), expected)

        assertRound()
        time.sleep(0.01)
        assertRound()
# When run as a script, hand SafeReplacedTest to the testhelpers runner.
if __name__ == "__main__":
    testhelpers.main(SafeReplacedTest)
|