File: 01_basic_sse.py

package info (click to toggle)
python-sse-starlette 3.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,248 kB
  • sloc: python: 3,481; makefile: 131; sh: 57
file content (99 lines) | stat: -rw-r--r-- 3,112 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""
Basic Server-Sent Events (SSE) example with both Starlette and FastAPI.

This example demonstrates:
- Simple number streaming
- Both Starlette and FastAPI implementations
- Proper client disconnection handling

Usage:
    python 01_basic_sse.py

Test with curl:
    # Basic streaming (will receive numbers 1-10 with 1 second intervals)
    curl -N http://localhost:8000/starlette/numbers

    # Endless streaming (press Ctrl+C to stop)
    curl -N http://localhost:8000/fastapi/endless

    # Custom range
    curl -N http://localhost:8000/fastapi/range/5/15
"""

import asyncio
import logging
from typing import AsyncGenerator

import uvicorn
from fastapi import FastAPI
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.routing import Route, Mount

from sse_starlette import EventSourceResponse

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def generate_numbers(start: int, end: int, delay: float = 1.0) -> AsyncGenerator[dict, None]:
    """Generate numbered events with configurable range and delay."""
    for number in range(start, end + 1):
        await asyncio.sleep(delay)
        yield {"data": f"Number: {number}"}


async def generate_endless_stream(request: Request) -> AsyncGenerator[dict, None]:
    """Generate endless numbered events with proper cleanup on client disconnect."""
    counter = 0
    try:
        while True:
            counter += 1
            yield {"data": f"Event #{counter}", "id": str(counter)}
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        logger.info(f"Client disconnected after receiving {counter} events")
        raise


# Starlette implementation
async def starlette_numbers_endpoint(request: Request) -> EventSourceResponse:
    """Starlette endpoint that streams numbers 1-10."""
    return EventSourceResponse(generate_numbers(1, 10))


# FastAPI implementation
fastapi_app = FastAPI(title="SSE FastAPI Example")


@fastapi_app.get("/endless")
async def fastapi_endless_endpoint(request: Request) -> EventSourceResponse:
    """FastAPI endpoint that streams endless events."""
    return EventSourceResponse(generate_endless_stream(request))


@fastapi_app.get("/range/{start}/{end}")
async def fastapi_range_endpoint(
    request: Request, start: int, end: int
) -> EventSourceResponse:
    """FastAPI endpoint that streams a custom range of numbers."""
    return EventSourceResponse(generate_numbers(start, end))


# Main Starlette application
starlette_routes = [
    Route("/starlette/numbers", endpoint=starlette_numbers_endpoint),
    Mount("/fastapi", app=fastapi_app),
]

app = Starlette(debug=True, routes=starlette_routes)

if __name__ == "__main__":
    print("Starting SSE server...")
    print("Available endpoints:")
    print("  - http://localhost:8000/starlette/numbers (numbers 1-10)")
    print("  - http://localhost:8000/fastapi/endless (endless stream)")
    print("  - http://localhost:8000/fastapi/range/5/15 (custom range)")

    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")