1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
# coding: utf-8
from __future__ import absolute_import, division, print_function
from hashlib import md5
from tornado.escape import utf8
from tornado.httpclient import HTTPRequest, HTTPClientError
from tornado.locks import Event
from tornado.stack_context import ExceptionStackContext
from tornado.testing import AsyncHTTPTestCase, gen_test
from tornado.test import httpclient_test
from tornado.test.util import unittest, ignore_deprecation
from tornado.web import Application, RequestHandler
try:
import pycurl # type: ignore
except ImportError:
pycurl = None
if pycurl is not None:
from tornado.curl_httpclient import CurlAsyncHTTPClient
@unittest.skipIf(pycurl is None, "pycurl module not present")
class CurlHTTPClientCommonTestCase(httpclient_test.HTTPClientCommonTestCase):
def get_http_client(self):
client = CurlAsyncHTTPClient(defaults=dict(allow_ipv6=False))
# make sure AsyncHTTPClient magic doesn't give us the wrong class
self.assertTrue(isinstance(client, CurlAsyncHTTPClient))
return client
class DigestAuthHandler(RequestHandler):
def initialize(self, username, password):
self.username = username
self.password = password
def get(self):
realm = 'test'
opaque = 'asdf'
# Real implementations would use a random nonce.
nonce = "1234"
auth_header = self.request.headers.get('Authorization', None)
if auth_header is not None:
auth_mode, params = auth_header.split(' ', 1)
assert auth_mode == 'Digest'
param_dict = {}
for pair in params.split(','):
k, v = pair.strip().split('=', 1)
if v[0] == '"' and v[-1] == '"':
v = v[1:-1]
param_dict[k] = v
assert param_dict['realm'] == realm
assert param_dict['opaque'] == opaque
assert param_dict['nonce'] == nonce
assert param_dict['username'] == self.username
assert param_dict['uri'] == self.request.path
h1 = md5(utf8('%s:%s:%s' % (self.username, realm, self.password))).hexdigest()
h2 = md5(utf8('%s:%s' % (self.request.method,
self.request.path))).hexdigest()
digest = md5(utf8('%s:%s:%s' % (h1, nonce, h2))).hexdigest()
if digest == param_dict['response']:
self.write('ok')
else:
self.write('fail')
else:
self.set_status(401)
self.set_header('WWW-Authenticate',
'Digest realm="%s", nonce="%s", opaque="%s"' %
(realm, nonce, opaque))
class CustomReasonHandler(RequestHandler):
def get(self):
self.set_status(200, "Custom reason")
class CustomFailReasonHandler(RequestHandler):
def get(self):
self.set_status(400, "Custom reason")
@unittest.skipIf(pycurl is None, "pycurl module not present")
class CurlHTTPClientTestCase(AsyncHTTPTestCase):
def setUp(self):
super(CurlHTTPClientTestCase, self).setUp()
self.http_client = self.create_client()
def get_app(self):
return Application([
('/digest', DigestAuthHandler, {'username': 'foo', 'password': 'bar'}),
('/digest_non_ascii', DigestAuthHandler, {'username': 'foo', 'password': 'barユ£'}),
('/custom_reason', CustomReasonHandler),
('/custom_fail_reason', CustomFailReasonHandler),
])
def create_client(self, **kwargs):
return CurlAsyncHTTPClient(force_instance=True,
defaults=dict(allow_ipv6=False),
**kwargs)
@gen_test
def test_prepare_curl_callback_stack_context(self):
exc_info = []
error_event = Event()
def error_handler(typ, value, tb):
exc_info.append((typ, value, tb))
error_event.set()
return True
with ignore_deprecation():
with ExceptionStackContext(error_handler):
request = HTTPRequest(self.get_url('/custom_reason'),
prepare_curl_callback=lambda curl: 1 / 0)
yield [error_event.wait(), self.http_client.fetch(request)]
self.assertEqual(1, len(exc_info))
self.assertIs(exc_info[0][0], ZeroDivisionError)
def test_digest_auth(self):
response = self.fetch('/digest', auth_mode='digest',
auth_username='foo', auth_password='bar')
self.assertEqual(response.body, b'ok')
def test_custom_reason(self):
response = self.fetch('/custom_reason')
self.assertEqual(response.reason, "Custom reason")
def test_fail_custom_reason(self):
response = self.fetch('/custom_fail_reason')
self.assertEqual(str(response.error), "HTTP 400: Custom reason")
def test_failed_setup(self):
self.http_client = self.create_client(max_clients=1)
for i in range(5):
with ignore_deprecation():
response = self.fetch(u'/ユニコード')
self.assertIsNot(response.error, None)
with self.assertRaises((UnicodeEncodeError, HTTPClientError)):
# This raises UnicodeDecodeError on py3 and
# HTTPClientError(404) on py2. The main motivation of
# this test is to ensure that the UnicodeEncodeError
# during the setup phase doesn't lead the request to
# be dropped on the floor.
response = self.fetch(u'/ユニコード', raise_error=True)
def test_digest_auth_non_ascii(self):
response = self.fetch('/digest_non_ascii', auth_mode='digest',
auth_username='foo', auth_password='barユ£')
self.assertEqual(response.body, b'ok')
|