File: test_fixture_nursery.py

package info (click to toggle)
python-pytest-trio 0.8.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 464 kB
  • sloc: python: 944; sh: 49; makefile: 20
file content (22 lines) | stat: -rw-r--r-- 515 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import pytest
import trio


async def handle_client(stream):
    """Echo everything received on ``stream`` back to the peer.

    Data is read at most 4 bytes at a time and each chunk is written
    straight back. The coroutine returns once a read yields no data.

    Args:
        stream: Object exposing awaitable ``receive_some(max_bytes)`` and
            ``send_all(data)`` methods.
    """
    while True:
        chunk = await stream.receive_some(4)
        if not chunk:
            # An empty read marks end-of-stream. Without this check the
            # loop would keep echoing b"" forever after the peer hangs up.
            return
        await stream.send_all(chunk)


@pytest.fixture
async def server(nursery):
    """Start the echo server inside ``nursery`` and return its first listener.

    ``trio.serve_tcp`` is launched via ``nursery.start`` with
    ``handle_client`` as the per-connection handler and port ``0``
    (presumably so the OS assigns a free port -- confirm against the trio
    docs). The value handed back by ``nursery.start`` is indexed to pick
    the first listener.

    Args:
        nursery: Nursery the server task is started in.
            NOTE(review): expected to be supplied as a fixture by the
            pytest-trio plugin -- confirm.

    Returns:
        The first element of the listeners reported by ``trio.serve_tcp``.
    """
    bound_listeners = await nursery.start(
        trio.serve_tcp,
        handle_client,
        0,
    )
    first_listener = bound_listeners[0]
    return first_listener


@pytest.mark.trio
async def test_try(server):
    """Check that the echo server sends back exactly what it was sent.

    Opens a client connection to ``server``, sends ``b"ping"`` and asserts
    that the same four bytes come back.

    Args:
        server: Listener returned by the ``server`` fixture.
    """
    # ``import trio`` alone does not load the ``trio.testing`` submodule.
    # Import it explicitly instead of relying on some other module (such
    # as a pytest plugin) having imported it first.
    import trio.testing

    stream = await trio.testing.open_stream_to_socket_listener(server)
    # Close the client connection when done -- even if the assertion
    # fails -- so the socket is not leaked past the end of the test.
    async with stream:
        await stream.send_all(b"ping")
        reply = await stream.receive_some(4)
        assert reply == b"ping"