File: test_server.py

package info (click to toggle)
python-grpclib 0.4.9-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 676 kB
  • sloc: python: 6,864; makefile: 2
file content (30 lines) | stat: -rw-r--r-- 711 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio

import pytest

from grpclib.server import Server


async def serve_forever(server):
    await server.start('127.0.0.1')
    await server.wait_closed()


@pytest.mark.asyncio
async def test_wait_closed(loop: asyncio.AbstractEventLoop):
    server = Server([])
    task = loop.create_task(serve_forever(server))
    done, pending = await asyncio.wait([task], timeout=0.1)
    assert pending and not done
    server.close()
    done, pending = await asyncio.wait([task], timeout=0.1)
    assert done and not pending


@pytest.mark.asyncio
async def test_close_twice():
    server = Server([])
    await server.start('127.0.0.1')
    server.close()
    server.close()
    await server.wait_closed()