File: main_endless.py

package info (click to toggle)
python-sse-starlette 2.3.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 944 kB
  • sloc: python: 1,487; makefile: 134; sh: 57
file content (51 lines) | stat: -rw-r--r-- 1,308 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# main.py
import asyncio
import logging

from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

from sse_starlette import EventSourceResponse

_log = logging.getLogger(__name__)


async def endless(req: Request):
    """Simulates an endless stream, events sent every 0.3 seconds"""

    async def event_publisher():
        i = 0
        try:
            while True:  # i <= 20:
                # yield dict(id=..., event=..., data=...)
                i += 1
                # print(f"Sending {i}")
                yield dict(data=i)
                await asyncio.sleep(0.3)
        except asyncio.CancelledError as e:
            _log.info(
                f"Disconnected from client (via refresh/close) {req.client} after {i} events"
            )
            # Do any other cleanup, if any
            raise e

    return EventSourceResponse(event_publisher())


async def healthcheck(req: Request):
    return JSONResponse({"status": "ok"})


app = Starlette(
    routes=[
        Route("/endless", endpoint=endless),
        Route("/health", endpoint=healthcheck),
    ],
)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="localhost", port=8001, log_level="trace")