File: graceful_shutdown.py

package info (click to toggle)
aiohttp-sse 2.2.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 220 kB
  • sloc: python: 919; makefile: 35; sh: 5
file content (122 lines) | stat: -rw-r--r-- 3,385 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
import json
import logging
import weakref
from contextlib import suppress
from datetime import datetime
from functools import partial
from typing import Any, Callable, Dict, Optional

from aiohttp import web

from aiohttp_sse import EventSourceResponse, sse_response

# Application key under which on_startup stores the weak set of SSE streams;
# hello adds and discards entries, worker and on_shutdown iterate over it.
streams_key = web.AppKey("streams_key", weakref.WeakSet["SSEResponse"])
# Application key under which on_startup stores the background worker task,
# which clean_up later cancels.
worker_key = web.AppKey("worker_key", asyncio.Task[None])


class SSEResponse(EventSourceResponse):
    """Event-source response with a helper for sending JSON payloads."""

    async def send_json(
        self,
        data: Dict[str, Any],
        id: Optional[str] = None,
        event: Optional[str] = None,
        retry: Optional[int] = None,
        json_dumps: Callable[[Any], str] = partial(json.dumps, indent=2),
    ) -> None:
        """Serialise *data* with *json_dumps* and send it as a single event.

        ``id``, ``event`` and ``retry`` are forwarded to ``send`` unchanged.
        Unless overridden, the payload is encoded by ``json.dumps`` with
        ``indent=2``.
        """
        payload = json_dumps(data)
        await self.send(payload, id=id, event=event, retry=retry)


async def send_event(
    stream: SSEResponse,
    data: Dict[str, Any],
    event_id: str,
) -> None:
    """Send *data* as JSON on *stream*, tagged with *event_id*.

    Any ``Exception`` raised while sending is logged with its traceback
    and not re-raised, so a failure on one stream does not propagate to
    the caller.
    """
    try:
        await stream.send_json(data, id=event_id)
    except Exception:
        # Best effort: record the failure and carry on.
        logging.exception("Exception when sending event: %s", event_id)


async def worker(app: web.Application) -> None:
    """Push the current server time to every registered stream in a loop.

    Each iteration sends one event per stream found in ``app[streams_key]``
    and lasts at least one second. The loop has no exit condition of its
    own; it stops when the task is cancelled or an exception propagates.
    """
    while True:
        now = datetime.now()
        # Start the one-second timer before sending, so the time spent
        # sending counts towards the interval instead of extending it.
        tick = asyncio.create_task(asyncio.sleep(1))
        event_id = str(now.timestamp())

        sends = [
            send_event(
                stream,
                {
                    "time": f"Server Time : {now}",
                    "last_event_id": stream.last_event_id,
                },
                event_id,
            )
            for stream in app[streams_key]
        ]

        # Deliver to all streams concurrently.
        await asyncio.gather(*sends)

        # Wait out whatever remains of the second.
        await tick


async def on_startup(app: web.Application) -> None:
    """Create the empty stream registry and start the worker task.

    The registry is stored before the worker task is created, since the
    worker reads ``app[streams_key]``.
    """
    registry = weakref.WeakSet[SSEResponse]()
    app[streams_key] = registry

    background = asyncio.create_task(worker(app))
    app[worker_key] = background


async def clean_up(app: web.Application) -> None:
    """Cancel the worker task and wait until it has finished."""
    task = app[worker_key]
    task.cancel()
    # Awaiting a cancelled task raises CancelledError; that is the expected
    # outcome here, so it is suppressed.
    with suppress(asyncio.CancelledError):
        await task


async def on_shutdown(app: web.Application) -> None:
    """Stop every registered stream, wait for them all, then empty the set.

    ``stop_streaming()`` is called on each stream in ``app[streams_key]``
    and its ``wait()`` awaitable is collected. The awaitables are gathered
    with ``return_exceptions=True`` so an error from one stream does not
    interrupt waiting for the others.
    """
    streams = app[streams_key]

    pending = []
    for stream in streams:
        stream.stop_streaming()
        pending.append(stream.wait())

    await asyncio.gather(*pending, return_exceptions=True)
    streams.clear()


async def hello(request: web.Request) -> web.StreamResponse:
    """Open an SSE stream for the request and keep it registered while open.

    The stream is added to ``app[streams_key]`` so that ``worker`` can send
    events to it, and is always discarded again once ``stream.wait()``
    returns or raises.
    """
    stream: SSEResponse = await sse_response(request, response_cls=SSEResponse)
    registry = request.app[streams_key]
    registry.add(stream)
    try:
        await stream.wait()
    finally:
        # Deregister on every exit path, including exceptions.
        registry.discard(stream)
    return stream


async def index(_request: web.Request) -> web.StreamResponse:
    """Serve the demo page, which subscribes to ``/hello`` via EventSource.

    The page's script writes the data of each received "message" event
    into the ``<pre id="response">`` element.
    """
    page_html = """
    <html>
        <head>
            <script>
                var eventSource = new EventSource("/hello");
                eventSource.addEventListener("message", event => {
                    document.getElementById("response").innerText = event.data;
                });
            </script>
        </head>
        <body>
            <h1>Response from server:</h1>
            <pre id="response"></pre>
        </body>
    </html>
    """
    return web.Response(text=page_html, content_type="text/html")


if __name__ == "__main__":
    app = web.Application()

    # Routes: the demo page and the SSE endpoint it subscribes to.
    app.router.add_route("GET", "/hello", hello)
    app.router.add_route("GET", "/", index)

    # Lifecycle hooks: start the worker on startup, stop the open streams
    # on shutdown, and cancel the worker on cleanup.
    app.on_startup.append(on_startup)
    app.on_shutdown.append(on_shutdown)
    app.on_cleanup.append(clean_up)

    web.run_app(app, host="127.0.0.1", port=8080)