1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
|
# -*- coding: utf-8 -*-
import threading
import socket
import select
def consume_socket_content(sock, timeout=0.5):
chunks = 65536
content = b''
while True:
more_to_read = select.select([sock], [], [], timeout)[0]
if not more_to_read:
break
new_content = sock.recv(chunks)
if not new_content:
break
content += new_content
return content
class Server(threading.Thread):
"""Dummy server using for unit testing"""
WAIT_EVENT_TIMEOUT = 5
def __init__(self, handler=None, host='localhost', port=0, requests_to_handle=1, wait_to_close_event=None):
super(Server, self).__init__()
self.handler = handler or consume_socket_content
self.handler_results = []
self.host = host
self.port = port
self.requests_to_handle = requests_to_handle
self.wait_to_close_event = wait_to_close_event
self.ready_event = threading.Event()
self.stop_event = threading.Event()
@classmethod
def text_response_server(cls, text, request_timeout=0.5, **kwargs):
def text_response_handler(sock):
request_content = consume_socket_content(sock, timeout=request_timeout)
sock.send(text.encode('utf-8'))
return request_content
return Server(text_response_handler, **kwargs)
@classmethod
def basic_response_server(cls, **kwargs):
return cls.text_response_server(
"HTTP/1.1 200 OK\r\n" +
"Content-Length: 0\r\n\r\n",
**kwargs
)
def run(self):
try:
self.server_sock = self._create_socket_and_bind()
# in case self.port = 0
self.port = self.server_sock.getsockname()[1]
self.ready_event.set()
self._handle_requests()
if self.wait_to_close_event:
self.wait_to_close_event.wait(self.WAIT_EVENT_TIMEOUT)
finally:
self.ready_event.set() # just in case of exception
self._close_server_sock_ignore_errors()
self.stop_event.set()
def _create_socket_and_bind(self):
sock = socket.socket()
sock.bind((self.host, self.port))
sock.listen(0)
return sock
def _close_server_sock_ignore_errors(self):
try:
self.server_sock.close()
except IOError:
pass
def _handle_requests(self):
for _ in range(self.requests_to_handle):
sock = self._accept_connection()
if not sock:
break
handler_result = self.handler(sock)
self.handler_results.append(handler_result)
sock.close()
def _accept_connection(self):
try:
ready, _, _ = select.select([self.server_sock], [], [], self.WAIT_EVENT_TIMEOUT)
if not ready:
return None
return self.server_sock.accept()[0]
except (select.error, socket.error):
return None
def __enter__(self):
self.start()
self.ready_event.wait(self.WAIT_EVENT_TIMEOUT)
return self.host, self.port
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is None:
self.stop_event.wait(self.WAIT_EVENT_TIMEOUT)
else:
if self.wait_to_close_event:
# avoid server from waiting for event timeouts
# if an exception is found in the main thread
self.wait_to_close_event.set()
# ensure server thread doesn't get stuck waiting for connections
self._close_server_sock_ignore_errors()
self.join()
return False # allow exceptions to propagate
|