File: main.py

package info (click to toggle)
litestar 2.19.0-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 12,500 kB
  • sloc: python: 70,169; makefile: 254; javascript: 105; sh: 60
file content (44 lines) | stat: -rw-r--r-- 1,627 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
import pdb  # noqa: T100
from concurrent.futures import ThreadPoolExecutor
from typing import Dict

# Install this packages if you want to run this test-app
import ipdb  # pyright: ignore  # noqa: T100
import pdbr  # pyright: ignore
import pudb  # pyright: ignore  # noqa: T100
import uvicorn

from litestar import Litestar, get


@get("/")
async def zero_division_error() -> Dict[str, str]:
    """Handler function that returns a greeting dictionary."""
    1 / 0  # pyright: ignore # noqa: B018
    return {"message": "ZeroDivisionError didn't occur."}


pdb_app = Litestar(route_handlers=[zero_division_error], pdb_on_exception=True, debugger_module=pdb)
ipdb_app = Litestar(route_handlers=[zero_division_error], pdb_on_exception=True, debugger_module=ipdb)
pudb_app = Litestar(route_handlers=[zero_division_error], pdb_on_exception=True, debugger_module=pudb)
pdbr_app = Litestar(route_handlers=[zero_division_error], pdb_on_exception=True, debugger_module=pdbr)


def run_server(app, port):
    uvicorn.run(app, port=port)


async def start_servers():
    with ThreadPoolExecutor() as executor:
        # Run all the servers concurrently
        await asyncio.gather(
            asyncio.get_event_loop().run_in_executor(executor, run_server, pdb_app, 8000),
            asyncio.get_event_loop().run_in_executor(executor, run_server, ipdb_app, 8001),
            asyncio.get_event_loop().run_in_executor(executor, run_server, pudb_app, 8002),
            asyncio.get_event_loop().run_in_executor(executor, run_server, pdbr_app, 8003),
        )


if __name__ == "__main__":
    asyncio.run(start_servers())