1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
|
from contextlib import contextmanager
import gevent.pywsgi
import gevent.queue
import gevent.server
# Loopback address and fixed port shared by the local servers defined below.
TEST_HOST = "127.0.0.1"
TEST_PORT = 54323
# (host, port) pair passed as the listen address to the gevent servers below.
LISTENER = TEST_HOST, TEST_PORT
# Base URL built from the same host and port as LISTENER.
LISTENER_URL = f"http://{TEST_HOST}:{TEST_PORT}/"
# Hostname of an external service; presumably an httpbin-compatible endpoint
# (the name suggests so) -- its usage is not visible in this part of the file.
HTTPBIN_HOST = "httpbingo.org" # this might be exchanged with a self-hosted version
@contextmanager
def server(handler):
    """Serve ``handler`` from a gevent ``StreamServer`` for the ``with`` body.

    The server listens on ``LISTENER``; it is started before the body runs
    and stopped once the body is left, whether or not the body raised.

    Every exception escaping ``handler`` is recorded and then re-raised
    inside the handler. If the body finishes without raising and at least
    one exception was recorded, the first recorded one is raised from the
    context manager.
    """
    failures = gevent.queue.Queue()

    def recording_handler(client_socket, address):
        # Both arguments are forwarded to ``handler`` untouched.
        try:
            outcome = handler(client_socket, address)
        except Exception as exc:
            failures.put(exc)
            raise
        return outcome

    stream_server = gevent.server.StreamServer(
        LISTENER,
        handle=recording_handler,
    )
    stream_server.start()
    try:
        yield
        # Only reached when the body completed normally.
        if failures.empty():
            return
        raise failures.get()
    finally:
        stream_server.stop()
@contextmanager
def wsgiserver(handler):
    """Serve the WSGI callable ``handler`` from a gevent ``WSGIServer``.

    The server listens on ``LISTENER``; it is started before the ``with``
    body runs and stopped once the body is left, whether or not the body
    raised.

    Every exception escaping ``handler`` is recorded and then re-raised
    inside the handler. If the body finishes without raising and at least
    one exception was recorded, the first recorded one is raised from the
    context manager.
    """
    failures = gevent.queue.Queue()

    def recording_app(environ, start_response):
        # Delegate to the real application, remembering anything it raises.
        try:
            response = handler(environ, start_response)
        except Exception as exc:
            failures.put(exc)
            raise
        return response

    wsgi_server = gevent.pywsgi.WSGIServer(LISTENER, recording_app)
    wsgi_server.start()
    try:
        yield
        # Only reached when the body completed normally.
        if failures.empty():
            return
        raise failures.get()
    finally:
        wsgi_server.stop()
def check_upload(body, headers=None, length=None):
    """Build a WSGI application that asserts an upload matches expectations.

    Args:
        body: Payload that ``env["wsgi.input"].read()`` must equal.
        headers: Optional mapping of request header names to expected values.
            Each name is upper-cased with ``-`` replaced by ``_`` and looked
            up in the environ. If that key is absent, the ``HTTP_``-prefixed
            key is tried, which is where WSGI places every request header
            other than ``Content-Type`` and ``Content-Length``.
        length: Optional expected content length; compared with both the
            ``CONTENT_LENGTH`` environ entry and ``len(body)``.

    Returns:
        A ``wsgi_handler(env, start_response)`` callable. When every check
        passes it calls ``start_response("200 OK", [])`` and returns ``[]``.
        A failed check, a missing ``CONTENT_LENGTH`` (when ``length`` is
        given) or a missing expected header raises ``AssertionError``.
    """

    def wsgi_handler(env, start_response):
        assert body == env["wsgi.input"].read()
        assert env["REQUEST_METHOD"] == "POST"
        if length is not None:
            content_length = env.get("CONTENT_LENGTH")
            # Fail with a readable assertion instead of int(None)'s TypeError.
            assert content_length is not None, "CONTENT_LENGTH missing from environ"
            assert int(content_length) == length
            assert len(body) == length
        if headers:
            for field, val in headers.items():
                name = field.upper().replace("-", "_")
                env_key = name
                # Prefer the bare key (CONTENT_TYPE / CONTENT_LENGTH); fall
                # back to the HTTP_ form used for all other request headers.
                prefixed = f"HTTP_{name}"
                if env_key not in env and prefixed in env:
                    env_key = prefixed
                assert env_key in env, f"header {field!r} missing from environ"
                assert env[env_key] == val
                if name == "CONTENT_LENGTH":
                    assert len(body) == int(val)
        start_response("200 OK", [])
        return []

    return wsgi_handler
|