1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
import asyncio
from typing import List
from fastapi import Depends, FastAPI
from starlette import status
from sse_starlette import EventSourceResponse, ServerSentEvent
"""
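This example shows how to broadcast messages to multiple streams: every
client connected to GET /sse gets its own stream, and each POST /message
is sent to all of them.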
"""
class Stream:
    """Async-iterable channel of server-sent events backed by an asyncio queue.

    Events pushed with ``asend`` are handed out one at a time when the
    instance is iterated with ``async for``. Iteration never finishes on its
    own: each step waits until another event has been queued.
    """

    def __init__(self) -> None:
        # The queue is not built here; it is created on first access to
        # ``queue``. NOTE(review): presumably so it is created inside the
        # running event loop — confirm.
        self._queue = None

    @property
    def queue(self):
        """Return the backing queue, creating it on first access."""
        existing = self._queue
        if existing is not None:
            return existing
        self._queue = asyncio.Queue[ServerSentEvent]()
        return self._queue

    def __aiter__(self) -> "Stream":
        """Return the stream itself; it is its own async iterator."""
        return self

    async def __anext__(self) -> ServerSentEvent:
        """Wait for the next queued event and return it."""
        next_event = await self.queue.get()
        return next_event

    async def asend(self, value: ServerSentEvent) -> None:
        """Queue ``value`` so that a later iteration step yields it."""
        backing_queue = self.queue
        await backing_queue.put(value)
# ASGI application on which the routes below are registered.
app = FastAPI()
# Every stream created by the /sse endpoint; /message sends to each of them.
_streams: List[Stream] = []
@app.get("/sse")
async def sse() -> EventSourceResponse:
    """Open a new server-sent-event stream for the connecting client.

    A fresh ``Stream`` is registered in ``_streams`` so that ``send_message``
    delivers to it. It is unregistered again once the response stops
    consuming events, so abandoned streams do not accumulate.

    Returns:
        An ``EventSourceResponse`` that relays every event queued on the
        new stream.
    """
    stream = Stream()
    _streams.append(stream)

    async def relay():
        # Wrapping the stream in a generator provides a ``finally`` hook that
        # runs when iteration is cancelled or closed (e.g. the client
        # disconnects).
        try:
            async for event in stream:
                yield event
        finally:
            # Without this the stream would stay in ``_streams`` forever and
            # keep collecting messages that nobody reads.
            if stream in _streams:
                _streams.remove(stream)

    return EventSourceResponse(relay())
@app.post("/message", status_code=status.HTTP_201_CREATED)
async def send_message(message: str, stream: Stream = Depends()) -> None:
    """Broadcast ``message`` as a server-sent event to every registered stream.

    Args:
        message: Text used as the ``data`` of each event that is sent.
        stream: Stream instance injected through ``Depends()``. The body does
            not use it; it is kept so the endpoint signature is unchanged.
            NOTE(review): consider dropping it if nothing relies on it.
    """
    # The loop name deliberately differs from the ``stream`` parameter so the
    # injected dependency is not rebound while iterating.
    for target in _streams:
        await target.asend(ServerSentEvent(data=message))
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn

    # Serve the app on the loopback interface with the most verbose logging.
    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8000,
        log_level="trace",
    )
|