1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
import threading
import time
from socketserver import ThreadingMixIn
from wsgiref.simple_server import make_server, WSGIServer, WSGIRequestHandler
import requests
import socketio
class SocketIOWebServer:
    """A simple web server used for running Socket.IO servers in tests.

    :param sio: a Socket.IO server instance.

    Note 1: This class is not production-ready and is intended for testing.

    Note 2: This class only supports the "threading" async_mode, with WebSocket
    support provided by the simple-websocket package.
    """

    def __init__(self, sio):
        """Wrap the Socket.IO server in a WSGI application.

        :param sio: a Socket.IO server instance whose ``async_mode``
                    attribute is ``'threading'``.
        :raises ValueError: if ``sio.async_mode`` is not ``'threading'``.
        """
        if sio.async_mode != 'threading':
            raise ValueError('The async_mode must be "threading"')

        def http_app(environ, start_response):
            # secondary WSGI app handed to socketio.WSGIApp; start() polls
            # the root URL and expects this exact "OK" body
            start_response('200 OK', [('Content-Type', 'text/plain')])
            return [b'OK']

        self.sio = sio
        self.app = socketio.WSGIApp(sio, http_app)
        # both are populated by start() and reset to None by stop()
        self.httpd = None
        self.thread = None

    def start(self, port=8900):
        """Start the web server.

        :param port: the port to listen on. Defaults to 8900.

        The server is started in a background thread. This method blocks
        until a GET request to ``http://localhost:<port>/`` is answered with
        the body ``OK``, polling every 0.1 seconds.
        """
        class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
            pass

        class WebSocketRequestHandler(WSGIRequestHandler):
            def get_environ(self):
                env = super().get_environ()
                # pass the raw socket to the WSGI app so that it can be used
                # by WebSocket connections (hack copied from gunicorn)
                env['gunicorn.socket'] = self.connection
                return env

        self.httpd = make_server('', port, self._app_wrapper,
                                 ThreadingWSGIServer, WebSocketRequestHandler)
        self.thread = threading.Thread(target=self.httpd.serve_forever)
        self.thread.start()

        # wait for the server to start
        while True:
            try:
                # the timeout keeps a single stuck probe from blocking forever
                r = requests.get(f'http://localhost:{port}/', timeout=1)
                r.raise_for_status()
                if r.text == 'OK':
                    break
            except requests.exceptions.RequestException:
                # connection refused, timeout or error status: not ready yet.
                # Only request errors are swallowed, so KeyboardInterrupt and
                # SystemExit can still abort the wait.
                pass
            # sleep on every unsuccessful attempt, including an unexpected
            # response body, so the loop never busy-spins
            time.sleep(0.1)

    def stop(self):
        """Stop the web server.

        Shuts down the Socket.IO server, then the HTTP server, and waits for
        the background thread to finish. If the web server is not running
        (never started, or already stopped) only the Socket.IO shutdown is
        performed.
        """
        self.sio.shutdown()
        if self.httpd is None:
            # nothing to tear down; avoids AttributeError on None
            return
        self.httpd.shutdown()
        self.httpd.server_close()
        if self.thread is not None:
            self.thread.join()
        self.httpd = None
        self.thread = None

    def _app_wrapper(self, environ, start_response):
        """WSGI entry point that forwards requests to ``self.app``.

        :param environ: the WSGI environment dictionary.
        :param start_response: the WSGI ``start_response`` callable.
        :return: the iterable returned by ``self.app``, or an empty list if
                 the application raised ``StopIteration``.
        """
        try:
            return self.app(environ, start_response)
        except StopIteration:
            # end the WebSocket request without sending a response
            # (this is a hack that was copied from gunicorn's threaded worker)
            start_response('200 OK', [])
            return []
|