1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
|
import asyncio
import multiprocessing
import socket
# Third Party
import aiohttp
import aiohttp.web
import pytest
from aiohttp.web import StreamResponse
from moto.server import ThreadedMotoServer
# Proxy mapping with both URL schemes explicitly set to None.
# NOTE(review): not referenced in the visible part of this file; presumably
# handed to HTTP clients elsewhere to disable proxying -- confirm.
_proxy_bypass = dict.fromkeys(("http", "https"), None)

# Loopback address the mock servers in this module bind to.
host = '127.0.0.1'
def get_free_tcp_port(release_socket: bool = False):
    """Find a free TCP port on ``host`` by binding a socket to port 0.

    Args:
        release_socket: When true, the probing socket is closed before
            returning and only the port number is handed back. Nothing
            holds the port after that, so another process may grab it
            before the caller binds it.

    Returns:
        ``port`` (int) when ``release_socket`` is true, otherwise the tuple
        ``(socket, port)`` where the still-bound socket keeps the port
        reserved; the caller is then responsible for closing it.

    Raises:
        OSError: if the socket cannot be bound.
    """
    sckt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sckt.bind((host, 0))
        # getsockname() returns (address, port) for AF_INET sockets.
        port = sckt.getsockname()[1]
    except BaseException:
        # Do not leak the descriptor when binding fails.
        sckt.close()
        raise

    if release_socket:
        sckt.close()
        return port
    return sckt, port
# This runs in a subprocess for a variety of reasons
# 1) early versions of python 3.5 did not correctly set one thread per run loop
# 2) aiohttp uses get_event_loop instead of using the passed in run loop
# 3) aiohttp shutdown can be hairy
class AIOServer(multiprocessing.Process):
    """
    This is a mock AWS service which will wait 5 seconds before returning
    a response to test socket timeouts.

    The server runs in a child process. ``/ok`` answers immediately and is
    used as a readiness probe; every other path is served by
    ``stream_handler``, which sends the response headers and then stalls.

    Intended to be used as an async context manager::

        async with AIOServer() as server:
            ...  # issue requests against server.endpoint_url
    """

    # Upper bound (seconds) on waiting for the child to be reaped once it
    # has been told to terminate.
    _JOIN_TIMEOUT = 5

    def __init__(self):
        super().__init__(target=self._run)
        self._loop = None
        # The probing socket is released immediately, so the port is only
        # expected (not guaranteed) to still be free when the child binds it.
        self._port = get_free_tcp_port(True)
        self.endpoint_url = f'http://{host}:{self._port}'
        self.daemon = True  # die when parent dies

    def _run(self):
        """Child-process entry point: build the app and serve it."""
        # Install a fresh event loop as the current one for this process
        # before aiohttp starts.
        asyncio.set_event_loop(asyncio.new_event_loop())
        app = aiohttp.web.Application()
        app.router.add_route('*', '/ok', self.ok)
        app.router.add_route('*', '/{anything:.*}', self.stream_handler)

        try:
            aiohttp.web.run_app(
                app, host=host, port=self._port, handle_signals=False
            )
        except BaseException:
            # pytest.fail() always raises, so no re-raise is needed after it.
            pytest.fail('unable to start and connect to aiohttp server')

    async def __aenter__(self):
        """Start the server process and wait until ``/ok`` responds."""
        self.start()
        try:
            await self._wait_until_up()
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so stop the
            # child here instead of leaving it running.
            self._stop_process()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Stop the server process; fail the test if that is not possible."""
        try:
            self._stop_process()
        except BaseException:
            pytest.fail("Unable to shut down server")

    def _stop_process(self):
        """Terminate the child process and reap it."""
        self.terminate()
        # Reap the child so it does not linger as a zombie. The wait is
        # bounded so a child that does not exit cannot hang the test run.
        self.join(self._JOIN_TIMEOUT)

    @staticmethod
    async def ok(request):
        """Readiness probe: reply immediately with an empty response."""
        return aiohttp.web.Response()

    async def stream_handler(self, request):
        """Send the response headers, then stall for 5 seconds."""
        # Without the Content-Type, most (all?) browsers will not render
        # partially downloaded content. Note, the response type is
        # StreamResponse not Response.
        resp = StreamResponse(
            status=200, reason='OK', headers={'Content-Type': 'text/html'}
        )

        await resp.prepare(request)
        # The delay clients are expected to time out on.
        await asyncio.sleep(5)
        await resp.drain()
        return resp

    async def _wait_until_up(self):
        """
        Poll ``/ok`` until the server answers.

        Makes up to 30 attempts, each with a 0.5 second request timeout and
        a 0.5 second pause after a connection error or timeout. Fails the
        current test through ``pytest.fail`` when the child process has
        exited, when an unexpected error occurs, or when no attempt succeeds.
        """
        async with aiohttp.ClientSession() as session:
            for _ in range(30):
                if self.exitcode is not None:
                    # The child process has already exited.
                    pytest.fail('unable to start/connect to aiohttp server')

                try:
                    # NOTE(review): a previous comment here said proxies have
                    # to be bypassed due to monkey patches, but no proxy
                    # option is passed to this request -- confirm whether
                    # one is still needed.
                    async with session.get(
                        self.endpoint_url + '/ok', timeout=0.5
                    ):
                        # Any response means the server is up; leaving the
                        # context manager releases the response.
                        return
                except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
                    await asyncio.sleep(0.5)
                except BaseException:
                    pytest.fail('unable to start/connect to aiohttp server')

        pytest.fail('unable to start and connect to aiohttp server')
@pytest.fixture
async def moto_server(server_scheme):
    """Run a ``ThreadedMotoServer`` for the duration of a test.

    Yields the server's base URL (``http://<host>:<port>``), built from the
    address the server reports after it has started. The server is stopped
    when the fixture is torn down, including when start-up itself raises.

    NOTE(review): ``server_scheme`` is not used in the body and the yielded
    URL is always ``http://`` -- presumably the fixture is requested only
    for parametrization/ordering; confirm.
    """
    moto = ThreadedMotoServer(port=0)
    try:
        moto.start()
        # port=0 was requested above, so read back the actual address.
        host, port = moto.get_host_and_port()
        endpoint = f'http://{host}:{port}'
        yield endpoint
    finally:
        moto.stop()
|