1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
import socket
import threading
from django.core.handlers.wsgi import WSGIHandler
from django.core.servers import basehttp
from django.test.testcases import TransactionTestCase
from django.core.management import call_command
class StoppableWSGIServer(basehttp.WSGIServer):
"""WSGIServer with short timeout, so that server thread can stop this server."""
def server_bind(self):
"""Sets timeout to 1 second."""
basehttp.WSGIServer.server_bind(self)
self.socket.settimeout(1)
def get_request(self):
"""Checks for timeout when getting request."""
try:
sock, address = self.socket.accept()
sock.settimeout(None)
return (sock, address)
except socket.timeout:
raise
class TestServerThread(threading.Thread):
"""Thread for running a http server while tests are running."""
def __init__(self, address, port):
self.address = address
self.port = port
self._stopevent = threading.Event()
self.started = threading.Event()
self.error = None
super(TestServerThread, self).__init__()
def run(self):
"""Sets up test server and database and loops over handling http requests."""
try:
handler = basehttp.AdminMediaHandler(WSGIHandler())
server_address = (self.address, self.port)
httpd = StoppableWSGIServer(server_address, basehttp.WSGIRequestHandler)
httpd.set_app(handler)
self.started.set()
except basehttp.WSGIServerException, e:
self.error = e
self.started.set()
return
# Must do database stuff in this new thread if database in memory.
from django.conf import settings
if settings.DATABASE_ENGINE == 'sqlite3' \
and (not settings.TEST_DATABASE_NAME or settings.TEST_DATABASE_NAME == ':memory:'):
# Import the fixture data into the test database.
if hasattr(self, 'fixtures'):
# We have to use this slightly awkward syntax due to the fact
# that we're using *args and **kwargs together.
call_command('loaddata', *self.fixtures, **{'verbosity': 0})
# Loop until we get a stop event.
while not self._stopevent.isSet():
httpd.handle_request()
def join(self, timeout=None):
"""Stop the thread and wait for it to finish."""
self._stopevent.set()
threading.Thread.join(self, timeout)
class TestServerTestCase(TransactionTestCase):
def start_test_server(self, address='localhost', port=8000):
"""Creates a live test server object (instance of WSGIServer)."""
self.server_thread = TestServerThread(address, port)
self.server_thread.start()
self.server_thread.started.wait()
if self.server_thread.error:
raise self.server_thread.error
def stop_test_server(self):
if self.server_thread:
self.server_thread.join()
|