1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
|
"""Tests for managing HTTP issues (malformed requests, etc)."""
import errno
import socket
import urllib.parse # noqa: WPS301
import pytest
from cheroot.test import helper
HTTP_BAD_REQUEST = 400
HTTP_LENGTH_REQUIRED = 411
HTTP_NOT_FOUND = 404
HTTP_REQUEST_ENTITY_TOO_LARGE = 413
HTTP_OK = 200
HTTP_VERSION_NOT_SUPPORTED = 505
class HelloController(helper.Controller):
"""Controller for serving WSGI apps."""
def hello(req, resp):
"""Render Hello world."""
return 'Hello world!'
def body_required(req, resp):
"""Render Hello world or set 411."""
if req.environ.get('Content-Length', None) is None:
resp.status = '411 Length Required'
return
return 'Hello world!'
def query_string(req, resp):
"""Render QUERY_STRING value."""
return req.environ.get('QUERY_STRING', '')
def asterisk(req, resp):
"""Render request method value."""
# pylint: disable=possibly-unused-variable
method = req.environ.get('REQUEST_METHOD', 'NO METHOD FOUND')
tmpl = 'Got asterisk URI path with {method} method'
return tmpl.format(**locals())
def _munge(string):
"""Encode PATH_INFO correctly depending on Python version.
WSGI 1.0 is a mess around unicode. Create endpoints
that match the PATH_INFO that it produces.
"""
return string.encode('utf-8').decode('latin-1')
handlers = {
'/hello': hello,
'/no_body': hello,
'/body_required': body_required,
'/query_string': query_string,
# FIXME: Unignore the pylint rules in pylint >= 2.15.4.
# Refs:
# * https://github.com/PyCQA/pylint/issues/6592
# * https://github.com/PyCQA/pylint/pull/7395
# pylint: disable-next=too-many-function-args
_munge('/привіт'): hello,
# pylint: disable-next=too-many-function-args
_munge('/Юххууу'): hello,
'/\xa0Ðblah key 0 900 4 data': hello,
'/*': asterisk,
}
def _get_http_response(connection, method='GET'):
return connection.response_class(connection.sock, method=method)
@pytest.fixture
def testing_server(wsgi_server_client):
"""Attach a WSGI app to the given server and preconfigure it."""
wsgi_server = wsgi_server_client.server_instance
wsgi_server.wsgi_app = HelloController()
wsgi_server.max_request_body_size = 30000000
wsgi_server.server_client = wsgi_server_client
return wsgi_server
@pytest.fixture
def test_client(testing_server):
"""Get and return a test client out of the given server."""
return testing_server.server_client
@pytest.fixture
def testing_server_with_defaults(wsgi_server_client):
"""Attach a WSGI app to the given server and preconfigure it."""
wsgi_server = wsgi_server_client.server_instance
wsgi_server.wsgi_app = HelloController()
wsgi_server.server_client = wsgi_server_client
return wsgi_server
@pytest.fixture
def test_client_with_defaults(testing_server_with_defaults):
"""Get and return a test client out of the given server."""
return testing_server_with_defaults.server_client
def test_http_connect_request(test_client):
"""Check that CONNECT query results in Method Not Allowed status."""
status_line = test_client.connect('/anything')[0]
actual_status = int(status_line[:3])
assert actual_status == 405
def test_normal_request(test_client):
"""Check that normal GET query succeeds."""
status_line, _, actual_resp_body = test_client.get('/hello')
actual_status = int(status_line[:3])
assert actual_status == HTTP_OK
assert actual_resp_body == b'Hello world!'
def test_query_string_request(test_client):
"""Check that GET param is parsed well."""
status_line, _, actual_resp_body = test_client.get(
'/query_string?test=True',
)
actual_status = int(status_line[:3])
assert actual_status == HTTP_OK
assert actual_resp_body == b'test=True'
@pytest.mark.parametrize(
'uri',
(
'/hello', # plain
'/query_string?test=True', # query
'/{0}?{1}={2}'.format( # quoted unicode
*map(urllib.parse.quote, ('Юххууу', 'ї', 'йо')),
),
),
)
def test_parse_acceptable_uri(test_client, uri):
"""Check that server responds with OK to valid GET queries."""
status_line = test_client.get(uri)[0]
actual_status = int(status_line[:3])
assert actual_status == HTTP_OK
def test_parse_uri_unsafe_uri(test_client):
"""Test that malicious URI does not allow HTTP injection.
This effectively checks that sending GET request with URL
/%A0%D0blah%20key%200%20900%204%20data
is not converted into
GET /
blah key 0 900 4 data
HTTP/1.1
which would be a security issue otherwise.
"""
c = test_client.get_connection()
resource = '/\xa0Ðblah key 0 900 4 data'.encode('latin-1')
quoted = urllib.parse.quote(resource)
assert quoted == '/%A0%D0blah%20key%200%20900%204%20data'
request = 'GET {quoted} HTTP/1.1'.format(**locals())
c._output(request.encode('utf-8'))
c._send_output()
response = _get_http_response(c, method='GET')
response.begin()
assert response.status == HTTP_OK
assert response.read(12) == b'Hello world!'
c.close()
def test_parse_uri_invalid_uri(test_client):
"""Check that server responds with Bad Request to invalid GET queries.
Invalid request line test case: it should only contain US-ASCII.
"""
c = test_client.get_connection()
c._output(u'GET /йопта! HTTP/1.1'.encode('utf-8'))
c._send_output()
response = _get_http_response(c, method='GET')
response.begin()
assert response.status == HTTP_BAD_REQUEST
assert response.read(21) == b'Malformed Request-URI'
c.close()
@pytest.mark.parametrize(
'uri',
(
'hello', # ascii
'привіт', # non-ascii
),
)
def test_parse_no_leading_slash_invalid(test_client, uri):
"""Check that server responds with Bad Request to invalid GET queries.
Invalid request line test case: it should have leading slash (be absolute).
"""
status_line, _, actual_resp_body = test_client.get(
urllib.parse.quote(uri),
)
actual_status = int(status_line[:3])
assert actual_status == HTTP_BAD_REQUEST
assert b'starting with a slash' in actual_resp_body
def test_parse_uri_absolute_uri(test_client):
"""Check that server responds with Bad Request to Absolute URI.
Only proxy servers should allow this.
"""
status_line, _, actual_resp_body = test_client.get('http://google.com/')
actual_status = int(status_line[:3])
assert actual_status == HTTP_BAD_REQUEST
expected_body = b'Absolute URI not allowed if server is not a proxy.'
assert actual_resp_body == expected_body
def test_parse_uri_asterisk_uri(test_client):
"""Check that server responds with OK to OPTIONS with "*" Absolute URI."""
status_line, _, actual_resp_body = test_client.options('*')
actual_status = int(status_line[:3])
assert actual_status == HTTP_OK
expected_body = b'Got asterisk URI path with OPTIONS method'
assert actual_resp_body == expected_body
def test_parse_uri_fragment_uri(test_client):
"""Check that server responds with Bad Request to URI with fragment."""
status_line, _, actual_resp_body = test_client.get(
'/hello?test=something#fake',
)
actual_status = int(status_line[:3])
assert actual_status == HTTP_BAD_REQUEST
expected_body = b'Illegal #fragment in Request-URI.'
assert actual_resp_body == expected_body
def test_no_content_length(test_client):
"""Test POST query with an empty body being successful."""
# "The presence of a message-body in a request is signaled by the
# inclusion of a Content-Length or Transfer-Encoding header field in
# the request's message-headers."
#
# Send a message with neither header and no body.
c = test_client.get_connection()
c.request('POST', '/no_body')
response = c.getresponse()
actual_resp_body = response.read()
actual_status = response.status
assert actual_status == HTTP_OK
assert actual_resp_body == b'Hello world!'
c.close() # deal with the resource warning
def test_content_length_required(test_client):
"""Test POST query with body failing because of missing Content-Length."""
# Now send a message that has no Content-Length, but does send a body.
# Verify that CP times out the socket and responds
# with 411 Length Required.
c = test_client.get_connection()
c.request('POST', '/body_required')
response = c.getresponse()
response.read()
actual_status = response.status
assert actual_status == HTTP_LENGTH_REQUIRED
c.close() # deal with the resource warning
@pytest.mark.xfail(
reason='https://github.com/cherrypy/cheroot/issues/106',
strict=False, # sometimes it passes
)
def test_large_request(test_client_with_defaults):
"""Test GET query with maliciously large Content-Length."""
# If the server's max_request_body_size is not set (i.e. is set to 0)
# then this will result in an `OverflowError: Python int too large to
# convert to C ssize_t` in the server.
# We expect that this should instead return that the request is too
# large.
c = test_client_with_defaults.get_connection()
c.putrequest('GET', '/hello')
c.putheader('Content-Length', str(2**64))
c.endheaders()
response = c.getresponse()
actual_status = response.status
assert actual_status == HTTP_REQUEST_ENTITY_TOO_LARGE
@pytest.mark.parametrize(
('request_line', 'status_code', 'expected_body'),
(
(
b'GET /', # missing proto
HTTP_BAD_REQUEST, b'Malformed Request-Line',
),
(
b'GET / HTTPS/1.1', # invalid proto
HTTP_BAD_REQUEST, b'Malformed Request-Line: bad protocol',
),
(
b'GET / HTTP/1', # invalid version
HTTP_BAD_REQUEST, b'Malformed Request-Line: bad version',
),
(
b'GET / HTTP/2.15', # invalid ver
HTTP_VERSION_NOT_SUPPORTED, b'Cannot fulfill request',
),
),
)
def test_malformed_request_line(
test_client, request_line,
status_code, expected_body,
):
"""Test missing or invalid HTTP version in Request-Line."""
c = test_client.get_connection()
c._output(request_line)
c._send_output()
response = _get_http_response(c, method='GET')
response.begin()
assert response.status == status_code
assert response.read(len(expected_body)) == expected_body
c.close()
def test_malformed_http_method(test_client):
"""Test non-uppercase HTTP method."""
c = test_client.get_connection()
c.putrequest('GeT', '/malformed_method_case')
c.putheader('Content-Type', 'text/plain')
c.endheaders()
response = c.getresponse()
actual_status = response.status
assert actual_status == HTTP_BAD_REQUEST
actual_resp_body = response.read(21)
assert actual_resp_body == b'Malformed method name'
c.close() # deal with the resource warning
def test_malformed_header(test_client):
"""Check that broken HTTP header results in Bad Request."""
c = test_client.get_connection()
c.putrequest('GET', '/')
c.putheader('Content-Type', 'text/plain')
# See https://www.bitbucket.org/cherrypy/cherrypy/issue/941
c._output(b'Re, 1.2.3.4#015#012')
c.endheaders()
response = c.getresponse()
actual_status = response.status
assert actual_status == HTTP_BAD_REQUEST
actual_resp_body = response.read(20)
assert actual_resp_body == b'Illegal header line.'
c.close() # deal with the resource warning
def test_request_line_split_issue_1220(test_client):
"""Check that HTTP request line of exactly 256 chars length is OK."""
Request_URI = (
'/hello?'
'intervenant-entreprise-evenement_classaction='
'evenement-mailremerciements'
'&_path=intervenant-entreprise-evenement'
'&intervenant-entreprise-evenement_action-id=19404'
'&intervenant-entreprise-evenement_id=19404'
'&intervenant-entreprise_id=28092'
)
assert len('GET %s HTTP/1.1\r\n' % Request_URI) == 256
actual_resp_body = test_client.get(Request_URI)[2]
assert actual_resp_body == b'Hello world!'
def test_garbage_in(test_client):
"""Test that server sends an error for garbage received over TCP."""
# Connect without SSL regardless of server.scheme
c = test_client.get_connection()
c._output(b'gjkgjklsgjklsgjkljklsg')
c._send_output()
response = c.response_class(c.sock, method='GET')
try:
response.begin()
actual_status = response.status
assert actual_status == HTTP_BAD_REQUEST
actual_resp_body = response.read(22)
assert actual_resp_body == b'Malformed Request-Line'
c.close()
except socket.error as ex:
# "Connection reset by peer" is also acceptable.
if ex.errno != errno.ECONNRESET:
raise
class CloseController:
"""Controller for testing the close callback."""
def __call__(self, environ, start_response):
"""Get the req to know header sent status."""
self.req = start_response.__self__.req
resp = CloseResponse(self.close)
start_response(resp.status, resp.headers.items())
return resp
def close(self):
"""Close, writing hello."""
self.req.write(b'hello')
class CloseResponse:
"""Dummy empty response to trigger the no body status."""
def __init__(self, close):
"""Use some defaults to ensure we have a header."""
self.status = '200 OK'
self.headers = {'Content-Type': 'text/html'}
self.close = close
def __getitem__(self, index):
"""Ensure we don't have a body."""
raise IndexError()
def output(self):
"""Return self to hook the close method."""
return self
@pytest.fixture
def testing_server_close(wsgi_server_client):
"""Attach a WSGI app to the given server and preconfigure it."""
wsgi_server = wsgi_server_client.server_instance
wsgi_server.wsgi_app = CloseController()
wsgi_server.max_request_body_size = 30000000
wsgi_server.server_client = wsgi_server_client
return wsgi_server
def test_send_header_before_closing(testing_server_close):
"""Test we are actually sending the headers before calling 'close'."""
_, _, resp_body = testing_server_close.server_client.get('/')
assert resp_body == b'hello'
|