File: test_testing.py

package info (click to toggle)
python-asgiref 3.9.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 428 kB
  • sloc: python: 2,635; makefile: 19
file content (90 lines) | stat: -rw-r--r-- 2,795 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import asyncio

import pytest

from asgiref.testing import ApplicationCommunicator
from asgiref.wsgi import WsgiToAsgi


@pytest.mark.asyncio
async def test_receive_nothing():
    """
    Check that ApplicationCommunicator.receive_nothing returns True while no
    output is pending and False while response events are waiting.
    """

    def hello_app(environ, start_response):
        # Minimal WSGI application: no headers and a single body chunk.
        start_response("200 OK", [])
        yield b"content"

    scope = {
        "type": "http",
        "http_version": "1.0",
        "method": "GET",
        "path": "/foo/",
        "query_string": b"bar=baz",
        "headers": [],
    }
    communicator = ApplicationCommunicator(WsgiToAsgi(hello_app), scope)

    # Nothing has been sent to the application yet, so no output is expected.
    assert await communicator.receive_nothing() is True

    # The request produces three events to receive, in order: the response
    # start, a first body event announcing a further body event, and the
    # last body event. Each one must be reported as pending before it is
    # consumed.
    await communicator.send_input({"type": "http.request"})
    for _ in range(3):
        assert await communicator.receive_nothing() is False
        await communicator.receive_output()

    # The response has been received completely.
    assert await communicator.receive_nothing(0.01) is True


def test_receive_nothing_lazy_loop():
    """
    Check ApplicationCommunicator.receive_nothing when the communicator is
    built in synchronous code, before asyncio.run() starts the loop that
    drives it.

    NOTE(review): presumably this guards against the communicator depending
    on an event loop at construction time - confirm against
    asgiref.testing.
    """

    def hello_app(environ, start_response):
        # Minimal WSGI application: no headers and a single body chunk.
        start_response("200 OK", [])
        yield b"content"

    scope = {
        "type": "http",
        "http_version": "1.0",
        "method": "GET",
        "path": "/foo/",
        "query_string": b"bar=baz",
        "headers": [],
    }
    # Deliberately constructed here, outside of any coroutine.
    communicator = ApplicationCommunicator(WsgiToAsgi(hello_app), scope)

    async def exercise_communicator():
        # Nothing has been sent to the application yet, so no output is
        # expected.
        assert await communicator.receive_nothing() is True

        # The request produces three events to receive, in order: the
        # response start, a first body event announcing a further body
        # event, and the last body event. Each one must be reported as
        # pending before it is consumed.
        await communicator.send_input({"type": "http.request"})
        for _ in range(3):
            assert await communicator.receive_nothing() is False
            await communicator.receive_output()

        # The response has been received completely.
        assert await communicator.receive_nothing(0.01) is True

    asyncio.run(exercise_communicator())