1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
import asyncio
import json
import time
import pytest
from channels.generic.http import AsyncHttpConsumer
from channels.testing import HttpCommunicator
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_async_http_consumer():
"""
Tests that AsyncHttpConsumer is implemented correctly.
"""
class TestConsumer(AsyncHttpConsumer):
async def handle(self, body):
data = json.loads(body.decode("utf-8"))
await self.send_response(
200,
json.dumps({"value": data["value"]}).encode("utf-8"),
headers={b"Content-Type": b"application/json"},
)
app = TestConsumer()
# Open a connection
communicator = HttpCommunicator(
app,
method="POST",
path="/test/",
body=json.dumps({"value": 42, "anything": False}).encode("utf-8"),
)
response = await communicator.get_response()
assert response["body"] == b'{"value": 42}'
assert response["status"] == 200
assert response["headers"] == [(b"Content-Type", b"application/json")]
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_error():
class TestConsumer(AsyncHttpConsumer):
async def handle(self, body):
raise AssertionError("Error correctly raised")
communicator = HttpCommunicator(TestConsumer(), "GET", "/")
with pytest.raises(AssertionError) as excinfo:
await communicator.get_response(timeout=0.05)
assert str(excinfo.value) == "Error correctly raised"
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_per_scope_consumers():
"""
Tests that a distinct consumer is used per scope, with AsyncHttpConsumer as
the example consumer class.
"""
class TestConsumer(AsyncHttpConsumer):
def __init__(self):
super().__init__()
self.time = time.time()
async def handle(self, body):
body = f"{self.__class__.__name__} {id(self)} {self.time}"
await self.send_response(
200,
body.encode("utf-8"),
headers={b"Content-Type": b"text/plain"},
)
app = TestConsumer.as_asgi()
# Open a connection
communicator = HttpCommunicator(app, method="GET", path="/test/")
response = await communicator.get_response()
assert response["status"] == 200
# And another one.
communicator = HttpCommunicator(app, method="GET", path="/test2/")
second_response = await communicator.get_response()
assert second_response["status"] == 200
assert response["body"] != second_response["body"]
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_async_http_consumer_future():
"""
Regression test for channels accepting only coroutines. The ASGI specification
states that the `receive` and `send` arguments to an ASGI application should be
"awaitable callable" objects. That includes non-coroutine functions that return
Futures.
"""
class TestConsumer(AsyncHttpConsumer):
async def handle(self, body):
await self.send_response(
200,
b"42",
headers={b"Content-Type": b"text/plain"},
)
app = TestConsumer()
# Ensure the passed functions are specifically coroutines.
async def coroutine_app(scope, receive, send):
async def receive_coroutine():
return await asyncio.ensure_future(receive())
async def send_coroutine(*args, **kwargs):
return await asyncio.ensure_future(send(*args, **kwargs))
await app(scope, receive_coroutine, send_coroutine)
communicator = HttpCommunicator(coroutine_app, method="GET", path="/")
response = await communicator.get_response()
assert response["body"] == b"42"
assert response["status"] == 200
assert response["headers"] == [(b"Content-Type", b"text/plain")]
# Ensure the passed functions are "Awaitable Callables" and NOT coroutines.
async def awaitable_callable_app(scope, receive, send):
def receive_awaitable_callable():
return asyncio.ensure_future(receive())
def send_awaitable_callable(*args, **kwargs):
return asyncio.ensure_future(send(*args, **kwargs))
await app(scope, receive_awaitable_callable, send_awaitable_callable)
# Open a connection
communicator = HttpCommunicator(awaitable_callable_app, method="GET", path="/")
response = await communicator.get_response()
assert response["body"] == b"42"
assert response["status"] == 200
assert response["headers"] == [(b"Content-Type", b"text/plain")]
|