"""
Regression tests for Issue #152: Watcher Task Leak.
Bug: ContextVar creates isolated state per async context, so each SSE
connection (running in fresh ASGI context) spawns its own _shutdown_watcher.
N connections = N watchers = CPU exhaustion over time.
Fix: Use threading.local() for per-thread state, so all connections in the
same thread share one watcher.
See: thoughts/issue152-watcher-leak-analysis.md
"""
import asyncio
import anyio
import pytest
# Watcher polls every 0.5s, so we need >0.5s for it to detect shutdown.
# Add margin for test system load variance.
# NOTE(review): 0.5 is assumed to mirror the poll interval inside
# sse_starlette.sse's _shutdown_watcher — confirm against the library if
# that interval ever changes, or these timing-based tests will flake.
WATCHER_POLL_INTERVAL = 0.5
WATCHER_DETECT_TIMEOUT = WATCHER_POLL_INTERVAL + 0.2  # 0.7s
class TestIssue152WatcherLeak:
    """Regression tests for Issue #152: only one watcher per thread.

    All tests mutate process-global state (``sse_module.AppStatus.should_exit``,
    ``sse_module._shutdown_watcher``, and the shared shutdown-state event set),
    so each test restores everything it touched in a ``finally`` block — tests
    must not depend on execution order or on a conftest fixture alone.
    """

    @pytest.mark.asyncio
    async def test_single_watcher_per_thread(self):
        """
        Issue #152 regression: Only one watcher should be started per thread.
        In real ASGI apps:
        - Each request runs in the SAME thread (threading.local is shared)
        - Each request runs in a NEW async context (ContextVar is isolated)
        With threading.local (fixed): state persists → 1 watcher per thread
        With ContextVar (bug): state isolated per context → N watchers
        """
        import sse_starlette.sse as sse_module
        from sse_starlette.sse import _ensure_watcher_started_on_this_loop

        watcher_starts = []
        original_watcher = sse_module._shutdown_watcher

        async def tracking_watcher():
            """Track when watcher is started, exit immediately."""
            watcher_starts.append(True)
            # Don't run the actual polling loop

        sse_module._shutdown_watcher = tracking_watcher
        try:
            num_connections = 10
            # Simulate 10 SSE connections in the same thread
            # With threading.local: state persists, so only 1 watcher
            # DON'T reset between calls - this simulates real ASGI behavior
            for _ in range(num_connections):
                _ensure_watcher_started_on_this_loop()
            await asyncio.sleep(0.1)
            assert len(watcher_starts) == 1, (
                f"Issue #152: {len(watcher_starts)} watchers started for "
                f"{num_connections} connections. Expected 1 shared watcher."
            )
        finally:
            sse_module._shutdown_watcher = original_watcher

    @pytest.mark.asyncio
    async def test_watcher_broadcasts_to_all_events(self):
        """Verify that one watcher can signal multiple events.
        This test also serves as coverage for test_issue132.py's broadcast test.
        """
        import sse_starlette.sse as sse_module
        from sse_starlette.sse import (
            _ensure_watcher_started_on_this_loop,
            _get_shutdown_state,
        )

        # Start watcher (state reset by conftest fixture)
        _ensure_watcher_started_on_this_loop()

        # Create multiple events (simulating multiple SSE connections)
        state = _get_shutdown_state()
        events = [anyio.Event() for _ in range(5)]
        for event in events:
            state.events.add(event)
        try:
            # Trigger shutdown
            sse_module.AppStatus.should_exit = True
            # Wait for watcher to broadcast
            await asyncio.sleep(WATCHER_DETECT_TIMEOUT)
            # All events should be set
            for i, event in enumerate(events):
                assert event.is_set(), f"Event {i} was not signaled by watcher"
        finally:
            # BUGFIX: should_exit was never reset here, leaking a "shutting
            # down" signal into any test that runs after this one.
            sse_module.AppStatus.should_exit = False
            for event in events:
                state.events.discard(event)

    @pytest.mark.asyncio
    async def test_rapid_ensure_calls_spawn_single_watcher(self):
        """Multiple rapid calls to _ensure_watcher_started don't spawn multiple watchers."""
        import sse_starlette.sse as sse_module
        from sse_starlette.sse import _ensure_watcher_started_on_this_loop

        watcher_starts = []
        original_watcher = sse_module._shutdown_watcher

        async def tracking_watcher():
            # Record the spawn; skip the real polling loop entirely.
            watcher_starts.append(True)

        sse_module._shutdown_watcher = tracking_watcher
        try:
            # Rapid-fire 100 calls
            for _ in range(100):
                _ensure_watcher_started_on_this_loop()
            await asyncio.sleep(0.1)
            assert (
                len(watcher_starts) == 1
            ), f"Rapid calls spawned {len(watcher_starts)} watchers, expected 1"
        finally:
            sse_module._shutdown_watcher = original_watcher

    @pytest.mark.asyncio
    async def test_event_removal_during_broadcast_is_safe(self):
        """Removing an event from the set during broadcast doesn't crash.
        The watcher iterates over list(state.events) to avoid mutation issues.
        """
        import sse_starlette.sse as sse_module
        from sse_starlette.sse import (
            _ensure_watcher_started_on_this_loop,
            _get_shutdown_state,
        )

        _ensure_watcher_started_on_this_loop()
        state = _get_shutdown_state()
        events = [anyio.Event() for _ in range(3)]
        for event in events:
            state.events.add(event)
        try:
            # Remove one event before shutdown triggers broadcast
            state.events.discard(events[1])
            # Trigger shutdown - should not crash even though set was modified
            sse_module.AppStatus.should_exit = True
            await asyncio.sleep(WATCHER_DETECT_TIMEOUT)
            # Remaining events should be signaled
            assert events[0].is_set(), "Event 0 should be set"
            assert events[2].is_set(), "Event 2 should be set"
            # The discarded event must NOT have been signaled — that is the
            # point of the test (previously unasserted).
            assert not events[1].is_set(), "Removed event 1 should not be set"
        finally:
            # BUGFIX: this test had no cleanup at all. Restore the global
            # shutdown flag and detach our events from the shared state set
            # so later tests are unaffected (matches sibling tests' hygiene).
            sse_module.AppStatus.should_exit = False
            for event in events:
                state.events.discard(event)

    @pytest.mark.asyncio
    async def test_watcher_cleanup_allows_restart(self):
        """After watcher exits, a new connection can start a new watcher.
        The watcher's finally block resets watcher_started=False.
        """
        import sse_starlette.sse as sse_module
        from sse_starlette.sse import (
            _ensure_watcher_started_on_this_loop,
            _get_shutdown_state,
        )

        watcher_starts = []
        original_watcher = sse_module._shutdown_watcher

        async def tracking_watcher():
            # Mimic the real watcher's lifecycle: poll until shutdown, then
            # clear the started flag so a subsequent start is permitted.
            watcher_starts.append(True)
            state = _get_shutdown_state()
            try:
                while not sse_module.AppStatus.should_exit:
                    await asyncio.sleep(0.1)
            finally:
                state.watcher_started = False

        sse_module._shutdown_watcher = tracking_watcher
        try:
            # First watcher
            _ensure_watcher_started_on_this_loop()
            await asyncio.sleep(0.05)
            assert len(watcher_starts) == 1

            # Trigger shutdown to exit first watcher
            sse_module.AppStatus.should_exit = True
            await asyncio.sleep(0.15)  # Wait for watcher to exit

            # Reset for second round
            sse_module.AppStatus.should_exit = False

            # Second watcher should be allowed to start
            _ensure_watcher_started_on_this_loop()
            await asyncio.sleep(0.05)
            assert len(watcher_starts) == 2, (
                f"Expected watcher to restart after cleanup (2 sequential spawns), "
                f"got {len(watcher_starts)}"
            )
        finally:
            # ROBUSTNESS: if the second-round assertions raise after the flag
            # was set True, make sure it doesn't leak out of this test.
            sse_module.AppStatus.should_exit = False
            sse_module._shutdown_watcher = original_watcher