File: test_hello_asgi.py

package info (click to toggle)
python-falcon 4.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,172 kB
  • sloc: python: 33,608; javascript: 92; sh: 50; makefile: 50
file content (393 lines) | stat: -rw-r--r-- 12,178 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
import io
import os
import tempfile

import pytest

import falcon
from falcon import testing
import falcon.asgi

# aiofiles is an optional dependency: when it cannot be imported the name is
# bound to None, which lets the aiofiles-based test be skipped rather than
# breaking collection of this module.
try:
    import aiofiles
except ImportError:
    aiofiles = None  # type: ignore[assignment]

# Byte count of one kibibyte; used to size the payload of the aiofiles resource.
SIZE_1_KB = 1024


@pytest.fixture
def client():
    """Provide a test client wrapping a fresh ASGI app with no routes added."""
    asgi_app = falcon.asgi.App()
    return testing.TestClient(asgi_app)


class DataReaderWithoutClose:
    """Async file-like reader over in-memory bytes that offers no ``close()``.

    ``close_called`` starts out ``False`` and is never changed by this class.
    """

    def __init__(self, data):
        self.close_called = False
        self._stream = io.BytesIO(data)

    async def read(self, num_bytes):
        """Return up to ``num_bytes`` bytes, continuing from the last read."""
        chunk = self._stream.read(num_bytes)
        return chunk


class DataReader(DataReaderWithoutClose):
    """Reader variant that adds an async ``close()`` and records its use."""

    async def close(self):
        """Set ``close_called`` so callers can verify the stream was closed."""
        self.close_called = True


class HelloResource:
    """Async resource whose GET response shape is picked by a ``mode`` string.

    ``mode`` is examined with substring checks, so several keywords can be
    combined in a single string (for example ``'stream, stream_len'``):

    * ``stream`` -- respond with a stream; refined by ``filelike``,
      ``stream_genfunc``, ``stream_nongenfunc``, ``stream_len`` and
      ``use_helper``.
    * ``body`` -- assign ``resp.text`` (the encoded bytes when ``bytes`` is
      also present, the ``str`` otherwise).
    * ``data`` -- assign ``resp.data``.

    The most recent request/response pair is kept on ``self.req`` and
    ``self.resp``, and ``self.called`` is set once a responder has run.
    """

    sample_status = '200 OK'
    sample_unicode = 'Hello World! \x80 - ' + testing.rand_string(0, 5)
    sample_utf8 = sample_unicode.encode('utf-8')

    def __init__(self, mode):
        self.mode = mode
        self.called = False

    def _build_stream(self):
        """Create the object to use as the response stream for this mode."""
        mode = self.mode

        if 'filelike' in mode:
            return DataReader(self.sample_utf8)

        async def emit_single_bytes():
            # Hand out the payload one byte per chunk.
            for byte_value in self.sample_utf8:
                yield bytes([byte_value])

        if 'stream_genfunc' in mode:
            # The generator *function* itself, deliberately left uncalled.
            return emit_single_bytes

        if 'stream_nongenfunc' in mode:
            # A plain int: neither file-like nor an async iterable.
            return 42

        return emit_single_bytes()

    async def on_get(self, req, resp):
        """Populate ``resp`` according to the keywords found in ``mode``."""
        self.called = True
        self.req = req
        self.resp = resp

        resp.status = falcon.HTTP_200

        mode = self.mode

        if 'stream' in mode:
            stream = self._build_stream()
            stream_len = len(self.sample_utf8) if 'stream_len' in mode else None

            if 'use_helper' in mode:
                resp.set_stream(stream, stream_len)
            else:
                resp.stream = stream
                resp.content_length = stream_len

        if 'body' in mode:
            resp.text = self.sample_utf8 if 'bytes' in mode else self.sample_unicode

        if 'data' in mode:
            resp.data = self.sample_utf8

    async def on_head(self, req, resp):
        """Handle HEAD by running the GET responder unchanged."""
        await self.on_get(req, resp)


class ClosingFilelikeHelloResource:
    """Resource that streams one pre-built file-like object via ``set_stream()``.

    The stream is created once, in the constructor, by calling
    ``stream_factory`` with the encoded sample text; it stays reachable as
    ``self.stream`` so it can be inspected after a request.
    """

    sample_status = '200 OK'
    sample_unicode = 'Hello World! \x80' + testing.rand_string(0, 0)

    sample_utf8 = sample_unicode.encode('utf-8')

    def __init__(self, stream_factory):
        payload = self.sample_utf8

        self.called = False
        self.stream = stream_factory(payload)
        self.stream_len = len(payload)

    async def on_get(self, req, resp):
        """Record the call and attach the stored stream together with its length."""
        self.called = True
        self.req = req
        self.resp = resp
        resp.status = falcon.HTTP_200
        resp.set_stream(self.stream, self.stream_len)


class ClosingStreamResource:
    """Resource that streams a closable async iterator built from query params.

    The emitter created for the latest request is kept on ``self.stream``.
    """

    class Emitter:
        """Async iterator that splits ``value`` into ``divisor``-sized chunks.

        Each chunk is emitted as the decimal number followed by a newline,
        encoded to bytes: ``divisor`` is repeated while it still fits into the
        remaining value, and the leftover (if any) is emitted last.
        """

        def __init__(self, value, divisor):
            self.closed = False

            self._value = value
            self._divisor = divisor
            self._remainder = None

        async def close(self):
            """Mark the emitter as closed."""
            self.closed = True

        def __aiter__(self):
            # The remainder is worked out lazily, on the first iteration
            # request, so a zero divisor fails here and not in __init__.
            if self._remainder is not None:
                return self

            whole_chunks, self._remainder = divmod(self._value, self._divisor)
            assert whole_chunks >= 0

            return self

        async def __anext__(self):
            remaining = self._value
            if remaining == 0:
                raise StopAsyncIteration

            if remaining < self._divisor:
                # Less than one full chunk is left: emit it and finish.
                self._value = 0
                number = self._remainder
            else:
                self._value = remaining - self._divisor
                number = self._divisor

            return f'{number}\n'.encode()

    def __init__(self):
        self.stream = None

    async def on_get(self, req, resp):
        """Stream an ``Emitter`` configured by ``value`` and ``divisor`` params."""
        # Drop the previous emitter first so that a failure while reading the
        # parameters does not leave a stale one behind.
        self.stream = None

        value = req.get_param_as_int('value', default=10)
        divisor = req.get_param_as_int('divisor', default=3)

        emitter = self.Emitter(value, divisor)
        self.stream = emitter
        resp.stream = emitter


class AIOFilesHelloResource:
    """Resource that streams a temporary file opened through ``aiofiles``.

    The constructor writes a payload produced by ``testing.rand_string`` to a
    new temporary file; call ``cleanup()`` to delete that file afterwards.
    """

    def __init__(self):
        payload = testing.rand_string(8 * SIZE_1_KB, 16 * SIZE_1_KB).encode()
        self.sample_utf8 = payload

        descriptor, self.tempfile_name = tempfile.mkstemp()
        # Opening by descriptor: leaving the ``with`` block closes it as well.
        with open(descriptor, 'wb') as out:
            out.write(payload)

        self._aiofiles = None

    @property
    def aiofiles_closed(self):
        """Truthy when no file was opened yet or the last opened one is closed."""
        handle = self._aiofiles
        if not handle:
            return True
        return handle.closed

    def cleanup(self):
        """Delete the temporary file created by the constructor."""
        os.remove(self.tempfile_name)

    async def on_get(self, req, resp):
        """Open the temporary file for binary reading and use it as the stream."""
        handle = await aiofiles.open(self.tempfile_name, 'rb')
        self._aiofiles = handle
        resp.stream = handle


class NoStatusResource:
    """Resource whose GET responder leaves the response completely untouched."""

    async def on_get(self, req, resp):
        """Do nothing; neither status nor body is assigned."""


class PartialCoroutineResource:
    """Resource mixing a synchronous GET responder with an async POST one."""

    def on_get(self, req, resp):
        """Synchronous no-op responder."""

    async def on_post(self, req, resp):
        """Coroutine no-op responder."""


class TestHelloWorld:
    """End-to-end checks of basic responders driven through the test client."""

    def test_env_headers_list_of_tuples(self):
        """Headers given as a list of tuples end up in the created environ."""
        env = testing.create_environ(headers=[('User-Agent', 'Falcon-Test')])
        assert env['HTTP_USER_AGENT'] == 'Falcon-Test'

    def test_root_route(self, client):
        """A resource routed at ``/`` answers a default GET with its JSON doc."""
        doc = {'message': 'Hello world!'}
        resource = testing.SimpleTestResourceAsync(json=doc)
        client.app.add_route('/', resource)

        result = client.simulate_get()
        assert result.json == doc

    def test_no_route(self, client):
        """Requesting a path without a route yields 404."""
        result = client.simulate_get('/seenoevil')
        assert result.status_code == 404

    @pytest.mark.parametrize(
        'path,resource,get_body',
        [
            ('/body', HelloResource('body'), lambda r: r.text.encode('utf-8')),
            ('/bytes', HelloResource('body, bytes'), lambda r: r.text),
            ('/data', HelloResource('data'), lambda r: r.data),
        ],
    )
    def test_body(self, client, path, resource, get_body):
        """Text, bytes and data bodies are sent with a matching content length.

        ``get_body`` extracts the payload from the response object as bytes so
        that it can be compared with the resource's encoded sample.
        """
        client.app.add_route(path, resource)

        result = client.simulate_get(path)
        resp = resource.resp

        content_length = int(result.headers['content-length'])
        assert content_length == len(resource.sample_utf8)

        assert result.status == resource.sample_status
        assert resp.status == resource.sample_status
        assert get_body(resp) == resource.sample_utf8
        assert result.content == resource.sample_utf8

    def test_no_body_on_head(self, client):
        """HEAD returns no content but still reports the body's length."""
        resource = HelloResource('body')
        client.app.add_route('/body', resource)
        result = client.simulate_head('/body')

        assert not result.content
        assert result.status_code == 200
        assert resource.called
        assert result.headers['content-length'] == str(len(HelloResource.sample_utf8))

    def test_stream_chunked(self, client):
        """A stream without a known length is sent without content-length."""
        resource = HelloResource('stream')
        client.app.add_route('/chunked-stream', resource)

        result = client.simulate_get('/chunked-stream')

        assert result.content == resource.sample_utf8
        assert 'content-length' not in result.headers

    def test_stream_known_len(self, client):
        """A stream with a declared length is sent with that content-length."""
        resource = HelloResource('stream, stream_len')
        client.app.add_route('/stream', resource)

        result = client.simulate_get('/stream')
        assert resource.called

        expected_len = int(resource.resp.content_length)
        actual_len = int(result.headers['content-length'])
        assert actual_len == expected_len
        assert len(result.content) == expected_len
        assert result.content == resource.sample_utf8

    def test_filelike(self, client):
        """A file-like stream is served correctly on two consecutive requests."""
        resource = HelloResource('stream, stream_len, filelike')
        client.app.add_route('/filelike', resource)

        for _ in range(2):
            # Reset the flag so that every iteration proves the responder ran
            # for *this* request, not merely for an earlier one.
            resource.called = False

            result = client.simulate_get('/filelike')
            assert resource.called

            expected_len = int(resource.resp.content_length)
            actual_len = int(result.headers['content-length'])
            assert actual_len == expected_len
            assert len(result.content) == expected_len

    def test_genfunc_error(self, client):
        """Assigning an uncalled async generator function raises TypeError."""
        resource = HelloResource('stream, stream_len, stream_genfunc')
        client.app.add_route('/filelike', resource)

        with pytest.raises(TypeError):
            client.simulate_get('/filelike')

    def test_nongenfunc_error(self, client):
        """Assigning an object that is not a stream at all raises TypeError."""
        resource = HelloResource('stream, stream_len, stream_nongenfunc')
        client.app.add_route('/filelike', resource)

        with pytest.raises(TypeError):
            client.simulate_get('/filelike')

    @pytest.mark.parametrize(
        'stream_factory,assert_closed',
        [
            (DataReader, True),  # Implements close()
            (DataReaderWithoutClose, False),
        ],
    )
    def test_filelike_closing(self, client, stream_factory, assert_closed):
        """The stream's ``close()`` is awaited exactly when it provides one."""
        resource = ClosingFilelikeHelloResource(stream_factory)
        client.app.add_route('/filelike-closing', resource)

        result = client.simulate_get('/filelike-closing')
        assert resource.called

        expected_len = int(resource.resp.content_length)
        actual_len = int(result.headers['content-length'])
        assert actual_len == expected_len
        assert len(result.content) == expected_len

        # Check both directions: the flag must be set for the reader that has
        # close(), and must still be unset for the reader that lacks it.
        assert resource.stream.close_called is assert_closed

    @pytest.mark.skipif(aiofiles is None, reason='aiofiles is required for this test')
    def test_filelike_closing_aiofiles(self, client):
        """An aiofiles handle is streamed in full and is closed afterwards."""
        resource = AIOFilesHelloResource()
        try:
            client.app.add_route('/filelike-closing', resource)

            result = client.simulate_get('/filelike-closing')

            assert result.status_code == 200
            assert 'content-length' not in result.headers
            assert result.content == resource.sample_utf8

            assert resource.aiofiles_closed

        finally:
            # Always remove the temporary file, even when an assertion failed.
            resource.cleanup()

    def test_filelike_using_helper(self, client):
        """``set_stream()`` produces the same result as separate assignments."""
        resource = HelloResource('stream, stream_len, filelike, use_helper')
        client.app.add_route('/filelike-helper', resource)

        result = client.simulate_get('/filelike-helper')
        assert resource.called

        expected_len = int(resource.resp.content_length)
        actual_len = int(result.headers['content-length'])
        assert actual_len == expected_len
        assert len(result.content) == expected_len

    @pytest.mark.parametrize(
        'value,divisor,text,error',
        [
            (10, 3, '3\n3\n3\n1\n', None),
            (10, 7, '7\n3\n', None),
            (10, 17, '10\n', None),
            (20, 0, '', ZeroDivisionError),
        ],
    )
    def test_closing_stream(self, client, value, divisor, text, error):
        """An async-iterator stream is closed whether or not iteration fails."""
        resource = ClosingStreamResource()
        client.app.add_route('/stream', resource)

        if error:
            with pytest.raises(error):
                client.simulate_get(
                    '/stream', params={'value': value, 'divisor': divisor}
                )
        else:
            result = client.simulate_get(
                '/stream', params={'value': value, 'divisor': divisor}
            )
            assert result.status_code == 200
            assert result.text == text

        assert resource.stream.closed

    def test_status_not_set(self, client):
        """A responder that sets nothing yields an empty 200 response."""
        client.app.add_route('/nostatus', NoStatusResource())

        result = client.simulate_get('/nostatus')

        assert not result.content
        assert result.status_code == 200

    def test_coroutine_required(self, client, util):
        """With wrapping disabled, an ASGI app rejects a synchronous responder."""
        with util.disable_asgi_non_coroutine_wrapping():
            with pytest.raises(TypeError) as exinfo:
                client.app.add_route('/', PartialCoroutineResource())

            assert 'responder must be a non-blocking async coroutine' in str(
                exinfo.value
            )

    def test_noncoroutine_required(self):
        """A WSGI app rejects a resource that has a coroutine responder."""
        wsgi_app = falcon.App()

        with pytest.raises(TypeError) as exinfo:
            wsgi_app.add_route('/', PartialCoroutineResource())

        assert 'responder must be a regular synchronous method' in str(exinfo.value)