File: stream_generator.py

package info (click to toggle)
python-sse-starlette 3.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,248 kB
  • sloc: python: 3,481; makefile: 131; sh: 57
file content (76 lines) | stat: -rw-r--r-- 2,322 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import asyncio
from typing import Optional

from fastapi import Depends, FastAPI
from starlette import status

from sse_starlette import EventSourceResponse, ServerSentEvent

"""
This example shows how to use a stream to push messages to a single client

Remark:
Lazy initialization of the queue for safe handling of initializing asyncio.Queue()
outside of an async context (it calls asyncio.get_event_loop() internally).
This is not an issue for python > 3.9 any more.

Example Client Usage:
# This command will stay connected and display all incoming messages
curl -N http://127.0.0.1:8000/sse

# In a separate terminal, send a message
curl -X POST "http://127.0.0.1:8000/message?message=Hello%20World" -H "accept: application/json"

# Send a message with quotes and spaces
curl -X POST "http://127.0.0.1:8000/message?message=This%20is%20a%20test%20message" -H "accept: application/json"

# Send a message with special characters
curl -X POST "http://127.0.0.1:8000/message?message=Special%20chars:%20%21%40%23%24%25%5E%26%2A%28%29" -H "accept: application/json"

# Send multiple messages in quick succession
for i in {1..5}; do
    curl -X POST "http://127.0.0.1:8000/message?message=Message%20number%20$i" -H "accept: application/json"
    sleep 0.5
done
"""

class Stream:
    def __init__(self) -> None:
        self._queue: Optional[asyncio.Queue[ServerSentEvent]] = None

    @property
    def queue(self) -> asyncio.Queue[ServerSentEvent]:
        if self._queue is None:
            self._queue = asyncio.Queue[ServerSentEvent]()
        return self._queue

    def __aiter__(self) -> "Stream":
        return self

    async def __anext__(self) -> ServerSentEvent:
        return await self.queue.get()

    async def asend(self, value: ServerSentEvent) -> None:
        await self.queue.put(value)


app = FastAPI()
_stream = Stream()


@app.get("/sse")
async def sse(stream: Stream = Depends(lambda: _stream)) -> EventSourceResponse:
    return EventSourceResponse(stream)


@app.post("/message", status_code=status.HTTP_201_CREATED)
async def send_message(message: str, stream: Stream = Depends(lambda: _stream)) -> None:
    await stream.asend(
        ServerSentEvent(data=message)
    )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000, log_level="trace")