1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
# -*- coding: utf-8 -*-
import bottle
import threading
import urllib
import urllib2
import sys
import time
import unittest
import wsgiref
import wsgiref.simple_server
import wsgiref.util
import wsgiref.validate
import warnings
from StringIO import StringIO
try:
from io import BytesIO
except:
BytesIO = None
pass
import mimetypes
import uuid
def tob(data):
    ''' Transform bytes or unicode into bytes.

        Unicode text is encoded as UTF-8; any other object (in particular a
        byte string) is returned unchanged.
    '''
    # type(u'') is `unicode` on Python 2 and `str` on Python 3, so the check
    # does not depend on the Python 2-only `unicode` builtin being defined.
    text_type = type(u'')
    if isinstance(data, text_type):
        return data.encode('utf8')
    return data
def tobs(data):
    ''' Wrap bytes or unicode in a readable byte stream.

        The input is converted with tob() first. The stream is a BytesIO when
        that class could be imported, otherwise a StringIO.
    '''
    payload = tob(data)
    if BytesIO:
        return BytesIO(payload)
    return StringIO(payload)
def warn(message):
    ''' Emit `message` through the warnings module. '''
    # A stack level of 2 makes the warning point at the code that called
    # warn() instead of at this helper.
    caller_level = 2
    warnings.warn(message, stacklevel=caller_level)
class ServerTestBase(unittest.TestCase):
    ''' Base test case that drives a Bottle app in-process through WSGI.

        Each test gets an app pushed onto ``bottle.app`` in setUp() and popped
        again in tearDown(). Requests are issued by calling the app directly
        (wrapped in ``wsgiref.validate.validator``); no socket is opened.
    '''

    def setUp(self):
        ''' Push a new app onto bottle.app and wrap it in the WSGI validator. '''
        self.port = 8080
        self.host = 'localhost'
        self.app = bottle.app.push()
        self.wsgiapp = wsgiref.validate.validator(self.app)

    def urlopen(self, path, method='GET', post='', env=None):
        ''' Call the wrapped app once and collect its response.

            :param path: value stored in ``PATH_INFO``.
            :param method: request method; upper-cased and stripped. It is
                           overridden with ``POST`` whenever `post` is given.
            :param post: request body. When non-empty it is written to
                         ``wsgi.input`` and ``CONTENT_LENGTH`` is set.
            :param env: optional environ dict to start from. A non-empty dict
                        is filled in and modified in place.
            :return: dict with the keys ``code`` (int), ``status`` (reason
                     text), ``header`` (dict keyed by title-cased name,
                     repeated headers joined with ``', '``) and ``body``
                     (all yielded parts concatenated).
            :raises TypeError: if a yielded part cannot be appended to the
                               body collected so far.
        '''
        result = {'code': 0, 'status': 'error', 'header': {}, 'body': tob('')}

        def start_response(status, header):
            result['code'] = int(status.split()[0])
            result['status'] = status.split(None, 1)[-1]
            for name, value in header:
                name = name.title()
                if name in result['header']:
                    result['header'][name] += ', ' + value
                else:
                    result['header'][name] = value

        env = env if env else {}
        wsgiref.util.setup_testing_defaults(env)
        env['REQUEST_METHOD'] = method.upper().strip()
        env['PATH_INFO'] = path
        env['QUERY_STRING'] = ''
        if post:
            env['REQUEST_METHOD'] = 'POST'
            env['CONTENT_LENGTH'] = str(len(tob(post)))
            env['wsgi.input'].write(tob(post))
            env['wsgi.input'].seek(0)
        response = self.wsgiapp(env, start_response)
        try:
            for part in response:
                try:
                    result['body'] += part
                except TypeError:
                    raise TypeError('WSGI app yielded non-byte object %s'
                                    % type(part))
        finally:
            # Close the iterable even when iteration fails part-way, so the
            # application always gets the chance to release its resources.
            if hasattr(response, 'close'):
                response.close()
        return result

    def postmultipart(self, path, fields, files):
        ''' POST `fields` and `files` to `path` as multipart/form-data.

            The environ is built by multipart_environ(); the return value is
            the result dict of urlopen().
        '''
        env = multipart_environ(fields, files)
        return self.urlopen(path, method='POST', env=env)

    def tearDown(self):
        ''' Pop the app that setUp() pushed onto bottle.app. '''
        bottle.app.pop()

    def assertStatus(self, code, route='/', **kargs):
        ''' Request `route` and assert that the status code equals `code`. '''
        self.assertEqual(code, self.urlopen(route, **kargs)['code'])

    def assertBody(self, body, route='/', **kargs):
        ''' Request `route` and assert that the body equals tob(body). '''
        self.assertEqual(tob(body), self.urlopen(route, **kargs)['body'])

    def assertInBody(self, body, route='/', **kargs):
        ''' Request `route` and fail unless tob(body) occurs in the body. '''
        result = self.urlopen(route, **kargs)['body']
        if tob(body) not in result:
            self.fail('The search pattern "%s" is not included in body:\n%s' % (body, result))

    def assertHeader(self, name, value, route='/', **kargs):
        ''' Request `route` and assert that header `name` equals `value`.

            `name` must use the title-cased spelling stored by urlopen(). A
            missing header compares as None.
        '''
        self.assertEqual(value, self.urlopen(route, **kargs)['header'].get(name))

    def assertHeaderAny(self, name, route='/', **kargs):
        ''' Request `route` and assert that header `name` has a truthy value. '''
        self.assertTrue(self.urlopen(route, **kargs)['header'].get(name, None))

    def assertInError(self, search, route='/', **kargs):
        ''' Fail unless `search` occurs in the current request's wsgi.errors.

            No request is issued here: the stream is taken from
            ``bottle.request.environ``, rewound and read in full. `route` and
            `kargs` are accepted but not used.
        '''
        bottle.request.environ['wsgi.errors'].errors.seek(0)
        err = bottle.request.environ['wsgi.errors'].errors.read()
        if search not in err:
            self.fail('The search pattern "%s" is not included in wsgi.error: %s' % (search, err))
def multipart_environ(fields, files):
    ''' Build a WSGI environ that carries a multipart/form-data POST body.

        :param fields: iterable of ``(name, value)`` pairs, emitted as
                       ``form-data`` parts.
        :param files: iterable of ``(name, filename, content)`` triples,
                      emitted as ``file`` parts. The content type is guessed
                      from `filename` and falls back to
                      ``application/octet-stream``.
        :return: environ dict populated with the wsgiref testing defaults,
                 ``REQUEST_METHOD``, ``CONTENT_TYPE`` (including a fresh
                 boundary), ``CONTENT_LENGTH`` and a ``wsgi.input`` stream
                 rewound to the start of the body.

        Values and file contents may be text or byte strings. Text is encoded
        as UTF-8; byte strings are written unchanged.
    '''
    # `unicode` on Python 2, `str` on Python 3.
    text_type = type(u'')

    def as_bytes(chunk):
        # Encode text only. Calling .encode() on a Python 2 byte string would
        # implicitly decode it as ASCII first and fail on non-ASCII data.
        return chunk.encode('utf8') if isinstance(chunk, text_type) else chunk

    boundary = str(uuid.uuid1())
    env = {'REQUEST_METHOD': 'POST',
           'CONTENT_TYPE': 'multipart/form-data; boundary=' + boundary}
    wsgiref.util.setup_testing_defaults(env)
    delimiter = '--' + boundary

    # NOTE(review): lines end in a bare '\n' although RFC 2046 specifies CRLF.
    # Kept as-is so the generated bytes stay the same for existing callers.
    chunks = []
    for name, value in fields:
        chunks.append(delimiter + '\n')
        chunks.append('Content-Disposition: form-data; name="%s"\n\n' % name)
        chunks.append(value)
        chunks.append('\n')
    for name, filename, content in files:
        mimetype = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        chunks.append(delimiter + '\n')
        chunks.append('Content-Disposition: file; name="%s"; filename="%s"\n'
                      % (name, filename))
        chunks.append('Content-Type: %s\n\n' % mimetype)
        chunks.append(content)
        chunks.append('\n')
    chunks.append(delimiter + '--\n')

    # as_bytes('') yields an empty byte string on both Python 2 and Python 3.
    body = as_bytes('').join(as_bytes(chunk) for chunk in chunks)
    env['CONTENT_LENGTH'] = str(len(body))
    env['wsgi.input'].write(body)
    env['wsgi.input'].seek(0)
    return env
|