File: test_request_nose.py

package info (click to toggle)
python-webob 1.4-2
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 3,340 kB
  • ctags: 2,468
  • sloc: python: 13,433; makefile: 12
file content (187 lines) | stat: -rw-r--r-- 5,874 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
from webob.request import Request
from nose.tools import eq_ as eq, assert_raises
from webob.compat import bytes_

def test_request_no_method():
    # An environ with no REQUEST_METHOD key must yield a request whose
    # method is 'GET'.
    request = Request({})
    assert request.method == 'GET'

def test_request_read_no_content_length():
    # Method 'FOO' and no content length: the body is expected to be
    # empty and the underlying input must not have been read.
    request, tracker = _make_read_tracked_request(b'abc', 'FOO')
    assert request.content_length is None
    assert request.body == b''
    assert not tracker.was_read

def test_request_read_no_content_length_POST():
    # Method 'POST' and no content length: the body is expected to hold
    # the payload, and the input must have been read to obtain it.
    request, tracker = _make_read_tracked_request(b'abc', 'POST')
    assert request.content_length is None
    assert request.body == b'abc'
    assert tracker.was_read

def test_request_read_no_flag_but_content_length_is_present():
    # Once content_length is set explicitly, accessing the body is
    # expected to read the payload from the input.
    request, tracker = _make_read_tracked_request(b'abc')
    request.content_length = 3
    assert request.body == b'abc'
    assert tracker.was_read

def test_request_read_no_content_length_but_flagged_readable():
    # No content length is set, but the body is flagged as readable:
    # accessing the body is expected to read the payload from the input.
    request, tracker = _make_read_tracked_request(b'abc')
    request.is_body_readable = True
    assert request.body == b'abc'
    assert tracker.was_read

def test_request_read_after_setting_body_file():
    request, _ = _make_read_tracked_request()
    # Swap in a fresh tracked input through the body_file attribute.
    tracker = ReadTracker(b'abc')
    request.body_file = tracker
    # Before the body is accessed: no content length and not seekable.
    assert request.content_length is None
    assert not request.is_body_seekable
    assert request.body == b'abc'
    # Accessing the body is expected to make the input seekable and to
    # set the content length.
    assert request.content_length == 3
    assert request.is_body_seekable
    assert tracker.was_read

def test_request_readlines():
    # A body of three newline-terminated lines must come back from
    # readlines() as three byte strings when seeking is switched off.
    payload = 'a\n' * 3
    request = Request.blank('/', POST=payload)
    request.is_body_seekable = False
    lines = request.body_file.readlines()
    eq(lines, [b'a\n', b'a\n', b'a\n'])

def test_request_delete_with_body():
    # A blank DELETE request starts without a readable body; assigning a
    # body is expected to make it readable through body_file.
    request = Request.blank('/', method='DELETE')
    readable_before = request.is_body_readable
    assert not readable_before
    request.body = b'abc'
    assert request.is_body_readable
    content = request.body_file.read()
    assert content == b'abc'


def _make_read_tracked_request(data=b'', method='PUT'):
    """
        Build a Request whose ``wsgi.input`` is a ReadTracker.

        ``data`` is the payload handed to the tracker; it defaults to
        empty bytes because a WSGI input stream yields bytes, not text.
        ``method`` is stored in the environ as ``REQUEST_METHOD``.

        Returns a ``(request, tracker)`` tuple so a test can check
        whether the request read its input.
    """
    tracker = ReadTracker(data)
    environ = {
        'REQUEST_METHOD': method,
        'wsgi.input': tracker,
    }
    return Request(environ), tracker

class ReadTracker(object):
    """
        Minimal stand-in for an input stream that records whether
        ``read`` was ever called on it.
    """
    def __init__(self, data):
        self.was_read = False
        self.data = data

    def read(self, size=-1):
        # A negative size means "everything"; any explicit size must
        # match the payload length exactly, since partial reads are not
        # supported by this helper.
        requested = len(self.data) if size < 0 else size
        assert requested == len(self.data)
        self.was_read = True
        return self.data


def test_limited_length_file_repr():
    # With a one-character POST body, a dummy raw body file and seeking
    # switched off, the repr of body_file.raw is expected to name the
    # raw file and a maxlen of 1.
    request = Request.blank('/', POST='x')
    request.body_file_raw = 'dummy'
    request.is_body_seekable = False
    raw_wrapper = request.body_file.raw
    eq(repr(raw_wrapper), "<LimitedLengthFile('dummy', maxlen=1)>")

def test_request_wrong_clen(is_seekable=False):
    # Declare a content length 100 bytes larger than the real body and
    # check that copying the body fails with IOError.
    actual_size = 1 << 20
    inflated_size = actual_size + 100
    request = Request.blank('/', POST='x' * actual_size)
    eq(request.content_length, actual_size)
    # Replacing the body file is expected to reset the content length.
    request.body_file = _Helper_test_request_wrong_clen(request.body_file)
    eq(request.content_length, None)
    request.content_length = inflated_size
    request.is_body_seekable = is_seekable
    eq(request.content_length, inflated_size)
    # The wrapper raises AssertionError if reading continues after the
    # first empty result, i.e. if the body reading trusts content_length
    # too much; the expected outcome is an IOError instead.
    assert_raises(IOError, request.copy_body)

def test_request_wrong_clen_seekable():
    # Same scenario as test_request_wrong_clen, with the body marked
    # seekable.
    test_request_wrong_clen(True)

class _Helper_test_request_wrong_clen(object):
    """
        File wrapper that tolerates exactly one empty ``read`` result
        and raises AssertionError if a second one is seen.
    """
    def __init__(self, f):
        self.file_ended = False
        self.f = f

    def read(self, *args):
        chunk = self.f.read(*args)
        if chunk:
            return chunk
        # Empty result: the first one is remembered, a repeat is an error.
        if self.file_ended:
            raise AssertionError("Reading should stop after first empty string")
        self.file_ended = True
        return chunk


def test_disconnect_detection_cgi():
    # Post a large file field, switch seeking off, then parse the form.
    payload = 'abc' * (1 << 20)
    upload = ('test-file', payload)
    request = Request.blank('/', POST={'file': upload})
    request.is_body_seekable = False
    # Accessing POST must not raise.
    request.POST

def test_disconnect_detection_hinted_readline():
    # readline() with a size hint on a large non-seekable body must
    # return a non-empty prefix of the posted data.
    payload = 'abc' * (1 << 20)
    request = Request.blank('/', POST=payload)
    request.is_body_seekable = False
    size_hint = 1 << 16
    first_line = request.body_file.readline(size_hint)
    assert first_line
    assert bytes_(payload).startswith(first_line)



def test_charset_in_content_type():
    def post_environ(content_type):
        # Both scenarios share everything except the content type.
        return {
            'REQUEST_METHOD': 'POST',
            'QUERY_STRING': 'a=b',
            'CONTENT_TYPE': content_type,
        }

    # text/html with a charset parameter: building the request and
    # reading charset, GET and POST must not raise.
    html_request = Request(post_environ('text/html;charset=ascii'))
    eq(html_request.charset, 'ascii')
    eq(dict(html_request.GET), {'a': 'b'})
    eq(dict(html_request.POST), {})
    # Re-assigning the current charset is accepted ...
    html_request.charset = 'ascii'
    # ... while switching to another one is expected to raise
    # DeprecationWarning.
    assert_raises(DeprecationWarning, setattr, html_request, 'charset', 'utf-8')

    # multipart/form-data with a charset parameter: charset and GET are
    # readable, but accessing POST is expected to raise
    # DeprecationWarning.
    form_request = Request(post_environ('multipart/form-data;charset=ascii'))
    eq(form_request.charset, 'ascii')
    eq(dict(form_request.GET), {'a': 'b'})
    assert_raises(DeprecationWarning, getattr, form_request, 'POST')




def test_json_body_invalid_json():
    # A body that is not a complete JSON document must make json_body
    # raise ValueError.
    broken = Request.blank('/', POST=b'{')
    assert_raises(ValueError, lambda: broken.json_body)

def test_json_body_valid_json():
    # A well-formed JSON object in the body must be returned by
    # json_body as the equivalent dict.
    well_formed = Request.blank('/', POST=b'{"a":1}')
    decoded = well_formed.json_body
    eq(decoded, {'a': 1})

def test_json_body_alternate_charset():
    # The body is a JSON object encoded as UTF-16 (note the leading
    # \xff\xfe byte-order mark) whose string value is written with
    # \uXXXX escapes.  With the charset declared in the content type,
    # json_body must decode it; the result is compared as UTF-8 bytes.
    body = (b'\xff\xfe{\x00"\x00a\x00"\x00:\x00 \x00"\x00/\x00\\\x00u\x006\x00d\x004\x001\x00'
        b'\\\x00u\x008\x008\x004\x00c\x00\\\x00u\x008\x00d\x008\x00b\x00\\\x00u\x005\x002\x00'
        b'b\x00f\x00"\x00}\x00'
    )
    request = Request.blank('/', POST=body)
    request.content_type = 'application/json; charset=utf-16'
    s = request.json_body['a']
    eq(s.encode('utf8'), b'/\xe6\xb5\x81\xe8\xa1\x8c\xe8\xb6\x8b\xe5\x8a\xbf')

def test_json_body_GET_request():
    # A blank request created without any POST data must make json_body
    # raise ValueError.
    bodiless = Request.blank('/')
    assert_raises(ValueError, lambda: bodiless.json_body)

def test_non_ascii_body_params():
    encoded_form = b'test=%D1%82%D0%B5%D1%81%D1%82'
    request = Request.blank('/', POST=encoded_form)
    # Accessing params parses the request body.
    request.params
    # Accessing the body afterwards makes the POST dict serialize again;
    # make sure it can handle the non-ASCII characters in the query and
    # that the body comes back unchanged.
    eq(request.body, encoded_form)