1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
|
# -*- coding: windows-1252 -*-
###
#
# Copyright Alan Kennedy.
#
# You may contact the copyright holder at this uri:
#
# http://www.xhaus.com/contact/modjy
#
# The licence under which this code is released is the Apache License v2.0.
#
# The terms and conditions of this license are listed in a file contained
# in the distribution that also contained this file, under the name
# LICENSE.txt.
#
# You may also read a copy of the license at the following web address.
#
# http://modjy.xhaus.com/LICENSE.txt
#
###
"""
A variety of app callables used to test the WSGI streams.
"""
from UserDict import UserDict
def extract_params(qstring):
params = {}
if qstring:
name_vals = [t.split('=', 1) for t in qstring.split('&')]
for n, v in name_vals: params[n] = v
return params
def test_read_input_stream(environ, start_response):
writer = start_response("200 OK", [])
wsgi_input = environ['wsgi.input']
params = extract_params(environ['QUERY_STRING'])
readsize = None
if params.has_key('readsize'):
readsize = int(params['readsize'])
if readsize:
pieces = []
piece = wsgi_input.read(readsize)
while piece:
pieces.append(piece)
piece = wsgi_input.read(readsize)
data = ''.join(pieces)
else:
data = wsgi_input.read()
output_dict = {'data': data}
writer(repr(output_dict))
return []
def test_readline_input_stream(environ, start_response):
writer = start_response("200 OK", [])
wsgi_input = environ['wsgi.input']
params = extract_params(environ['QUERY_STRING'])
readsize = None
if params.has_key('readsize'):
readsize = int(params['readsize'])
if readsize:
data = wsgi_input.readline(readsize)
else:
data = wsgi_input.readline()
output_dict = {'data': data}
writer(repr(output_dict))
return []
def test_readlines_input_stream(environ, start_response):
writer = start_response("200 OK", [])
wsgi_input = environ['wsgi.input']
params = extract_params(environ['QUERY_STRING'])
readsize = None
if params.has_key('readsize'):
readsize = int(params['readsize'])
if readsize:
data = wsgi_input.readlines(readsize)
else:
data = wsgi_input.readlines()
output_dict = {'data': "$".join(data)}
writer(repr(output_dict))
return []
def test_iter_input_stream(environ, start_response):
writer = start_response("200 OK", [])
wsgi_input = environ['wsgi.input']
data = "$".join([line for line in wsgi_input])
output_dict = {'data': data}
writer(repr(output_dict))
return []
def test_error_stream(environ, start_response):
writer = start_response("200 OK", [])
wsgi_errors = environ['wsgi.errors']
error_msg = None
for method in ['flush', 'write', 'writelines', ]:
if not hasattr(wsgi_errors, method):
error_msg = "wsgi.errors has no '%s' attr" % method
if not error_msg and not callable(getattr(wsgi_errors, method)):
error_msg = "wsgi.errors.%s attr is not callable" % method
if error_msg: break
return_msg = error_msg or "success"
writer(return_msg)
return []
|