File: subprocess_sse_app.py

package info (click to toggle)
litestar 2.19.0-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 12,500 kB
  • sloc: python: 70,169; makefile: 254; javascript: 105; sh: 60
file content (24 lines) | stat: -rw-r--r-- 556 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
"""
Assemble components into an app that shall be tested
"""

from typing import AsyncGenerator

from litestar import Litestar, get
from litestar.response import ServerSentEvent
from litestar.types import SSEData


async def generator(topic: str) -> AsyncGenerator[SSEData, None]:
    count = 0
    while count < 2:
        yield topic
        count += 1


@get("/notify/{topic:str}")
async def get_notified(topic: str) -> ServerSentEvent:
    return ServerSentEvent(generator(topic), event_type="Notifier")


app = Litestar(route_handlers=[get_notified])