File: test_request_body.py

package info (click to toggle)
python-falcon 0.1.8-3
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 832 kB
  • ctags: 997
  • sloc: python: 4,006; makefile: 39; sh: 16
file content (147 lines) | stat: -rw-r--r-- 4,932 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import io
import multiprocessing
from wsgiref import simple_server

import requests

import falcon
from falcon import request_helpers
import falcon.testing as testing

# Number of bytes in one kilobyte (2**10); the tests below use it as the
# upper bound when generating random request bodies.
SIZE_1_KB = 2 ** 10


class TestRequestBody(testing.TestBase):
    """Tests for reading a request body through ``req.stream``.

    Most tests go through ``simulate_request`` and inspect the stream the
    resource received; one test runs a real ``wsgiref`` server in a child
    process, and one exercises ``request_helpers.Body`` directly.
    """

    def before(self):
        """Create a fresh ``testing.TestResource`` and route '/' to it."""
        # NOTE(review): presumably invoked by testing.TestBase before each
        # test -- confirm against the base class.
        self.resource = testing.TestResource()
        self.api.add_route('/', self.resource)

    def test_empty_body(self):
        """An empty body yields a stream whose end is at offset 0."""
        self.simulate_request('/', body='')
        stream = self.resource.req.stream

        # whence=2 seeks relative to the end of the stream
        stream.seek(0, 2)
        self.assertEqual(stream.tell(), 0)

    def test_tiny_body(self):
        """A one-character body can be read back and has length 1."""
        expected_body = '.'
        self.simulate_request('', body=expected_body)
        stream = self.resource.req.stream

        actual_body = stream.read(1)
        self.assertEqual(actual_body, expected_body.encode('utf-8'))

        stream.seek(0, 2)
        self.assertEqual(stream.tell(), 1)

    def test_tiny_body_overflow(self):
        """Asking for more bytes than the body holds returns just the body."""
        expected_body = '.'
        self.simulate_request('', body=expected_body)
        stream = self.resource.req.stream

        # Read too many bytes; shouldn't block
        actual_body = stream.read(len(expected_body) + 1)
        self.assertEqual(actual_body, expected_body.encode('utf-8'))

    def test_read_body(self):
        """A random body sent with Content-Length is read back in full."""
        # Floor division keeps the lower bound an int on Python 3; true
        # division would hand rand_string a float.
        expected_body = testing.rand_string(SIZE_1_KB // 2, SIZE_1_KB)
        expected_len = len(expected_body)
        headers = {'Content-Length': str(expected_len)}

        self.simulate_request('', body=expected_body, headers=headers)

        content_len = self.resource.req.get_header('content-length')
        self.assertEqual(content_len, str(expected_len))

        stream = self.resource.req.stream

        actual_body = stream.read()
        self.assertEqual(actual_body, expected_body.encode('utf-8'))

        stream.seek(0, 2)
        self.assertEqual(stream.tell(), expected_len)

    def test_read_socket_body(self):
        """POST and PUT bodies are echoed back by a real wsgiref server.

        The server runs in a child process on 127.0.0.1:8989 and is always
        terminated when the test finishes, whether it passed or failed.
        """
        expected_body = testing.rand_string(SIZE_1_KB // 2, SIZE_1_KB)

        def server():
            class Echo(object):
                def on_post(self, req, resp):
                    # wsgiref socket._fileobject blocks when len not given,
                    # but Falcon is smarter than that. :D
                    body = req.stream.read()
                    resp.body = body

                def on_put(self, req, resp):
                    # wsgiref socket._fileobject blocks when len too long,
                    # but Falcon should work around that for me.
                    body = req.stream.read(req.content_length + 1)
                    resp.body = body

            api = falcon.API()
            api.add_route('/echo', Echo())

            httpd = simple_server.make_server('127.0.0.1', 8989, api)
            httpd.serve_forever()

        process = multiprocessing.Process(target=server)
        process.daemon = True
        process.start()

        try:
            # Let it boot
            process.join(1)

            url = 'http://127.0.0.1:8989/echo'
            resp = requests.post(url, data=expected_body)
            self.assertEqual(resp.text, expected_body)

            resp = requests.put(url, data=expected_body)
            self.assertEqual(resp.text, expected_body)
        finally:
            # Stop the server even when an assertion or request fails, so
            # the port is not left bound for whatever runs next.
            process.terminate()
            process.join()

    def test_body_stream_wrapper(self):
        """``request_helpers.Body`` mirrors the wrapped stream's read API.

        Covers read() with no size, a short size and an oversized size, as
        well as readline(), readlines(), next() and plain iteration.
        """
        data = testing.rand_string(SIZE_1_KB // 2, SIZE_1_KB)
        expected_body = data.encode('utf-8')
        expected_len = len(expected_body)

        # NOTE(kgriffs): Append newline char to each line
        # to match readlines behavior
        expected_lines = [(line + '\n').encode('utf-8')
                          for line in data.split('\n')]

        # NOTE(kgriffs): Remove trailing newline to simulate
        # what readlines does
        expected_lines[-1] = expected_lines[-1][:-1]

        def new_body():
            # Every check needs an unread stream, so wrap a fresh BytesIO.
            stream = io.BytesIO(expected_body)
            return request_helpers.Body(stream, expected_len)

        self.assertEqual(new_body().read(), expected_body)
        self.assertEqual(new_body().read(2), expected_body[0:2])
        self.assertEqual(new_body().read(expected_len + 1), expected_body)

        self.assertEqual(new_body().readline(), expected_lines[0])
        self.assertEqual(new_body().readlines(), expected_lines)
        self.assertEqual(next(new_body()), expected_lines[0])

        for i, line in enumerate(new_body()):
            self.assertEqual(line, expected_lines[i])