1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
|
import asyncio
import async_timeout
import pytest
from channels.exceptions import ChannelFull
from channels.layers import InMemoryChannelLayer
@pytest.fixture()
async def channel_layer():
"""
Channel layer fixture that flushes automatically.
"""
channel_layer = InMemoryChannelLayer(capacity=3)
yield channel_layer
await channel_layer.flush()
await channel_layer.close()
@pytest.mark.asyncio
async def test_send_receive(channel_layer):
"""
Makes sure we can send a message to a normal channel then receive it.
"""
await channel_layer.send(
"test-channel-1", {"type": "test.message", "text": "Ahoy-hoy!"}
)
await channel_layer.send(
"test-channel-1", {"type": "test.message", "text": "Ahoy-hoy!"}
)
message = await channel_layer.receive("test-channel-1")
assert message["type"] == "test.message"
assert message["text"] == "Ahoy-hoy!"
# not removed because not empty
assert "test-channel-1" in channel_layer.channels
message = await channel_layer.receive("test-channel-1")
assert message["type"] == "test.message"
assert message["text"] == "Ahoy-hoy!"
# removed because empty
assert "test-channel-1" not in channel_layer.channels
@pytest.mark.asyncio
async def test_race_empty(channel_layer):
"""
Makes sure the race is handled gracefully.
"""
receive_task = asyncio.create_task(channel_layer.receive("test-channel-1"))
await asyncio.sleep(0.1)
await channel_layer.send(
"test-channel-1", {"type": "test.message", "text": "Ahoy-hoy!"}
)
del channel_layer.channels["test-channel-1"]
await asyncio.sleep(0.1)
message = await receive_task
assert message["type"] == "test.message"
assert message["text"] == "Ahoy-hoy!"
@pytest.mark.asyncio
async def test_send_capacity(channel_layer):
"""
Makes sure we get ChannelFull when we hit the send capacity
"""
await channel_layer.send("test-channel-1", {"type": "test.message"})
await channel_layer.send("test-channel-1", {"type": "test.message"})
await channel_layer.send("test-channel-1", {"type": "test.message"})
with pytest.raises(ChannelFull):
await channel_layer.send("test-channel-1", {"type": "test.message"})
@pytest.mark.asyncio
async def test_process_local_send_receive(channel_layer):
"""
Makes sure we can send a message to a process-local channel then receive it.
"""
channel_name = await channel_layer.new_channel()
await channel_layer.send(
channel_name, {"type": "test.message", "text": "Local only please"}
)
message = await channel_layer.receive(channel_name)
assert message["type"] == "test.message"
assert message["text"] == "Local only please"
@pytest.mark.asyncio
async def test_multi_send_receive(channel_layer):
"""
Tests overlapping sends and receives, and ordering.
"""
await channel_layer.send("test-channel-3", {"type": "message.1"})
await channel_layer.send("test-channel-3", {"type": "message.2"})
await channel_layer.send("test-channel-3", {"type": "message.3"})
assert (await channel_layer.receive("test-channel-3"))["type"] == "message.1"
assert (await channel_layer.receive("test-channel-3"))["type"] == "message.2"
assert (await channel_layer.receive("test-channel-3"))["type"] == "message.3"
@pytest.mark.asyncio
async def test_groups_basic(channel_layer):
"""
Tests basic group operation.
"""
await channel_layer.group_add("test-group", "test-gr-chan-1")
await channel_layer.group_add("test-group", "test-gr-chan-2")
await channel_layer.group_add("test-group", "test-gr-chan-3")
await channel_layer.group_discard("test-group", "test-gr-chan-2")
await channel_layer.group_send("test-group", {"type": "message.1"})
# Make sure we get the message on the two channels that were in
async with async_timeout.timeout(1):
assert (await channel_layer.receive("test-gr-chan-1"))["type"] == "message.1"
assert (await channel_layer.receive("test-gr-chan-3"))["type"] == "message.1"
# Make sure the removed channel did not get the message
with pytest.raises(asyncio.TimeoutError):
async with async_timeout.timeout(1):
await channel_layer.receive("test-gr-chan-2")
@pytest.mark.asyncio
async def test_groups_channel_full(channel_layer):
"""
Tests that group_send ignores ChannelFull
"""
await channel_layer.group_add("test-group", "test-gr-chan-1")
await channel_layer.group_send("test-group", {"type": "message.1"})
await channel_layer.group_send("test-group", {"type": "message.1"})
await channel_layer.group_send("test-group", {"type": "message.1"})
await channel_layer.group_send("test-group", {"type": "message.1"})
await channel_layer.group_send("test-group", {"type": "message.1"})
@pytest.mark.asyncio
async def test_expiry_single():
"""
Tests that a message can expire.
"""
channel_layer = InMemoryChannelLayer(expiry=0.1)
await channel_layer.send("test-channel-1", {"type": "message.1"})
assert len(channel_layer.channels) == 1
await asyncio.sleep(0.1)
# Message should have expired and been dropped.
with pytest.raises(asyncio.TimeoutError):
async with async_timeout.timeout(0.5):
await channel_layer.receive("test-channel-1")
# Channel should be cleaned up.
assert len(channel_layer.channels) == 0
@pytest.mark.asyncio
async def test_expiry_unread():
"""
Tests that a message on a channel can expire and be cleaned up even if
the channel is not read from again.
"""
channel_layer = InMemoryChannelLayer(expiry=0.1)
await channel_layer.send("test-channel-1", {"type": "message.1"})
await asyncio.sleep(0.1)
await channel_layer.send("test-channel-2", {"type": "message.2"})
assert len(channel_layer.channels) == 2
assert (await channel_layer.receive("test-channel-2"))["type"] == "message.2"
# Both channels should be cleaned up.
assert len(channel_layer.channels) == 0
@pytest.mark.asyncio
async def test_expiry_multi():
"""
Tests that multiple messages can expire.
"""
channel_layer = InMemoryChannelLayer(expiry=0.1)
await channel_layer.send("test-channel-1", {"type": "message.1"})
await channel_layer.send("test-channel-1", {"type": "message.2"})
await channel_layer.send("test-channel-1", {"type": "message.3"})
assert (await channel_layer.receive("test-channel-1"))["type"] == "message.1"
await asyncio.sleep(0.1)
await channel_layer.send("test-channel-1", {"type": "message.4"})
assert (await channel_layer.receive("test-channel-1"))["type"] == "message.4"
# The second and third message should have expired and been dropped.
with pytest.raises(asyncio.TimeoutError):
async with async_timeout.timeout(0.5):
await channel_layer.receive("test-channel-1")
# Channel should be cleaned up.
assert len(channel_layer.channels) == 0
|