1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
import asyncio
import pytest
from grpclib.server import Server
async def serve_forever(server):
await server.start('127.0.0.1')
await server.wait_closed()
@pytest.mark.asyncio
async def test_wait_closed(loop: asyncio.AbstractEventLoop):
server = Server([])
task = loop.create_task(serve_forever(server))
done, pending = await asyncio.wait([task], timeout=0.1)
assert pending and not done
server.close()
done, pending = await asyncio.wait([task], timeout=0.1)
assert done and not pending
@pytest.mark.asyncio
async def test_close_twice():
server = Server([])
await server.start('127.0.0.1')
server.close()
server.close()
await server.wait_closed()
|