1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
# -*- coding: utf-8 -*-
from __future__ import with_statement
import os
import bottle
import sys
import unittest
import wsgiref
import wsgiref.util
import wsgiref.validate
import warnings
import mimetypes
import uuid
from bottle import tob, tonat, BytesIO, py3k, unicode
def warn(msg):
sys.stderr.write('WARNING: %s\n' % msg.strip())
def tobs(data):
''' Transforms bytes or unicode into a byte stream. '''
return BytesIO(tob(data))
class chdir(object):
def __init__(self, dir):
if os.path.isfile(dir):
dir = os.path.dirname(dir)
self.wd = os.path.abspath(dir)
self.old = os.path.abspath('.')
def __enter__(self):
os.chdir(self.wd)
def __exit__(self, exc_type, exc_val, tb):
os.chdir(self.old)
class assertWarn(object):
def __init__(self, text):
self.searchtext = text
def __call__(self, func):
def wrapper(*a, **ka):
with warnings.catch_warnings(record=True) as wr:
warnings.simplefilter("always")
out = func(*a, **ka)
messages = [repr(w.message) for w in wr]
for msg in messages:
if self.searchtext in msg:
return out
raise AssertionError("Could not find phrase %r in any warning messaged: %r" % (self.searchtext, messages))
return wrapper
def api(introduced, deprecated=None, removed=None):
current = tuple(map(int, bottle.__version__.split('-')[0].split('.')))
introduced = tuple(map(int, introduced.split('.')))
deprecated = tuple(map(int, deprecated.split('.'))) if deprecated else (99,99)
removed = tuple(map(int, removed.split('.'))) if removed else (99,100)
assert introduced < deprecated < removed
def decorator(func):
if current < introduced:
return None
elif current < deprecated:
return func
elif current < removed:
func.__doc__ = '(deprecated) ' + (func.__doc__ or '')
return assertWarn('DeprecationWarning')(func)
else:
return None
return decorator
def wsgistr(s):
if py3k:
return s.encode('utf8').decode('latin1')
else:
return s
class ServerTestBase(unittest.TestCase):
def setUp(self):
''' Create a new Bottle app set it as default_app '''
self.port = 8080
self.host = 'localhost'
self.app = bottle.app.push()
self.wsgiapp = wsgiref.validate.validator(self.app)
def urlopen(self, path, method='GET', post='', env=None):
result = {'code':0, 'status':'error', 'header':{}, 'body':tob('')}
def start_response(status, header, exc_info=None):
result['code'] = int(status.split()[0])
result['status'] = status.split(None, 1)[-1]
for name, value in header:
name = name.title()
if name in result['header']:
result['header'][name] += ', ' + value
else:
result['header'][name] = value
env = env if env else {}
wsgiref.util.setup_testing_defaults(env)
env['REQUEST_METHOD'] = wsgistr(method.upper().strip())
env['PATH_INFO'] = wsgistr(path)
env['QUERY_STRING'] = wsgistr('')
if post:
env['REQUEST_METHOD'] = 'POST'
env['CONTENT_LENGTH'] = str(len(tob(post)))
env['wsgi.input'].write(tob(post))
env['wsgi.input'].seek(0)
response = self.wsgiapp(env, start_response)
for part in response:
try:
result['body'] += part
except TypeError:
raise TypeError('WSGI app yielded non-byte object %s', type(part))
if hasattr(response, 'close'):
response.close()
del response
return result
def postmultipart(self, path, fields, files):
env = multipart_environ(fields, files)
return self.urlopen(path, method='POST', env=env)
def tearDown(self):
bottle.app.pop()
def assertStatus(self, code, route='/', **kargs):
self.assertEqual(code, self.urlopen(route, **kargs)['code'])
def assertBody(self, body, route='/', **kargs):
self.assertEqual(tob(body), self.urlopen(route, **kargs)['body'])
def assertInBody(self, body, route='/', **kargs):
result = self.urlopen(route, **kargs)['body']
if tob(body) not in result:
self.fail('The search pattern "%s" is not included in body:\n%s' % (body, result))
def assertHeader(self, name, value, route='/', **kargs):
self.assertEqual(value, self.urlopen(route, **kargs)['header'].get(name))
def assertHeaderAny(self, name, route='/', **kargs):
self.assertTrue(self.urlopen(route, **kargs)['header'].get(name, None))
def assertInError(self, search, route='/', **kargs):
bottle.request.environ['wsgi.errors'].errors.seek(0)
err = bottle.request.environ['wsgi.errors'].errors.read()
if search not in err:
self.fail('The search pattern "%s" is not included in wsgi.error: %s' % (search, err))
def multipart_environ(fields, files):
boundary = 'lowerUPPER-1234'
env = {'REQUEST_METHOD':'POST',
'CONTENT_TYPE': 'multipart/form-data; boundary='+boundary}
wsgiref.util.setup_testing_defaults(env)
boundary = '--' + boundary
body = ''
for name, value in fields:
body += boundary + '\r\n'
body += 'Content-Disposition: form-data; name="%s"\r\n\r\n' % name
body += value + '\r\n'
for name, filename, content in files:
mimetype = str(mimetypes.guess_type(filename)[0]) or 'application/octet-stream'
body += boundary + '\r\n'
body += 'Content-Disposition: file; name="%s"; filename="%s"\r\n' % \
(name, filename)
body += 'Content-Type: %s\r\n\r\n' % mimetype
body += content + '\r\n'
body += boundary + '--\r\n'
if isinstance(body, unicode):
body = body.encode('utf8')
env['CONTENT_LENGTH'] = str(len(body))
env['wsgi.input'].write(body)
env['wsgi.input'].seek(0)
return env
|