File: stream_basic.py

package info (click to toggle)
litestar 2.19.0-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 12,500 kB
  • sloc: python: 70,169; makefile: 254; javascript: 105; sh: 60
file content (15 lines) | stat: -rw-r--r-- 284 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
import time
from typing import AsyncGenerator

from litestar import Litestar, websocket_stream


@websocket_stream("/")
async def ping() -> AsyncGenerator[float, None]:
    while True:
        yield time.time()
        await asyncio.sleep(0.5)


app = Litestar([ping])