File: background_tasks.py

package info (click to toggle)
firefox-esr 140.4.0esr-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,539,276 kB
  • sloc: cpp: 7,381,286; javascript: 6,388,710; ansic: 3,710,139; python: 1,393,780; xml: 628,165; asm: 426,918; java: 184,004; sh: 65,742; makefile: 19,302; objc: 13,059; perl: 12,912; yacc: 4,583; cs: 3,846; pascal: 3,352; lex: 1,720; ruby: 1,226; exp: 762; php: 436; lisp: 258; awk: 247; sql: 66; sed: 54; csh: 10
file content (71 lines) | stat: -rwxr-xr-x 2,011 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#!/usr/bin/env python3
"""Example of aiohttp.web.Application.on_startup signal handler"""
import asyncio
from typing import List

import aioredis

from aiohttp import web

# Typed application keys under which shared state is stored on the Application.
# Holds the asyncio task that runs listen_to_redis() in the background.
redis_listener = web.AppKey("redis_listener", asyncio.Task[None])
# Holds the list of websocket connections; each handler appends its own
# socket on connect and removes it again when its receive loop exits.
websockets = web.AppKey("websockets", List[web.WebSocketResponse])


async def websocket_handler(request):
    """Handle one websocket client for its whole lifetime.

    The connection is added to the application's shared ``websockets`` list
    while it is being served, and removed again when the receive loop ends
    for any reason (normal close, error, or cancellation).

    Returns the ``WebSocketResponse`` once the connection is finished.
    """
    socket = web.WebSocketResponse()
    await socket.prepare(request)

    connected = request.app[websockets]
    connected.append(socket)
    try:
        # Print every incoming message, pausing one second between reads.
        async for incoming in socket:
            print(incoming)
            await asyncio.sleep(1)
    finally:
        # Always unregister, so nothing is forwarded to a finished socket.
        connected.remove(socket)
    return socket


async def on_shutdown(app: web.Application) -> None:
    """Close every websocket that is still registered on *app*.

    Args:
        app: The application whose ``websockets`` list holds the open
            connections.
    """
    # Iterate over a snapshot: while ``close()`` is awaited, the matching
    # handler's ``finally`` clause may remove the socket from the live list,
    # and mutating a list during iteration silently skips elements.
    for ws in list(app[websockets]):
        # 1001 is the standard "going away" close code; RFC 6455 marks the
        # range 0-999 as unused, so the previous value 999 was not valid.
        await ws.close(code=1001, message=b"Server shutdown")


async def listen_to_redis(app):
    """Forward messages from the Redis ``news`` channel to all websockets.

    Runs until cancelled. Cancellation is swallowed on purpose so that the
    cleanup code awaiting this task returns normally; the Redis connection
    is released in ``finally`` either way.

    Args:
        app: The application whose ``websockets`` list holds the connections
            that should receive each message.
    """
    # Pre-bind both names so the ``finally`` clause can tell how far setup
    # got; otherwise a failed connect/subscribe would raise UnboundLocalError
    # there and mask the original error.
    sub = None
    ch = None
    try:
        # NOTE(review): confirm these calls match the API of the installed
        # aioredis version.
        sub = await aioredis.Redis(host="localhost", port=6379)
        ch, *_ = await sub.subscribe("news")
        async for msg in ch.iter(encoding="utf-8"):
            # Forward message to all connected websockets. Iterate over a
            # snapshot because handlers may add/remove sockets while we are
            # suspended in ``send_str``.
            for ws in list(app[websockets]):
                await ws.send_str(f"{ch.name}: {msg}")
            print(f"message in {ch.name}: {msg}")
    except asyncio.CancelledError:
        # Deliberate: cancellation is the normal way to stop this task.
        pass
    finally:
        print("Cancel Redis listener: close connection...")
        if sub is not None:
            if ch is not None:
                await sub.unsubscribe(ch.name)
            await sub.quit()
            print("Redis connection closed.")


async def start_background_tasks(app: web.Application) -> None:
    """Start the Redis listener as a task and store it on *app*.

    The task is kept under the ``redis_listener`` key so it can be looked up
    again later.
    """
    listener_coro = listen_to_redis(app)
    listener_task = asyncio.create_task(listener_coro)
    app[redis_listener] = listener_task


async def cleanup_background_tasks(app):
    """Cancel the Redis listener task stored on *app* and wait for it to end."""
    print("cleanup background tasks...")
    listener_task = app[redis_listener]
    listener_task.cancel()
    # Awaiting lets the listener run its own cleanup before we return.
    await listener_task


def init():
    """Build the example application and return it.

    Sets up the empty websocket registry, the ``/news`` websocket route, and
    the startup, shutdown and cleanup signal handlers.
    """
    app = web.Application()

    # Shared registry of open websocket connections, initially empty.
    open_sockets: List[web.WebSocketResponse] = []
    app[websockets] = open_sockets

    app.router.add_get("/news", websocket_handler)

    # Lifecycle hooks: start the listener, close sockets, stop the listener.
    app.on_startup.append(start_background_tasks)
    app.on_shutdown.append(on_shutdown)
    app.on_cleanup.append(cleanup_background_tasks)
    return app


# Script entry point: build the application and hand it to aiohttp's runner.
# NOTE: there is no ``__main__`` guard, so this also executes on import.
web.run_app(init())