1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
from hashlib import md5
import unittest
from tornado.escape import utf8
from tornado.testing import AsyncHTTPTestCase
from tornado.test import httpclient_test
from tornado.web import Application, RequestHandler
try:
import pycurl
except ImportError:
pycurl = None # type: ignore
if pycurl is not None:
from tornado.curl_httpclient import CurlAsyncHTTPClient
@unittest.skipIf(pycurl is None, "pycurl module not present")
class CurlHTTPClientCommonTestCase(httpclient_test.HTTPClientCommonTestCase):
def get_http_client(self):
client = CurlAsyncHTTPClient(defaults=dict(allow_ipv6=False))
# make sure AsyncHTTPClient magic doesn't give us the wrong class
self.assertTrue(isinstance(client, CurlAsyncHTTPClient))
return client
class DigestAuthHandler(RequestHandler):
def initialize(self, username, password):
self.username = username
self.password = password
def get(self):
realm = "test"
opaque = "asdf"
# Real implementations would use a random nonce.
nonce = "1234"
auth_header = self.request.headers.get("Authorization", None)
if auth_header is not None:
auth_mode, params = auth_header.split(" ", 1)
assert auth_mode == "Digest"
param_dict = {}
for pair in params.split(","):
k, v = pair.strip().split("=", 1)
if v[0] == '"' and v[-1] == '"':
v = v[1:-1]
param_dict[k] = v
assert param_dict["realm"] == realm
assert param_dict["opaque"] == opaque
assert param_dict["nonce"] == nonce
assert param_dict["username"] == self.username
assert param_dict["uri"] == self.request.path
h1 = md5(
utf8("%s:%s:%s" % (self.username, realm, self.password))
).hexdigest()
h2 = md5(
utf8("%s:%s" % (self.request.method, self.request.path))
).hexdigest()
digest = md5(utf8("%s:%s:%s" % (h1, nonce, h2))).hexdigest()
if digest == param_dict["response"]:
self.write("ok")
else:
self.write("fail")
else:
self.set_status(401)
self.set_header(
"WWW-Authenticate",
'Digest realm="%s", nonce="%s", opaque="%s"' % (realm, nonce, opaque),
)
class CustomReasonHandler(RequestHandler):
def get(self):
self.set_status(200, "Custom reason")
class CustomFailReasonHandler(RequestHandler):
def get(self):
self.set_status(400, "Custom reason")
@unittest.skipIf(pycurl is None, "pycurl module not present")
class CurlHTTPClientTestCase(AsyncHTTPTestCase):
def setUp(self):
super().setUp()
self.http_client = self.create_client()
def get_app(self):
return Application(
[
("/digest", DigestAuthHandler, {"username": "foo", "password": "bar"}),
(
"/digest_non_ascii",
DigestAuthHandler,
{"username": "foo", "password": "barユ£"},
),
("/custom_reason", CustomReasonHandler),
("/custom_fail_reason", CustomFailReasonHandler),
]
)
def create_client(self, **kwargs):
return CurlAsyncHTTPClient(
force_instance=True, defaults=dict(allow_ipv6=False), **kwargs
)
def test_digest_auth(self):
response = self.fetch(
"/digest", auth_mode="digest", auth_username="foo", auth_password="bar"
)
self.assertEqual(response.body, b"ok")
def test_custom_reason(self):
response = self.fetch("/custom_reason")
self.assertEqual(response.reason, "Custom reason")
def test_fail_custom_reason(self):
response = self.fetch("/custom_fail_reason")
self.assertEqual(str(response.error), "HTTP 400: Custom reason")
def test_digest_auth_non_ascii(self):
response = self.fetch(
"/digest_non_ascii",
auth_mode="digest",
auth_username="foo",
auth_password="barユ£",
)
self.assertEqual(response.body, b"ok")
|