File: mock_server.py

package info (click to toggle)
python-aiobotocore 2.13.1-1.1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 832 kB
  • sloc: python: 10,572; makefile: 71
file content (164 lines) | stat: -rw-r--r-- 4,897 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import asyncio
import multiprocessing

# Third Party
import aiohttp
import aiohttp.web
import pytest
from aiohttp.web import StreamResponse

# aiobotocore
from tests.moto_server import MotoService, get_free_tcp_port, host

_proxy_bypass = {
    "http": None,
    "https": None,
}


# This runs in a subprocess for a variety of reasons
# 1) early versions of python 3.5 did not correctly set one thread per run loop
# 2) aiohttp uses get_event_loop instead of using the passed in run loop
# 3) aiohttp shutdown can be hairy
class AIOServer(multiprocessing.Process):
    """
    This is a mock AWS service which will 5 seconds before returning
    a response to test socket timeouts.
    """

    def __init__(self):
        super().__init__(target=self._run)
        self._loop = None
        self._port = get_free_tcp_port(True)
        self.endpoint_url = f'http://{host}:{self._port}'
        self.daemon = True  # die when parent dies

    def _run(self):
        asyncio.set_event_loop(asyncio.new_event_loop())
        app = aiohttp.web.Application()
        app.router.add_route('*', '/ok', self.ok)
        app.router.add_route('*', '/{anything:.*}', self.stream_handler)

        try:
            aiohttp.web.run_app(
                app, host=host, port=self._port, handle_signals=False
            )
        except BaseException:
            pytest.fail('unable to start and connect to aiohttp server')
            raise

    async def __aenter__(self):
        self.start()
        await self._wait_until_up()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            self.terminate()
        except BaseException:
            pytest.fail("Unable to shut down server")
            raise

    @staticmethod
    async def ok(request):
        return aiohttp.web.Response()

    async def stream_handler(self, request):
        # Without the Content-Type, most (all?) browsers will not render
        # partially downloaded content. Note, the response type is
        # StreamResponse not Response.
        resp = StreamResponse(
            status=200, reason='OK', headers={'Content-Type': 'text/html'}
        )

        await resp.prepare(request)
        await asyncio.sleep(5)
        await resp.drain()
        return resp

    async def _wait_until_up(self):
        async with aiohttp.ClientSession() as session:
            for i in range(0, 30):
                if self.exitcode is not None:
                    pytest.fail('unable to start/connect to aiohttp server')
                    return

                try:
                    # we need to bypass the proxies due to monkey patches
                    await session.get(self.endpoint_url + '/ok', timeout=0.5)
                    return
                except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
                    await asyncio.sleep(0.5)
                except BaseException:
                    pytest.fail('unable to start/connect to aiohttp server')
                    raise

        pytest.fail('unable to start and connect to aiohttp server')


@pytest.fixture
async def s3_server(server_scheme):
    async with MotoService('s3', ssl=server_scheme == 'https') as svc:
        yield svc.endpoint_url


@pytest.fixture
async def dynamodb2_server(server_scheme):
    async with MotoService('dynamodb', ssl=server_scheme == 'https') as svc:
        yield svc.endpoint_url


@pytest.fixture
async def cloudformation_server(server_scheme):
    async with MotoService(
        'cloudformation', ssl=server_scheme == 'https'
    ) as svc:
        yield svc.endpoint_url


@pytest.fixture
async def sns_server(server_scheme):
    async with MotoService('sns', ssl=server_scheme == 'https') as svc:
        yield svc.endpoint_url


@pytest.fixture
async def sqs_server(server_scheme):
    async with MotoService('sqs', ssl=server_scheme == 'https') as svc:
        yield svc.endpoint_url


@pytest.fixture
async def batch_server(server_scheme):
    async with MotoService('batch', ssl=server_scheme == 'https') as svc:
        yield svc.endpoint_url


@pytest.fixture
async def lambda_server(server_scheme):
    async with MotoService('lambda', ssl=server_scheme == 'https') as svc:
        yield svc.endpoint_url


@pytest.fixture
async def iam_server(server_scheme):
    async with MotoService('iam', ssl=server_scheme == 'https') as svc:
        yield svc.endpoint_url


@pytest.fixture
async def rds_server(server_scheme):
    async with MotoService('rds', ssl=server_scheme == 'https') as svc:
        yield svc.endpoint_url


@pytest.fixture
async def ec2_server(server_scheme):
    async with MotoService('ec2', ssl=server_scheme == 'https') as svc:
        yield svc.endpoint_url


@pytest.fixture
async def kinesis_server(server_scheme):
    async with MotoService('kinesis', ssl=server_scheme == 'https') as svc:
        yield svc.endpoint_url