# Tests for geventhttpclient.response.HTTPResponse: parsing, error handling and callbacks.
import sys
from functools import wraps
from http.client import HTTPException
from io import StringIO
import pytest
from geventhttpclient.response import HTTPResponse
# Canned HTTP/1.1 301 response shared by the tests in this module.
# The body is exactly as long as the Content-Length header announces.
RESPONSE = (
    # Status line followed by one header per literal.
    "HTTP/1.1 301 Moved Permanently\r\n"
    "Location: http://www.google.fr/\r\n"
    "Content-Type: text/html; charset=UTF-8\r\n"
    "Date: Thu, 13 Oct 2011 15:03:12 GMT\r\n"
    "Expires: Sat, 12 Nov 2011 15:03:12 GMT\r\n"
    "Cache-Control: public, max-age=2592000\r\n"
    "Server: gws\r\n"
    "Content-Length: 218\r\n"
    "X-XSS-Protection: 1; mode=block\r\n"
    # Empty line that terminates the header section.
    "\r\n"
    # Body, one source line per literal (note the mixed \n and \r\n endings).
    '<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">\n'
    "<TITLE>301 Moved</TITLE></HEAD><BODY>\n"
    "<H1>301 Moved</H1>\n"
    "The document has moved\n"
    '<A HREF="http://www.google.fr/">here</A>.\r\n'
    "</BODY></HTML>\r\n"
)
# borrowed from gevent
# sys.gettotalrefcount is only available when Python is built with the debug flag on
gettotalrefcount = getattr(sys, "gettotalrefcount", None)


def wrap_refcount(method):
    """Decorate *method* so that it fails when it leaks references.

    On interpreters without ``sys.gettotalrefcount`` (non-debug builds)
    *method* is returned unchanged.  Otherwise the returned wrapper calls
    *method* up to four times with the garbage collector disabled, comparing
    the interpreter-wide reference count before and after each call, and stops
    at the first call that leaves the count unchanged.

    Args:
        method: the callable to check; it receives the wrapper's arguments.

    Returns:
        *method* itself, or a wrapper around it that returns ``None``.

    Raises:
        AssertionError: if all four runs changed the total reference count.
    """
    if gettotalrefcount is None:
        return method

    @wraps(method)
    def wrapped(*args, **kwargs):
        import gc

        # Remember the collector state so a caller that deliberately runs
        # with gc disabled does not get it switched back on behind its back.
        gc_was_enabled = gc.isenabled()
        gc.disable()
        gc.collect()
        deltas = []
        # Bound before the first measurement so that rebinding ``d`` inside
        # the loop does not itself shift the count being measured.
        d = None
        try:
            for _ in range(4):
                d = gettotalrefcount()
                method(*args, **kwargs)
                # Drop URL-parsing caches so entries they picked up during the
                # call are not counted as leaks.  "urlparse" is the Python 2
                # module name; on Python 3 the cache lives in "urllib.parse".
                if "urlparse" in sys.modules:
                    sys.modules["urlparse"].clear_cache()
                if "urllib.parse" in sys.modules:
                    sys.modules["urllib.parse"].clear_cache()
                d = gettotalrefcount() - d
                deltas.append(d)
                if deltas[-1] == 0:
                    break
            else:
                # No run left the count unchanged.
                raise AssertionError(f"refcount increased by {deltas!r}")
        finally:
            gc.collect()
            if gc_was_enabled:
                gc.enable()

    return wrapped
@wrap_refcount
def test_parse():
    """Feeding the whole canned response in one call completes the message."""
    response = HTTPResponse()
    response.feed(RESPONSE)
    # Check the parser's progress flags in the order they are expected to be set.
    for flag in ("message_begun", "headers_complete", "message_complete"):
        assert getattr(response, flag)
@wrap_refcount
def test_parse_small_blocks():
    """Parse the canned response when it arrives in 10-character chunks.

    The parser must reach the same final state (flags, status code,
    keep-alive decision and header items) as when fed in a single call.
    """
    parser = HTTPResponse()
    stream = StringIO(RESPONSE)
    # The parser is deliberately NOT pre-fed with RESPONSE: doing so would
    # complete the message up front and the chunked loop would never execute.
    while not parser.message_complete:
        chunk = stream.read(10)
        # Fail loudly instead of feeding empty strings once the input is
        # used up without the message having completed.
        assert chunk, "response exhausted before the message completed"
        parser.feed(chunk)

    assert parser.message_begun
    assert parser.headers_complete
    assert parser.message_complete
    assert parser.should_keep_alive()
    assert parser.status_code == 301
    assert sorted(parser.items()) == [
        ("Cache-Control", "public, max-age=2592000"),
        ("Content-Length", "218"),
        ("Content-Type", "text/html; charset=UTF-8"),
        ("Date", "Thu, 13 Oct 2011 15:03:12 GMT"),
        ("Expires", "Sat, 12 Nov 2011 15:03:12 GMT"),
        ("Location", "http://www.google.fr/"),
        ("Server", "gws"),
        ("X-XSS-Protection", "1; mode=block"),
    ]
@wrap_refcount
def test_parse_error():
    """A non-numeric status must be reported as an ``HTTPException``.

    The exception text has to contain "Invalid response status".
    """
    response = HTTPResponse()
    # Both feeds stay inside the context manager: the test does not depend on
    # whether the malformed status line or the following empty feed raises.
    # No state assertions follow the feeds here; the earlier ones could only
    # run when nothing had been raised, a path on which the test fails anyway.
    with pytest.raises(HTTPException, match="Invalid response status"):
        response.feed("HTTP/1.1 asdf\r\n\r\n")
        response.feed("")
@wrap_refcount
def test_incomplete_response():
    """A body shorter than its Content-Length makes the empty feed raise.

    The headers announce 10 body characters but only one is supplied; the
    following empty feed (presumably the end-of-input signal) must raise
    ``HTTPException``.  Afterwards both ``should_keep_alive()`` and
    ``should_close()`` are expected to be truthy.
    """
    parser = HTTPResponse()
    truncated = """HTTP/1.1 200 Ok\r\nContent-Length:10\r\n\r\n1"""
    parser.feed(truncated)

    with pytest.raises(HTTPException):
        parser.feed("")

    # Checked after the failure, on the same parser instance.
    assert parser.should_keep_alive()
    assert parser.should_close()
@wrap_refcount
def test_response_too_long():
    """Data running past the announced Content-Length must raise ``HTTPException``."""
    parser = HTTPResponse()
    # Content-Length is 1, yet seven characters follow the headers.
    with pytest.raises(HTTPException):
        parser.feed("""HTTP/1.1 200 Ok\r\nContent-Length:1\r\n\r\ntoolong""")
@wrap_refcount
def test_on_body_raises():
    """An error raised by the ``_on_body`` hook must propagate out of ``feed``."""

    def failing_on_body(buf):
        raise RuntimeError("error")

    parser = HTTPResponse()
    # Swap the instance's body hook for one that always fails.
    parser._on_body = failing_on_body

    with pytest.raises(RuntimeError):
        parser.feed(RESPONSE)
@wrap_refcount
def test_on_message_begin():
    """An error raised by the ``_on_message_begin`` hook must propagate out of ``feed``."""

    def failing_on_message_begin():
        raise RuntimeError("error")

    parser = HTTPResponse()
    # Swap the instance's message-begin hook for one that always fails.
    parser._on_message_begin = failing_on_message_begin

    with pytest.raises(RuntimeError):
        parser.feed(RESPONSE)
|