"""Integration test: SSE clients must observe an abrupt server shutdown."""
import asyncio
import logging
import httpx
import pytest
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs
# Module-level logger used by the SSE consumer helper and the test in this file.
_log = logging.getLogger(__name__)
class SSEServerContainer(DockerContainer):
    """Docker container that serves an SSE test application through uvicorn.

    The container is built from the ``sse_starlette:latest`` image, gets the
    project sources mounted at ``/app`` and exposes uvicorn on port 8000.
    """

    def __init__(self, app_path: str, host_path: "str | None" = None):
        """Configure the container; it is not started here.

        Args:
            app_path: ASGI application to serve, in uvicorn's
                ``module:attribute`` notation.
            host_path: Host directory to mount at ``/app``. Defaults to the
                current working directory, so the tests are expected to be
                launched from the project root.
        """
        import os  # local import: keeps this block free of new file-level deps

        super().__init__("sse_starlette:latest")
        self.app_path = app_path
        # Previously a developer-specific absolute path was hard-coded here,
        # which only worked on one machine. Bind mounts need an absolute path.
        self.host_path = os.path.abspath(host_path or os.getcwd())

        # Mount the project directory into the container
        self.with_volume_mapping(host=self.host_path, container="/app")
        self.with_name("sse_starlette_test")
        self.with_command(
            f"uvicorn {self.app_path} --host 0.0.0.0 --port 8000 --log-level info"
        )

        # Expose the port
        self.with_exposed_ports(8000)
async def consume_events(url: str, expected_lines: int = 2):
    """Act as an SSE client: stream ``url`` and count the non-blank lines.

    Args:
        url: Endpoint to stream with a GET request.
        expected_lines: Accepted for the caller's convenience; not used here.

    Returns:
        A ``(line_count, error)`` tuple. ``error`` is ``None`` when the stream
        ended normally, otherwise the text of the ``RemoteProtocolError`` or
        ``ReadError`` that interrupted it.
    """
    received = 0
    failure = None
    async with httpx.AsyncClient() as client:
        try:
            async with client.stream("GET", url) as response:
                async for raw_line in response.aiter_lines():
                    # Blank separator lines are not counted.
                    if not raw_line.strip():
                        continue
                    _log.info(f"Received line: {raw_line}")
                    received += 1
        except (httpx.RemoteProtocolError, httpx.ReadError) as exc:
            failure = str(exc)
            _log.error(f"Error during streaming: {failure}")
    return received, failure
@pytest.mark.integration
@pytest.mark.parametrize(
    ("app_path", "expected_lines"),
    [
        ("tests.integration.main_endless:app", 14),
        ("tests.integration.main_endless_conditional:app", 2),
    ],
)
async def test_sse_server_termination(caplog, app_path, expected_lines):
    """Kill the SSE server mid-stream and verify every client notices.

    The server is started in a container, several clients begin streaming
    ``/endless``, and the container is force-stopped after one second. Each
    client must report a "peer closed connection" error and must have
    received fewer than ``expected_lines`` lines.
    """
    caplog.set_level(logging.DEBUG)
    N_CONSUMERS = 3

    container = SSEServerContainer(app_path)
    tasks = []
    try:
        # Start inside the ``try`` so a partially failed startup is still
        # cleaned up by the ``finally`` clause.
        container.start()

        # Wait for server to be ready
        wait_for_logs(container, "Application startup complete", timeout=10)
        port = container.get_exposed_port(8000)
        url = f"http://localhost:{port}/endless"

        # Create background tasks for consumers
        tasks = [
            asyncio.create_task(consume_events(url, expected_lines))
            for _ in range(N_CONSUMERS)
        ]

        # Wait a bit then kill the server
        await asyncio.sleep(1)
        container.stop(force=True)

        # Now wait for all tasks to complete
        results = await asyncio.gather(*tasks)

        # Check error count: one connection error per client
        error_count = sum(1 for _, error in results if error is not None)
        assert (
            error_count == N_CONSUMERS
        ), f"Expected {N_CONSUMERS} errors, got {error_count}"

        # Verify error messages
        for _, error in results:
            assert (
                error
                and "peer closed connection without sending complete message body (incomplete chunked read)"
                in error.lower()
            ), "Expected peer closed connection error"

        # Check message counts
        message_counts = [count for count, _ in results]
        _log.info(f"Message counts received: {message_counts}")

        # Since we're killing the server early, we expect incomplete message counts
        assert all(
            count < expected_lines for count in message_counts
        ), f"Expected all counts to be less than {expected_lines}, got {message_counts}"
    finally:
        # Cleanup container if it's still around. Stopping an already removed
        # container may raise, which is only worth a debug line here.
        try:
            container.stop(force=True)
        except Exception as e:
            _log.debug(f"Error during cleanup: {e}")

        # If the test failed before ``gather`` finished, consumers may still
        # be running; cancel and drain them so no task outlives the test.
        pending = [task for task in tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
|