File: hub.py

package info (click to toggle)
python-falcon 3.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 5,048 kB
  • sloc: python: 28,376; makefile: 182; sh: 135; javascript: 66
file content (78 lines) | stat: -rw-r--r-- 2,020 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import asyncio
import uuid

from falcon.asgi import SSEvent


class Emitter:
    POLL_TIMEOUT = 3.0

    def __init__(self):
        self._done = False
        self._queue = asyncio.Queue()

    async def events(self):
        try:
            yield SSEvent(text='SSE CONNECTED')

            while True:
                try:
                    event = await asyncio.wait_for(
                        self._queue.get(), timeout=self.POLL_TIMEOUT
                    )
                    yield event
                except asyncio.TimeoutError:
                    # NOTE(vytas): Keep the connection alive.
                    yield None
        finally:
            # TODO(vytas): Is there a more elegant way to detect a disconnect?
            self._done = True

    async def enqueue(self, message):
        event = SSEvent(text=message, event_id=str(uuid.uuid4()))
        await self._queue.put(event)

    @property
    def done(self):
        return self._done


class Hub:
    def __init__(self):
        self._emitters = set()
        self._users = {}

    def _update_emitters(self):
        done = {emitter for emitter in self._emitters if emitter.done}
        self._emitters.difference_update(done)
        return self._emitters.copy()

    def add_user(self, name, ws):
        self._users[name] = ws

    def remove_user(self, name):
        self._users.pop(name, None)

    async def broadcast(self, message):
        for emitter in self._update_emitters():
            await emitter.enqueue(message)

    async def message(self, name, text):
        ws = self._users.get(name)
        if ws:
            # TODO(vytas): What if this overlaps with another ongoing send?
            await ws.send_text(text)

    def events(self):
        emitter = Emitter()
        self._update_emitters()
        self._emitters.add(emitter)
        return emitter.events()


class Events:
    def __init__(self, hub):
        self._hub = hub

    async def on_get(self, req, resp):
        resp.sse = self._hub.events()