File: background_tasks.py

package info (click to toggle)
python-aiohttp 3.11.16-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 16,156 kB
  • sloc: python: 51,898; ansic: 20,843; makefile: 395; javascript: 31; sh: 3
file content (68 lines) | stat: -rwxr-xr-x 1,948 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python3
"""Example of aiohttp.web.Application.on_startup signal handler"""
import asyncio
from contextlib import suppress
from typing import AsyncIterator, List

import valkey.asyncio as valkey

from aiohttp import web

# Typed application keys used to share state between the handlers and the
# startup/shutdown hooks defined in this file.
# Key for the background task created in background_tasks().
valkey_listener = web.AppKey("valkey_listener", asyncio.Task[None])
# Key for the list of websocket responses that websocket_handler() keeps
# up to date as clients connect and disconnect.
websockets = web.AppKey("websockets", List[web.WebSocketResponse])


async def websocket_handler(request):
    """Serve a single websocket client.

    After the handshake the connection is added to the application's
    shared ``websockets`` list, and it is taken out of that list again as
    soon as the receive loop ends, whether normally or through an error.

    Every incoming message is printed, followed by a one second pause.

    Returns the ``WebSocketResponse`` that served the connection.
    """
    socket = web.WebSocketResponse()
    await socket.prepare(request)

    connections = request.app[websockets]
    connections.append(socket)
    try:
        async for incoming in socket:
            print(incoming)
            await asyncio.sleep(1)
    finally:
        # Always unregister, so nothing keeps writing to a dead connection.
        connections.remove(socket)

    return socket


async def on_shutdown(app: web.Application) -> None:
    """Close every websocket that is still connected at shutdown.

    Args:
        app: The application whose ``websockets`` list holds the open
            connections.
    """
    # Iterate over a snapshot: closing a socket ends its handler's receive
    # loop, and that handler's ``finally`` removes the socket from the shared
    # list while this coroutine is suspended in ``await``. Mutating the list
    # under a live iterator would make the loop skip connections.
    for ws in list(app[websockets]):
        # NOTE(review): 999 appears to lie outside the 1000-4999 close-code
        # range of RFC 6455; 1001 ("going away") may be what was intended --
        # confirm before changing the value sent to clients.
        await ws.close(code=999, message=b"Server shutdown")


async def listen_to_valkey(app: web.Application) -> None:
    """Relay messages from the valkey ``news`` channel to all websockets.

    Subscribes to the channel on a valkey server at ``localhost:6379`` and
    loops over incoming pub/sub events until the task is cancelled. Events
    whose ``type`` is not ``"message"`` are ignored; every other event is
    sent as text to each connected websocket and printed.

    Args:
        app: The application whose ``websockets`` list holds the clients
            to broadcast to.
    """
    channel = "news"
    # Both the client and the pub/sub object are context-managed so that
    # their connections are released when the task is cancelled at shutdown.
    async with valkey.Valkey(
        host="localhost", port=6379, decode_responses=True
    ) as r, r.pubsub() as sub:
        await sub.subscribe(channel)
        async for msg in sub.listen():
            if msg["type"] != "message":
                continue
            # Forward message to all connected websockets. A snapshot is
            # used because handlers add/remove entries in the shared list
            # while this coroutine is suspended in ``await``.
            for ws in list(app[websockets]):
                try:
                    await ws.send_str(f"{channel}: {msg}")
                except ConnectionResetError:
                    # One client going away mid-send must not stop the
                    # broadcast for everyone else; its handler unregisters it.
                    print(f"skipping disconnected websocket: {ws!r}")
            print(f"message in {channel}: {msg}")


async def background_tasks(app: web.Application) -> AsyncIterator[None]:
    """Run the valkey listener for as long as the application is up.

    The code before ``yield`` starts the listener task and stores it on the
    application under ``valkey_listener``; the code after ``yield`` cancels
    that task and waits for it to finish.
    """
    listener = asyncio.create_task(listen_to_valkey(app))
    app[valkey_listener] = listener

    yield

    print("cleanup background tasks...")
    listener = app[valkey_listener]
    listener.cancel()
    try:
        await listener
    except asyncio.CancelledError:
        # Expected outcome of the cancel() above.
        pass


def init():
    """Build the example application and return it.

    The application gets an empty websocket list, the ``/news`` websocket
    route, the shutdown hook that closes open websockets, and the cleanup
    context that manages the background valkey listener.
    """
    app = web.Application()

    # Shared list of open websocket connections, empty at start.
    open_sockets: List[web.WebSocketResponse] = []
    app[websockets] = open_sockets

    app.router.add_get("/news", websocket_handler)
    app.on_shutdown.append(on_shutdown)
    app.cleanup_ctx.append(background_tasks)
    return app


if __name__ == "__main__":
    # Start the server only when executed as a script, so that importing
    # this module (e.g. to reuse init()) does not block in run_app().
    web.run_app(init())