1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
|
#!/usr/bin/env python
"""Tests for yadis.discover.
@todo: Now that yadis.discover uses urljr.fetchers (this file imports the
fetchers module as openid.fetchers), we should be able to do all tests with a
mock fetcher instead of spawning threads with BaseHTTPServer.
"""
import unittest
import urlparse
import re
import types
from openid.yadis.discover import discover, DiscoveryFailure
from openid import fetchers
import discoverdata
# Captures the numeric status code from a "Status: <code> <reason>" line.
# mkResponse applies it with .match(), i.e. anchored at the start of the data.
status_header_re = re.compile(r'Status: (\d+) .*?$', re.MULTILINE)
# Template for a plain-text "No such file" response with one %s placeholder.
# NOTE(review): not referenced by the code visible in this file.
four04_pat = """\
Content-Type: text/plain
No such file %s
"""
class QuitServer(Exception):
    """Marker exception with no behavior beyond Exception.

    NOTE(review): presumably meant to stop a test HTTP server; it is not
    raised or caught by the code visible in this file.
    """
def mkResponse(data):
    """Build a fetchers.HTTPResponse from a raw sample response string.

    data must start with a 'Status: <code> <reason>' line and contain a blank
    line separating the 'Name: value' header lines from the body.  Header
    names are stripped and lower-cased, header values are stripped; the
    Status line itself also ends up in the headers mapping.
    """
    status_match = status_header_re.match(data)
    header_block, body = data.split('\n\n', 1)

    headers = {}
    for header_line in header_block.split('\n'):
        name, value = header_line.split(':', 1)
        headers[name.strip().lower()] = value.strip()

    return fetchers.HTTPResponse(
        status=int(status_match.group(1)),
        headers=headers,
        body=body)
class TestFetcher(object):
    """Fetcher that serves responses generated by discoverdata.

    No network access happens: the path of each requested URL is handed to
    discoverdata.generateSample and the resulting text is parsed by
    mkResponse.
    """

    def __init__(self, base_url):
        # Passed through to discoverdata.generateSample for every request.
        self.base_url = base_url

    def fetch(self, url, headers, body):
        """Return the sample response for url, following redirects.

        The headers and body arguments are accepted but not used.  A 404
        response is returned when generateSample raises KeyError for the
        requested path.
        """
        location = url
        while True:
            # Path component of the URL without its leading slash.
            sample_path = urlparse.urlparse(location)[2][1:]
            try:
                sample = discoverdata.generateSample(
                    sample_path, self.base_url)
            except KeyError:
                return fetchers.HTTPResponse(
                    status=404,
                    final_url=location,
                    headers={},
                    body='')

            response = mkResponse(sample)
            if response.status not in [301, 302, 303, 307]:
                response.final_url = location
                return response

            # Redirect: go round again with the advertised location.
            location = response.headers['location']
class TestSecondGet(unittest.TestCase):
    """Discovery must fail when the advertised XRDS location returns 404."""

    class MockFetcher(object):
        """Fetcher whose first response points at an XRDS URL that 404s."""

        def __init__(self):
            # Number of fetch() calls made so far.
            self.count = 0

        def fetch(self, uri, headers=None, body=None):
            self.count += 1
            if self.count != 1:
                # Every request after the first is answered with a 404.
                return fetchers.HTTPResponse(uri, 404)

            response_headers = {
                'X-XRDS-Location'.lower(): 'http://unittest/404',
            }
            return fetchers.HTTPResponse(uri, 200, response_headers, '')

    def setUp(self):
        # Remember the current default fetcher so tearDown can restore it.
        self.oldfetcher = fetchers.getDefaultFetcher()
        fetchers.setDefaultFetcher(self.MockFetcher())

    def tearDown(self):
        fetchers.setDefaultFetcher(self.oldfetcher)

    def test_404(self):
        # No docstring on purpose: it would change unittest's description.
        self.failUnlessRaises(
            DiscoveryFailure, discover, "http://something.unittest/")
class _TestCase(unittest.TestCase):
    """One data-driven discovery test built from a discoverdata entry."""

    base_url = 'http://invalid.unittest/'

    def __init__(self, input_name, id_name, result_name, success):
        self.input_name = input_name
        self.id_name = id_name
        self.result_name = result_name
        self.success = success
        # Still not quite sure how best to construct these custom tests.
        # Between python2.3 and python2.4 a patch attached to pyunit.sf.net
        # bug #469444 was applied which breaks loadTestsFromModule on this
        # class if it has test_ or runTest methods, hence the kludge of a
        # differently named test method.
        unittest.TestCase.__init__(self, methodName='runCustomTest')

    def setUp(self):
        """Install the sample-data fetcher and compute the expected result."""
        fetchers.setDefaultFetcher(
            TestFetcher(self.base_url), wrap_exceptions=False)
        generated = discoverdata.generateResult(
            self.base_url,
            self.input_name,
            self.id_name,
            self.result_name,
            self.success)
        self.input_url, self.expected = generated

    def tearDown(self):
        """Reset the default fetcher to None."""
        fetchers.setDefaultFetcher(None)

    def runCustomTest(self):
        # Failure case: discovery itself must raise.
        if self.expected is DiscoveryFailure:
            self.failUnlessRaises(
                DiscoveryFailure, discover, self.input_url)
            return

        result = discover(self.input_url)
        expected = self.expected
        self.failUnlessEqual(self.input_url, result.request_uri)

        msg = 'Identity URL mismatch: actual = %r, expected = %r' % (
            result.normalized_uri, expected.normalized_uri)
        self.failUnlessEqual(
            expected.normalized_uri, result.normalized_uri, msg)

        msg = 'Content mismatch: actual = %r, expected = %r' % (
            result.response_text, expected.response_text)
        self.failUnlessEqual(
            expected.response_text, result.response_text, msg)

        # Both objects must expose exactly the same attribute names.
        expected_names = sorted(dir(expected))
        actual_names = sorted(dir(result))
        self.failUnlessEqual(actual_names, expected_names)

        # Compare every attribute that is neither a dunder nor a method.
        for attr in dir(expected):
            if attr.startswith('__') and attr.endswith('__'):
                continue
            wanted = getattr(expected, attr)
            if not isinstance(wanted, types.MethodType):
                got = getattr(result, attr)
                assert got == wanted, (attr, wanted, got)

    def shortDescription(self):
        """Describe the test by its input URL (or input name) and module."""
        try:
            label = self.input_url
        except AttributeError:
            # Called before setUp, or setUp did not complete successfully.
            label = self.input_name
        return "%s (%s)" % (label, self.__class__.__module__)
def pyUnitTests():
    """Return a TestSuite with one _TestCase per discoverdata.testlist entry."""
    cases = [
        _TestCase(input_name, id_name, result_name, success)
        for (success, input_name, id_name, result_name)
        in discoverdata.testlist
    ]
    return unittest.TestSuite(cases)
def test():
    """Run the generated discovery test suite with a text runner.

    Returns the result object produced by unittest.TextTestRunner.run().
    """
    # The suite is built by pyUnitTests(); no loadTests() is defined or
    # imported in this module, so calling it raised NameError.
    runner = unittest.TextTestRunner()
    return runner.run(pyUnitTests())
# Run the suite when this module is executed directly as a script.
if __name__ == "__main__":
    test()
|