# demonstrations/advanced_patterns/memory_channels.py
"""
DEMONSTRATION: Memory Channels Alternative to Generators
PURPOSE:
Shows how to use anyio memory channels instead of async generators
for SSE streaming, providing better control over data flow.
KEY LEARNING:
- Memory channels decouple data production from consumption
- Better error handling and resource management
- More flexible than generators for complex scenarios
PATTERN:
Producer-consumer pattern using memory channels for SSE.
"""
import asyncio
from functools import partial
import anyio
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.routing import Route
from sse_starlette import EventSourceResponse
async def data_producer(send_channel: anyio.abc.ObjectSendStream, producer_id: str):
    """
    Producer that generates data and sends it through a memory channel.

    Runs independently of the SSE connection: the consumer is whatever
    holds the receive side of the channel (here, EventSourceResponse).

    Args:
        send_channel: Send side of an anyio memory object stream.
        producer_id: Label embedded in event payloads and log output.
    """
    async with send_channel:  # Ensures channel closes when done
        try:
            print(f"๐ญ Producer {producer_id} started")
            # range(1, 10) emits items 1..9 (nine items).
            for i in range(1, 10):
                # Simulate data processing
                await asyncio.sleep(1)
                # Create event data
                event_data = {
                    "data": f"Data from producer {producer_id}, item {i}",
                    "id": f"{producer_id}-{i}",
                    "event": "production_data"
                }
                # Blocks only when the channel buffer is full (backpressure)
                await send_channel.send(event_data)
                print(f"๐ค Producer {producer_id} sent item {i}")
            # Send completion signal
            await send_channel.send({
                "data": f"Producer {producer_id} completed",
                "event": "producer_complete"
            })
        except anyio.BrokenResourceError:
            # Consumer side went away (e.g. client disconnected). There is
            # nobody left to report to, so just stop producing quietly.
            print(f"๐ฅ Producer {producer_id}: consumer disconnected")
        except Exception as e:
            print(f"๐ฅ Producer {producer_id} error: {e}")
            # Best-effort error report: the channel may already be broken,
            # in which case this send would raise and mask the real error.
            try:
                await send_channel.send({
                    "data": f"Producer error: {e}",
                    "event": "error"
                })
            except anyio.BrokenResourceError:
                pass
        finally:
            print(f"๐งน Producer {producer_id} cleanup completed")
async def memory_channel_endpoint(request: Request):
    """
    SSE endpoint using memory channels instead of generators.
    """
    # Bounded buffer (10 items) keeps memory in check while letting the
    # producer run slightly ahead of the consumer.
    tx, rx = anyio.create_memory_object_stream(max_buffer_size=10)

    # Tie the producer's identity to this particular request object.
    producer_id = f"prod-{id(request)}"

    # The response consumes the receive side; sse-starlette invokes the
    # data_sender_callable to start the producer for this connection.
    return EventSourceResponse(
        rx,
        data_sender_callable=partial(data_producer, tx, producer_id),
        ping=5,
    )
async def multi_producer_endpoint(request: Request):
    """
    Advanced example: Multiple producers feeding one SSE stream.
    Demonstrates how memory channels enable complex data flows.

    Wiring: three independent data_producer tasks each write to their own
    small channel; forward_channel_data tasks pump those into one combined
    channel, whose receive side is handed to EventSourceResponse.
    """
    # Create channel for combined output (buffer of 20 merged events)
    combined_send, combined_receive = anyio.create_memory_object_stream(max_buffer_size=20)
    async def multi_producer_coordinator(combined_channel):
        """
        Coordinates multiple producers and merges their output.

        Closing behavior: the `async with combined_channel` closes the
        combined stream once the task group below has finished, which ends
        the SSE stream for the client.
        """
        async with combined_channel:
            try:
                # One (send, receive, id) triple per producer; each producer
                # gets its own small bounded channel.
                producer_channels = []
                for i in range(3):  # 3 producers
                    send_ch, recv_ch = anyio.create_memory_object_stream(max_buffer_size=5)
                    producer_channels.append((send_ch, recv_ch, f"multi-{i}"))
                # Task group blocks at exit until producers AND forwarders
                # are done, so combined_channel stays open until then.
                async with anyio.create_task_group() as tg:
                    # Start all producers (each closes its own send side)
                    for send_ch, _, prod_id in producer_channels:
                        tg.start_soon(data_producer, send_ch, prod_id)
                    # Merge all producer outputs into the combined channel
                    async def merge_outputs():
                        # Collect all receive channels
                        receive_channels = [recv_ch for _, recv_ch, _ in producer_channels]
                        # One forwarder task per source channel; inner task
                        # group waits for all forwarders to drain.
                        async with anyio.create_task_group() as merge_tg:
                            for recv_ch in receive_channels:
                                merge_tg.start_soon(forward_channel_data, recv_ch, combined_channel)
                    tg.start_soon(merge_outputs)
            except Exception as e:
                # NOTE(review): if combined_channel is already broken
                # (client gone) this send itself raises — verify acceptable.
                await combined_channel.send({
                    "data": f"Multi-producer error: {e}",
                    "event": "error"
                })
    return EventSourceResponse(
        combined_receive,
        data_sender_callable=partial(multi_producer_coordinator, combined_send),
        ping=3
    )
async def forward_channel_data(source_channel, target_channel):
    """
    Pump every item from source_channel into target_channel.

    Stops when the source is exhausted or the target is closed; always
    closes the source channel on exit. The target is left open so other
    forwarders may keep writing to it.
    """
    async with source_channel:
        while True:
            try:
                message = await source_channel.receive()
            except anyio.EndOfStream:
                # Source producer finished and closed its send side.
                break
            try:
                await target_channel.send(message)
            except anyio.BrokenResourceError:
                # Target channel closed underneath us; stop forwarding.
                break
async def backpressure_demo_endpoint(request: Request):
    """
    Demonstrates backpressure handling with memory channels.
    Shows what happens when producer is faster than consumer.
    """
    # Deliberately tiny buffer so the producer hits backpressure quickly.
    send_channel, receive_channel = anyio.create_memory_object_stream(max_buffer_size=2)

    async def fast_producer(channel):
        """Producer that generates data faster than typical consumption."""
        async with channel:
            try:
                for i in range(20):
                    event_data = {
                        "data": f"Fast data {i} - buffer may be full!",
                        "id": str(i),
                        "event": "fast_data"
                    }
                    print(f"๐ Trying to send item {i}")
                    # This will block when buffer is full (backpressure)
                    await channel.send(event_data)
                    # BUG FIX: this f-string was split across two physical
                    # lines (unterminated string literal -> SyntaxError).
                    print(f"โ Sent item {i}")
                    # Producer works faster than typical consumer
                    await asyncio.sleep(0.01)
            except Exception as e:
                print(f"๐ฅ Fast producer error: {e}")
                # Best-effort error report; channel may already be broken.
                try:
                    await channel.send({
                        "data": f"Producer error: {e}",
                        "event": "error"
                    })
                except anyio.BrokenResourceError:
                    pass

    return EventSourceResponse(
        receive_channel,
        data_sender_callable=partial(fast_producer, send_channel),
        ping=2
    )
# Test application: one route per demonstrated channel pattern.
_demo_routes = [
    Route("/memory-channel", memory_channel_endpoint),
    Route("/multi-producer", multi_producer_endpoint),
    Route("/backpressure", backpressure_demo_endpoint),
]
app = Starlette(routes=_demo_routes)
if __name__ == "__main__":
    """
    DEMONSTRATION STEPS:
    1. Run server: python memory_channels.py
    2. Test single producer: curl -N http://localhost:8000/memory-channel
    3. Test multi-producer: curl -N http://localhost:8000/multi-producer
    4. Test backpressure: curl -N http://localhost:8000/backpressure | pv -q -L 10
       (FIX: previous text pointed at /sse, which is not a registered route.)
    MEMORY CHANNEL BENEFITS:
    - Better separation of concerns (producer vs consumer)
    - Built-in backpressure handling
    - Multiple producers can feed one stream
    - More robust error handling
    - Easier testing and debugging
    WHEN TO USE:
    - Complex data processing pipelines
    - Multiple data sources
    - Need for buffering and flow control
    - Better error isolation required
    """
    import uvicorn
    print("๐ Starting memory channels demonstration server...")
    print("๐ Available endpoints:")
    print("   /memory-channel - Basic producer-consumer pattern")
    print("   /multi-producer - Multiple producers, one stream")
    print("   /backpressure   - Backpressure demonstration")
    uvicorn.run(app, host="localhost", port=8000, log_level="info")