1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
|
import io
import pytest
import falcon
from falcon import testing
@pytest.fixture
def client():
return testing.TestClient(falcon.App())
# NOTE(kgriffs): Concept from Gunicorn's source (wsgi.py)
class FileWrapper:
def __init__(self, file_like, block_size=8192):
self.file_like = file_like
self.block_size = block_size
def __getitem__(self, key):
data = self.file_like.read(self.block_size)
if data:
return data
raise IndexError
class HelloResource:
sample_status = '200 OK'
sample_unicode = 'Hello World! \x80' + testing.rand_string(0, 0)
sample_utf8 = sample_unicode.encode('utf-8')
def __init__(self, mode):
self.called = False
self.mode = mode
def on_get(self, req, resp):
self.called = True
self.req, self.resp = req, resp
resp.status = falcon.HTTP_200
if 'stream' in self.mode:
if 'filelike' in self.mode:
stream = io.BytesIO(self.sample_utf8)
else:
stream = [self.sample_utf8]
if 'stream_len' in self.mode:
stream_len = len(self.sample_utf8)
else:
stream_len = None
if 'use_helper' in self.mode:
resp.set_stream(stream, stream_len)
else:
resp.stream = stream
resp.content_length = stream_len
if 'body' in self.mode:
if 'bytes' in self.mode:
resp.text = self.sample_utf8
else:
resp.text = self.sample_unicode
if 'data' in self.mode:
resp.data = self.sample_utf8
def on_head(self, req, resp):
self.on_get(req, resp)
class ClosingBytesIO(io.BytesIO):
close_called = False
def close(self):
super(ClosingBytesIO, self).close()
self.close_called = True
# NOTE(vytas): Do not inherit from BytesIO but encapsulate instead, as on
# CPython 3.13 garbage-collecting a BytesIO instance with an invalid .close()
# sometimes bubbles up a warning about exception when trying to call it.
class NonClosingBytesIO:
# Not callable; test that CloseableStreamIterator ignores it
close = False
def __init__(self, data=b''):
self._stream = io.BytesIO(data)
self.read = self._stream.read
class ClosingFilelikeHelloResource:
sample_status = '200 OK'
sample_unicode = 'Hello World! \x80' + testing.rand_string(0, 0)
sample_utf8 = sample_unicode.encode('utf-8')
def __init__(self, stream_factory):
self.called = False
self.stream = stream_factory(self.sample_utf8)
self.stream_len = len(self.sample_utf8)
def on_get(self, req, resp):
self.called = True
self.req, self.resp = req, resp
resp.status = falcon.HTTP_200
resp.set_stream(self.stream, self.stream_len)
class NoStatusResource:
def on_get(self, req, resp):
pass
class TestHelloWorld:
def test_env_headers_list_of_tuples(self):
env = testing.create_environ(headers=[('User-Agent', 'Falcon-Test')])
assert env['HTTP_USER_AGENT'] == 'Falcon-Test'
def test_root_route(self, client):
doc = {'message': 'Hello world!'}
resource = testing.SimpleTestResource(json=doc)
client.app.add_route('/', resource)
result = client.simulate_get()
assert result.json == doc
def test_no_route(self, client):
result = client.simulate_get('/seenoevil')
assert result.status_code == 404
@pytest.mark.parametrize(
'path,resource,get_body',
[
('/body', HelloResource('body'), lambda r: r.text.encode('utf-8')),
('/bytes', HelloResource('body, bytes'), lambda r: r.text),
('/data', HelloResource('data'), lambda r: r.data),
],
)
def test_body(self, client, path, resource, get_body):
client.app.add_route(path, resource)
result = client.simulate_get(path)
resp = resource.resp
content_length = int(result.headers['content-length'])
assert content_length == len(resource.sample_utf8)
assert result.status == resource.sample_status
assert resp.status == resource.sample_status
assert get_body(resp) == resource.sample_utf8
assert result.content == resource.sample_utf8
def test_no_body_on_head(self, client):
resource = HelloResource('body')
client.app.add_route('/body', resource)
result = client.simulate_head('/body')
assert not result.content
assert result.status_code == 200
assert resource.called
assert result.headers['content-length'] == str(len(HelloResource.sample_utf8))
def test_stream_chunked(self, client):
resource = HelloResource('stream')
client.app.add_route('/chunked-stream', resource)
result = client.simulate_get('/chunked-stream')
assert result.content == resource.sample_utf8
assert 'content-length' not in result.headers
def test_stream_known_len(self, client):
resource = HelloResource('stream, stream_len')
client.app.add_route('/stream', resource)
result = client.simulate_get('/stream')
assert resource.called
expected_len = int(resource.resp.content_length)
actual_len = int(result.headers['content-length'])
assert actual_len == expected_len
assert len(result.content) == expected_len
assert result.content == resource.sample_utf8
def test_filelike(self, client):
resource = HelloResource('stream, stream_len, filelike')
client.app.add_route('/filelike', resource)
for file_wrapper in (None, FileWrapper):
result = client.simulate_get('/filelike', file_wrapper=file_wrapper)
assert resource.called
expected_len = int(resource.resp.content_length)
actual_len = int(result.headers['content-length'])
assert actual_len == expected_len
assert len(result.content) == expected_len
for file_wrapper in (None, FileWrapper):
result = client.simulate_get('/filelike', file_wrapper=file_wrapper)
assert resource.called
expected_len = int(resource.resp.content_length)
actual_len = int(result.headers['content-length'])
assert actual_len == expected_len
assert len(result.content) == expected_len
@pytest.mark.parametrize(
'stream_factory,assert_closed',
[
(ClosingBytesIO, True), # Implements close()
(NonClosingBytesIO, False), # Has a non-callable "close" attr
],
)
def test_filelike_closing(self, client, stream_factory, assert_closed):
resource = ClosingFilelikeHelloResource(stream_factory)
client.app.add_route('/filelike-closing', resource)
result = client.simulate_get('/filelike-closing', file_wrapper=None)
assert resource.called
expected_len = int(resource.resp.content_length)
actual_len = int(result.headers['content-length'])
assert actual_len == expected_len
assert len(result.content) == expected_len
if assert_closed:
assert resource.stream.close_called
def test_filelike_using_helper(self, client):
resource = HelloResource('stream, stream_len, filelike, use_helper')
client.app.add_route('/filelike-helper', resource)
result = client.simulate_get('/filelike-helper')
assert resource.called
expected_len = int(resource.resp.content_length)
actual_len = int(result.headers['content-length'])
assert actual_len == expected_len
assert len(result.content) == expected_len
def test_status_not_set(self, client):
client.app.add_route('/nostatus', NoStatusResource())
result = client.simulate_get('/nostatus')
assert not result.content
assert result.status_code == 200
|