File: health_check_server.py

package info (click to toggle)
python-websockets 15.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,948 kB
  • sloc: python: 25,105; javascript: 350; ansic: 148; makefile: 43
file content (19 lines) | stat: -rwxr-xr-x 508 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python

import asyncio
from http import HTTPStatus
from websockets.asyncio.server import serve

def health_check(connection, request):
    """Answer ``/healthz`` probes with a plain HTTP 200 response.

    Passed to ``serve`` as the ``process_request`` hook.

    Args:
        connection: Object whose ``respond(status, body)`` method builds the
            HTTP response to send back.
        request: Incoming request; only its ``path`` attribute is inspected.

    Returns:
        The value of ``connection.respond(HTTPStatus.OK, "OK\\n")`` when the
        path is exactly ``"/healthz"``; otherwise ``None``.
    """
    if request.path != "/healthz":
        # Not a health probe: hand back nothing. NOTE(review): websockets
        # documents a None return from process_request as "continue with the
        # normal WebSocket handshake" -- confirm against the library docs.
        return None
    return connection.respond(HTTPStatus.OK, "OK\n")

async def echo(websocket):
    """Send every message received on ``websocket`` straight back to it.

    Args:
        websocket: Connection object that supports ``async for`` iteration
            over incoming messages and an awaitable ``send`` method.

    The coroutine finishes when iteration over ``websocket`` ends.
    """
    async for incoming in websocket:
        # Reply with the payload unchanged.
        await websocket.send(incoming)

async def main():
    """Start the echo server on ``localhost:8765`` and serve until stopped.

    ``echo`` handles each WebSocket connection, and ``health_check`` is
    installed as the ``process_request`` hook.
    """
    listener = serve(
        echo,
        "localhost",
        8765,
        process_request=health_check,
    )
    # Entering the context yields the server object; leaving it (on
    # cancellation or error) lets the context manager clean up.
    async with listener as server:
        await server.serve_forever()

if __name__ == "__main__":
    # Start the server only when this file is executed as a script.
    # Without the guard, merely importing the module (for example to reuse
    # health_check or echo) would block in main()'s serve_forever() call.
    asyncio.run(main())