1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
try:
import aiohttp
import aiohttp.web
except ImportError:
skip_tests = True
else:
skip_tests = False
import asyncio
import sys
import unittest
import weakref
from uvloop import _testbase as tb
class _TestAioHTTP:
def test_aiohttp_basic_1(self):
PAYLOAD = '<h1>It Works!</h1>' * 10000
async def on_request(request):
return aiohttp.web.Response(text=PAYLOAD)
asyncio.set_event_loop(self.loop)
app = aiohttp.web.Application()
app.router.add_get('/', on_request)
runner = aiohttp.web.AppRunner(app)
self.loop.run_until_complete(runner.setup())
if tb.has_IPv6 and not tb.has_IPv4:
host = '::'
url = '[::1]'
else:
host = '0.0.0.0'
url = '127.0.0.1'
site = aiohttp.web.TCPSite(runner, host, '0')
self.loop.run_until_complete(site.start())
port = site._server.sockets[0].getsockname()[1]
async def test():
# Make sure we're using the correct event loop.
self.assertIs(asyncio.get_event_loop(), self.loop)
async with aiohttp.ClientSession() as client:
async with client.get('http://{}:{}'.format(url, port)) as r:
self.assertEqual(r.status, 200)
result = await r.text()
self.assertEqual(result, PAYLOAD)
self.loop.run_until_complete(test())
self.loop.run_until_complete(runner.cleanup())
def test_aiohttp_graceful_shutdown(self):
if self.implementation == 'asyncio' and sys.version_info >= (3, 12, 0):
# In Python 3.12.0, asyncio.Server.wait_closed() waits for all
# existing connections to complete, before aiohttp sends
# on_shutdown signals.
# https://github.com/aio-libs/aiohttp/issues/7675#issuecomment-1752143748
# https://github.com/python/cpython/pull/98582
raise unittest.SkipTest('bug in aiohttp: #7675')
async def websocket_handler(request):
ws = aiohttp.web.WebSocketResponse()
await ws.prepare(request)
request.app['websockets'].add(ws)
try:
async for msg in ws:
await ws.send_str(msg.data)
finally:
request.app['websockets'].discard(ws)
return ws
async def on_shutdown(app):
for ws in set(app['websockets']):
await ws.close(
code=aiohttp.WSCloseCode.GOING_AWAY,
message='Server shutdown')
asyncio.set_event_loop(self.loop)
app = aiohttp.web.Application()
app.router.add_get('/', websocket_handler)
app.on_shutdown.append(on_shutdown)
app['websockets'] = weakref.WeakSet()
runner = aiohttp.web.AppRunner(app)
self.loop.run_until_complete(runner.setup())
if tb.has_IPv6 and not tb.has_IPv4:
host = '::'
url = '[::1]'
else:
host = '0.0.0.0'
url = '127.0.0.1'
site = aiohttp.web.TCPSite(
runner,
host,
0,
# https://github.com/aio-libs/aiohttp/pull/7188
shutdown_timeout=0.1,
)
self.loop.run_until_complete(site.start())
port = site._server.sockets[0].getsockname()[1]
async def client():
async with aiohttp.ClientSession() as client:
async with client.ws_connect(
'http://{}:{}'.format(url, port)) as ws:
await ws.send_str("hello")
async for msg in ws:
assert msg.data == "hello"
client_task = asyncio.ensure_future(client())
async def stop():
await asyncio.sleep(0.1)
try:
await asyncio.wait_for(runner.cleanup(), timeout=0.5)
finally:
try:
client_task.cancel()
await client_task
except asyncio.CancelledError:
pass
self.loop.run_until_complete(stop())
@unittest.skipIf(skip_tests, "no aiohttp module")
class Test_UV_AioHTTP(_TestAioHTTP, tb.UVTestCase):
pass
@unittest.skipIf(skip_tests, "no aiohttp module")
class Test_AIO_AioHTTP(_TestAioHTTP, tb.AIOTestCase):
pass
|