1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
|
import asyncio
import multiprocessing
# Third Party
import aiohttp
import aiohttp.web
import pytest
from aiohttp.web import StreamResponse
# aiobotocore
from tests.moto_server import MotoService, get_free_tcp_port, host
_proxy_bypass = {
"http": None,
"https": None,
}
# This runs in a subprocess for a variety of reasons
# 1) early versions of python 3.5 did not correctly set one thread per run loop
# 2) aiohttp uses get_event_loop instead of using the passed in run loop
# 3) aiohttp shutdown can be hairy
class AIOServer(multiprocessing.Process):
    """
    This is a mock AWS service which will wait 5 seconds before returning
    a response to test socket timeouts.

    The aiohttp application is served from a child process.  Use the
    instance as an async context manager: entering starts the process and
    waits until ``/ok`` answers, exiting terminates and reaps the process.
    """

    def __init__(self):
        super().__init__(target=self._run)
        self._loop = None
        self._port = get_free_tcp_port(True)
        self.endpoint_url = f'http://{host}:{self._port}'
        self.daemon = True  # die when parent dies

    def _run(self):
        """Child-process entry point: build the app and serve it."""
        # Give the child its own event loop before aiohttp looks one up.
        asyncio.set_event_loop(asyncio.new_event_loop())
        app = aiohttp.web.Application()
        # '/ok' answers immediately (readiness probe); every other path
        # goes to the slow streaming handler.
        app.router.add_route('*', '/ok', self.ok)
        app.router.add_route('*', '/{anything:.*}', self.stream_handler)

        try:
            aiohttp.web.run_app(
                app, host=host, port=self._port, handle_signals=False
            )
        except Exception:
            # pytest.fail() always raises, so no re-raise is needed here.
            pytest.fail('unable to start and connect to aiohttp server')

    async def __aenter__(self):
        """Start the server process and wait until it accepts requests."""
        self.start()
        await self._wait_until_up()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Terminate the server process and reap it."""
        try:
            self.terminate()
            # Join so the terminated child does not linger as a zombie.
            # The timeout bounds how long the event loop can be blocked if
            # the child is slow to exit.
            self.join(timeout=5)
        except Exception:
            # pytest.fail() always raises, so no re-raise is needed here.
            pytest.fail("Unable to shut down server")

    @staticmethod
    async def ok(request):
        """Return an empty 200 response; used as the readiness probe."""
        return aiohttp.web.Response()

    async def stream_handler(self, request):
        """Send response headers, then stall for 5 seconds before finishing."""
        # Without the Content-Type, most (all?) browsers will not render
        # partially downloaded content. Note, the response type is
        # StreamResponse not Response.
        resp = StreamResponse(
            status=200, reason='OK', headers={'Content-Type': 'text/html'}
        )
        await resp.prepare(request)
        # The delay is the whole point: clients with a shorter read timeout
        # are expected to give up while waiting here.
        await asyncio.sleep(5)
        await resp.drain()
        return resp

    async def _wait_until_up(self):
        """Poll ``/ok`` until the child answers, failing the test otherwise.

        Makes up to 30 attempts, each with a 0.5 s request timeout and a
        0.5 s pause after a connection error or timeout.  Fails the test
        immediately if the child process has already exited.
        """
        async with aiohttp.ClientSession() as session:
            for _ in range(30):
                if self.exitcode is not None:
                    # The child died; there is nothing left to wait for.
                    pytest.fail('unable to start/connect to aiohttp server')

                try:
                    # NOTE(review): an earlier comment here spoke of
                    # bypassing proxies, but no proxy argument is passed;
                    # aiohttp sessions only honour proxy environment
                    # variables when created with trust_env=True.
                    # Any response at all means the server is up; the
                    # context manager releases the connection.
                    async with session.get(
                        self.endpoint_url + '/ok', timeout=0.5
                    ):
                        return
                except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
                    # Not listening yet; back off and retry.
                    await asyncio.sleep(0.5)
                except Exception:
                    # Exception (not BaseException) so task cancellation and
                    # KeyboardInterrupt propagate instead of being reported
                    # as a startup failure.
                    pytest.fail('unable to start/connect to aiohttp server')

        pytest.fail('unable to start and connect to aiohttp server')
@pytest.fixture
async def s3_server(server_scheme):
    """Yield the endpoint URL of a ``MotoService('s3')`` for one test.

    ``ssl=True`` is passed to the service when ``server_scheme`` is
    ``'https'``; the service context is exited on fixture teardown.
    """
    use_ssl = server_scheme == 'https'
    async with MotoService('s3', ssl=use_ssl) as moto:
        yield moto.endpoint_url
@pytest.fixture
async def dynamodb2_server(server_scheme):
    """Yield the endpoint URL of a ``MotoService('dynamodb')`` for one test.

    ``ssl=True`` is passed to the service when ``server_scheme`` is
    ``'https'``; the service context is exited on fixture teardown.
    """
    use_ssl = server_scheme == 'https'
    async with MotoService('dynamodb', ssl=use_ssl) as moto:
        yield moto.endpoint_url
@pytest.fixture
async def cloudformation_server(server_scheme):
    """Yield the endpoint URL of a ``MotoService('cloudformation')``.

    ``ssl=True`` is passed to the service when ``server_scheme`` is
    ``'https'``; the service context is exited on fixture teardown.
    """
    use_ssl = server_scheme == 'https'
    async with MotoService('cloudformation', ssl=use_ssl) as moto:
        yield moto.endpoint_url
@pytest.fixture
async def sns_server(server_scheme):
    """Yield the endpoint URL of a ``MotoService('sns')`` for one test.

    ``ssl=True`` is passed to the service when ``server_scheme`` is
    ``'https'``; the service context is exited on fixture teardown.
    """
    use_ssl = server_scheme == 'https'
    async with MotoService('sns', ssl=use_ssl) as moto:
        yield moto.endpoint_url
@pytest.fixture
async def sqs_server(server_scheme):
    """Yield the endpoint URL of a ``MotoService('sqs')`` for one test.

    ``ssl=True`` is passed to the service when ``server_scheme`` is
    ``'https'``; the service context is exited on fixture teardown.
    """
    use_ssl = server_scheme == 'https'
    async with MotoService('sqs', ssl=use_ssl) as moto:
        yield moto.endpoint_url
@pytest.fixture
async def batch_server(server_scheme):
    """Yield the endpoint URL of a ``MotoService('batch')`` for one test.

    ``ssl=True`` is passed to the service when ``server_scheme`` is
    ``'https'``; the service context is exited on fixture teardown.
    """
    use_ssl = server_scheme == 'https'
    async with MotoService('batch', ssl=use_ssl) as moto:
        yield moto.endpoint_url
@pytest.fixture
async def lambda_server(server_scheme):
    """Yield the endpoint URL of a ``MotoService('lambda')`` for one test.

    ``ssl=True`` is passed to the service when ``server_scheme`` is
    ``'https'``; the service context is exited on fixture teardown.
    """
    use_ssl = server_scheme == 'https'
    async with MotoService('lambda', ssl=use_ssl) as moto:
        yield moto.endpoint_url
@pytest.fixture
async def iam_server(server_scheme):
    """Yield the endpoint URL of a ``MotoService('iam')`` for one test.

    ``ssl=True`` is passed to the service when ``server_scheme`` is
    ``'https'``; the service context is exited on fixture teardown.
    """
    use_ssl = server_scheme == 'https'
    async with MotoService('iam', ssl=use_ssl) as moto:
        yield moto.endpoint_url
@pytest.fixture
async def rds_server(server_scheme):
    """Yield the endpoint URL of a ``MotoService('rds')`` for one test.

    ``ssl=True`` is passed to the service when ``server_scheme`` is
    ``'https'``; the service context is exited on fixture teardown.
    """
    use_ssl = server_scheme == 'https'
    async with MotoService('rds', ssl=use_ssl) as moto:
        yield moto.endpoint_url
@pytest.fixture
async def ec2_server(server_scheme):
    """Yield the endpoint URL of a ``MotoService('ec2')`` for one test.

    ``ssl=True`` is passed to the service when ``server_scheme`` is
    ``'https'``; the service context is exited on fixture teardown.
    """
    use_ssl = server_scheme == 'https'
    async with MotoService('ec2', ssl=use_ssl) as moto:
        yield moto.endpoint_url
@pytest.fixture
async def kinesis_server(server_scheme):
    """Yield the endpoint URL of a ``MotoService('kinesis')`` for one test.

    ``ssl=True`` is passed to the service when ``server_scheme`` is
    ``'https'``; the service context is exited on fixture teardown.
    """
    use_ssl = server_scheme == 'https'
    async with MotoService('kinesis', ssl=use_ssl) as moto:
        yield moto.endpoint_url
|