1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
import io
import multiprocessing
from wsgiref import simple_server
import requests
import falcon
from falcon import request_helpers
import falcon.testing as testing
# Number of bytes in one kilobyte (2 ** 10 == 1024); the tests below use it
# to bound the length of the randomly generated request bodies.
SIZE_1_KB = 2 ** 10
class TestRequestBody(testing.TestBase):
    """Exercise reading request bodies through ``req.stream``.

    Covers empty, one-byte and roughly 1 KB bodies delivered through the
    simulated-request helper, a real ``wsgiref`` server reached over a
    socket, and the ``request_helpers.Body`` stream wrapper used directly.
    """

    def before(self):
        # Route '/' to a resource that keeps the request it received, so
        # each test can inspect ``self.resource.req`` afterwards.
        # NOTE(review): presumably invoked by testing.TestBase before every
        # test -- confirm against the base class.
        self.resource = testing.TestResource()
        self.api.add_route('/', self.resource)

    def test_empty_body(self):
        # An empty body must produce a stream whose end is at offset 0.
        self.simulate_request('/', body='')
        stream = self.resource.req.stream

        # whence=2 seeks relative to the end of the stream
        stream.seek(0, 2)
        self.assertEqual(stream.tell(), 0)

    def test_tiny_body(self):
        # A one-character body can be read back and is exactly 1 byte long.
        expected_body = '.'
        self.simulate_request('', body=expected_body)
        stream = self.resource.req.stream

        actual_body = stream.read(1)
        self.assertEqual(actual_body, expected_body.encode('utf-8'))

        stream.seek(0, 2)
        self.assertEqual(stream.tell(), 1)

    def test_tiny_body_overflow(self):
        # Asking for more bytes than were sent returns only what exists.
        expected_body = '.'
        self.simulate_request('', body=expected_body)
        stream = self.resource.req.stream

        # Read too many bytes; shouldn't block
        actual_body = stream.read(len(expected_body) + 1)
        self.assertEqual(actual_body, expected_body.encode('utf-8'))

    def test_read_body(self):
        # A random body with an explicit Content-Length header is read back
        # in full, and the header value is visible on the request.

        # Floor division keeps the lower size bound an int on Python 3 as
        # well ('/' would produce 512.0 there).
        expected_body = testing.rand_string(SIZE_1_KB // 2, SIZE_1_KB)
        expected_len = len(expected_body)
        headers = {'Content-Length': str(expected_len)}

        self.simulate_request('', body=expected_body, headers=headers)

        content_len = self.resource.req.get_header('content-length')
        self.assertEqual(content_len, str(expected_len))

        stream = self.resource.req.stream

        actual_body = stream.read()
        self.assertEqual(actual_body, expected_body.encode('utf-8'))

        stream.seek(0, 2)
        self.assertEqual(stream.tell(), expected_len)

    def test_read_socket_body(self):
        # Run an echo app on a real wsgiref server in a child process and
        # check that POST (unbounded read) and PUT (over-long read) both
        # return the body that was sent.

        # Floor division keeps the lower size bound an int on Python 3.
        expected_body = testing.rand_string(SIZE_1_KB // 2, SIZE_1_KB)

        # Seconds to wait for the echo server before giving up, so that a
        # blocked read fails the test instead of hanging the whole run.
        request_timeout = 10

        def server():
            class Echo(object):
                def on_post(self, req, resp):
                    # wsgiref socket._fileobject blocks when len not given,
                    # but Falcon is smarter than that. :D
                    body = req.stream.read()
                    resp.body = body

                def on_put(self, req, resp):
                    # wsgiref socket._fileobject blocks when len too long,
                    # but Falcon should work around that for me.
                    body = req.stream.read(req.content_length + 1)
                    resp.body = body

            api = falcon.API()
            api.add_route('/echo', Echo())

            httpd = simple_server.make_server('127.0.0.1', 8989, api)
            httpd.serve_forever()

        process = multiprocessing.Process(target=server)
        process.daemon = True
        process.start()

        try:
            # Let it boot
            process.join(1)

            url = 'http://127.0.0.1:8989/echo'
            resp = requests.post(url, data=expected_body,
                                 timeout=request_timeout)
            self.assertEqual(resp.text, expected_body)

            resp = requests.put(url, data=expected_body,
                                timeout=request_timeout)
            self.assertEqual(resp.text, expected_body)
        finally:
            # Always stop the server, even when a request or an assertion
            # above fails; otherwise the child keeps port 8989 bound.
            process.terminate()

    def test_body_stream_wrapper(self):
        # Drive request_helpers.Body directly over an in-memory stream and
        # check read(), readline(), readlines() and iteration.

        # Floor division keeps the lower size bound an int on Python 3.
        data = testing.rand_string(SIZE_1_KB // 2, SIZE_1_KB)
        expected_body = data.encode('utf-8')
        expected_len = len(expected_body)

        # NOTE(kgriffs): Append newline char to each line
        # to match readlines behavior
        expected_lines = [(line + '\n').encode('utf-8')
                          for line in data.split('\n')]

        # NOTE(kgriffs): Remove trailing newline to simulate
        # what readlines does
        expected_lines[-1] = expected_lines[-1][:-1]

        def new_body():
            # Each check needs an unread stream, so build a fresh wrapper
            # around a fresh BytesIO every time.
            return request_helpers.Body(io.BytesIO(expected_body),
                                        expected_len)

        # Unbounded, short and over-long reads
        self.assertEqual(new_body().read(), expected_body)
        self.assertEqual(new_body().read(2), expected_body[0:2])
        self.assertEqual(new_body().read(expected_len + 1), expected_body)

        # Line-oriented access
        self.assertEqual(new_body().readline(), expected_lines[0])
        self.assertEqual(new_body().readlines(), expected_lines)

        # Iterator protocol: a single step, then full iteration. Comparing
        # the whole list also catches a wrapper that yields too few lines.
        self.assertEqual(next(new_body()), expected_lines[0])
        self.assertEqual(list(new_body()), expected_lines)
|