File: test_asgi.py

package info (click to toggle)
python-quart-trio 0.12.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 172 kB
  • sloc: python: 984; makefile: 2
file content (35 lines) | stat: -rw-r--r-- 1,231 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import pytest
import trio
from hypercorn.typing import WebsocketScope

from quart_trio.app import QuartTrio
from quart_trio.asgi import TrioASGIWebsocketConnection


@pytest.mark.trio
async def test_websocket_complete_on_disconnect() -> None:
    scope: WebsocketScope = {
        "type": "websocket",
        "asgi": {},
        "http_version": "1.1",
        "scheme": "wss",
        "path": "ws://quart/path",
        "raw_path": b"/",
        "query_string": b"",
        "root_path": "",
        "headers": [(b"host", b"quart")],
        "client": ("127.0.0.1", 80),
        "server": None,
        "subprotocols": [],
        "extensions": {"websocket.http.response": {}},
        "state": {},  # type: ignore[typeddict-item]
    }
    connection = TrioASGIWebsocketConnection(QuartTrio(__name__), scope)
    send_channel, receive_channel = trio.open_memory_channel[dict](0)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(
            connection.handle_messages, nursery, receive_channel.receive  # type: ignore
        )
        await send_channel.send({"type": "websocket.disconnect"})
        await trio.sleep(1)  # Simulate doing something else
    assert nursery.cancel_scope.cancelled_caught