File: custom_websocket.py

package info (click to toggle)
litestar 2.21.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 12,568 kB
  • sloc: python: 70,588; makefile: 254; javascript: 104; sh: 60
file content (19 lines) | stat: -rw-r--r-- 539 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from __future__ import annotations

from litestar import Litestar, WebSocket, websocket_listener
from litestar.types.asgi_types import WebSocketMode


class CustomWebSocket(WebSocket):
    """WebSocket subclass whose ``receive_data`` always yields a constant string."""

    async def receive_data(self, mode: WebSocketMode) -> str | bytes:
        """Await the parent's ``receive_data`` and return a fixed string instead.

        Args:
            mode: Forwarded unchanged to the parent implementation.

        Returns:
            The literal ``"Fixed response"``, whatever the parent call produced.
        """
        # The parent call is still awaited; only its result is thrown away.
        _ = await super().receive_data(mode=mode)
        return "Fixed response"


@websocket_listener("/")
async def handler(data: str) -> str:
    """Echo the received value back unchanged."""
    reply = data
    return reply


# Build the application with the listener above and use CustomWebSocket as the
# connection class passed via ``websocket_class``.
app = Litestar(
    route_handlers=[handler],
    websocket_class=CustomWebSocket,
)