1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
|
"""A library of helper functions for the Cheroot test suite."""
import datetime
import logging
import os
import sys
import time
import threading
import types
import http.client
import cheroot.server
import cheroot.wsgi
from cheroot.test import webtest
log = logging.getLogger(__name__)
thisdir = os.path.abspath(os.path.dirname(__file__))
config = {
'bind_addr': ('127.0.0.1', 54583),
'server': 'wsgi',
'wsgi_app': None,
}
class CherootWebCase(webtest.WebCase):
"""Helper class for a web app test suite."""
script_name = ''
scheme = 'http'
available_servers = {
'wsgi': cheroot.wsgi.Server,
'native': cheroot.server.HTTPServer,
}
@classmethod
def setup_class(cls):
"""Create and run one HTTP server per class."""
conf = config.copy()
conf.update(getattr(cls, 'config', {}))
s_class = conf.pop('server', 'wsgi')
server_factory = cls.available_servers.get(s_class)
if server_factory is None:
raise RuntimeError('Unknown server in config: %s' % conf['server'])
cls.httpserver = server_factory(**conf)
cls.HOST, cls.PORT = cls.httpserver.bind_addr
if cls.httpserver.ssl_adapter is None:
ssl = ''
cls.scheme = 'http'
else:
ssl = ' (ssl)'
cls.HTTP_CONN = http.client.HTTPSConnection
cls.scheme = 'https'
v = sys.version.split()[0]
log.info('Python version used to run this test script: %s', v)
log.info('Cheroot version: %s', cheroot.__version__)
log.info('HTTP server version: %s%s', cls.httpserver.protocol, ssl)
log.info('PID: %s', os.getpid())
if hasattr(cls, 'setup_server'):
# Clear the wsgi server so that
# it can be updated with the new root
cls.setup_server()
cls.start()
@classmethod
def teardown_class(cls):
"""Cleanup HTTP server."""
if hasattr(cls, 'setup_server'):
cls.stop()
@classmethod
def start(cls):
"""Load and start the HTTP server."""
threading.Thread(target=cls.httpserver.safe_start).start()
while not cls.httpserver.ready:
time.sleep(0.1)
@classmethod
def stop(cls):
"""Terminate HTTP server."""
cls.httpserver.stop()
td = getattr(cls, 'teardown', None)
if td:
td()
date_tolerance = 2
def assertEqualDates(self, dt1, dt2, seconds=None):
"""Assert ``abs(dt1 - dt2)`` is within ``Y`` seconds."""
if seconds is None:
seconds = self.date_tolerance
if dt1 > dt2:
diff = dt1 - dt2
else:
diff = dt2 - dt1
if not diff < datetime.timedelta(seconds=seconds):
raise AssertionError(
'%r and %r are not within %r seconds.' %
(dt1, dt2, seconds),
)
class Request:
"""HTTP request container."""
def __init__(self, environ):
"""Initialize HTTP request."""
self.environ = environ
class Response:
"""HTTP response container."""
def __init__(self):
"""Initialize HTTP response."""
self.status = '200 OK'
self.headers = {'Content-Type': 'text/html'}
self.body = None
def output(self):
"""Generate iterable response body object."""
if self.body is None:
return []
elif isinstance(self.body, str):
return [self.body.encode('iso-8859-1')]
elif isinstance(self.body, bytes):
return [self.body]
else:
return [x.encode('iso-8859-1') for x in self.body]
class Controller:
"""WSGI app for tests."""
def __call__(self, environ, start_response):
"""WSGI request handler."""
req, resp = Request(environ), Response()
try:
# Python 3 supports unicode attribute names
# Python 2 encodes them
handler = self.handlers[environ['PATH_INFO']]
except KeyError:
resp.status = '404 Not Found'
else:
output = handler(req, resp)
if (
output is not None
and not any(
resp.status.startswith(status_code)
for status_code in ('204', '304')
)
):
resp.body = output
try:
resp.headers.setdefault('Content-Length', str(len(output)))
except TypeError:
if not isinstance(output, types.GeneratorType):
raise
start_response(resp.status, resp.headers.items())
return resp.output()
|