File: main_endless_conditional.py

package info (click to toggle)
python-sse-starlette 2.3.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 944 kB
  • sloc: python: 1,487; makefile: 134; sh: 57
file content (68 lines) | stat: -rw-r--r-- 2,209 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# main.py
import asyncio
import logging

import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

from sse_starlette import EventSourceResponse

"""
example by: justindujardin
4efaffc2365a85f132ab8fc405110120c9c9e36a, https://github.com/sysid/sse-starlette/pull/13

tests proper shutdown in case no messages are yielded:
- in a streaming endpoint that reports only on "new" data, it is possible to get into a state
    where no yields are expected to happen in the near future.
    e.g. there are no new chat messages to emit.
- add a third task to taskgroup that checks the uvicorn exit status at a regular interval.
"""

# Module-level logger named after this module; the SSE event publisher below
# uses it to report client disconnects.
_log = logging.getLogger(__name__)


async def endless(req: Request):
    """Simulate an endless SSE stream that only ever yields one item.

    The endpoint models a stream that reports only "new" data: once the single
    event has been sent there is nothing left to emit, so the generator idles
    while periodically checking whether the client has disconnected.

    In case of server shutdown the running task has to be stopped via signal
    handler in order to enable proper server shutdown. Otherwise, there will
    be dangling tasks preventing proper shutdown (deadlock).

    Args:
        req: Incoming request; used to detect client disconnects and to
            identify the client in log messages.

    Returns:
        An ``EventSourceResponse`` wrapping the event generator.
    """

    async def event_publisher():
        """Yield one event, then idle until disconnect or cancellation."""
        has_data = True  # The event publisher only conditionally emits items
        try:
            while True:
                if await req.is_disconnected():
                    # Lazy %-style args: formatting only happens if the record is emitted.
                    _log.info("Disconnecting client %s", req.client)
                    break
                # Simulate only sending one response
                if has_data:
                    yield dict(data="u can haz the data")
                    has_data = False
                # Pause between disconnect checks so the loop does not spin.
                await asyncio.sleep(0.9)
        except asyncio.CancelledError:
            _log.info("Disconnected from client (via refresh/close) %s", req.client)
            # Do any other cleanup, if any
            # Bare ``raise`` re-raises the active exception unchanged so the
            # cancellation keeps propagating to the caller.
            raise

    return EventSourceResponse(event_publisher())


async def healthcheck(req: Request):
    """Health endpoint: reply with a fixed ``{"status": "ok"}`` JSON body.

    The incoming request is accepted to satisfy the endpoint signature but is
    not inspected.
    """
    payload = {"status": "ok"}
    return JSONResponse(payload)


# Route table: the SSE demo stream plus a plain JSON health endpoint.
_ROUTES = [
    Route("/endless", endpoint=endless),
    Route("/health", endpoint=healthcheck),
]

# ASGI application object served by uvicorn.
app = Starlette(routes=_ROUTES)

if __name__ == "__main__":
    # Script entry point: serve ``app`` with uvicorn on host 0.0.0.0, port 8000,
    # using the "trace" log level.
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="trace")