File: async_pubsub_manager.py

package info (click to toggle)
python-socketio 5.12.1-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 1,140 kB
  • sloc: python: 11,460; makefile: 13; sh: 7
file content (243 lines) | stat: -rw-r--r-- 11,141 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import asyncio
from functools import partial
import uuid

from engineio import json
import pickle

from .async_manager import AsyncManager


class AsyncPubSubManager(AsyncManager):
    """Manage a client list attached to a pub/sub backend under asyncio.

    This is a base class that enables multiple servers to share the list of
    clients, with the servers communicating events through a pub/sub backend.
    The use of a pub/sub backend also allows any client connected to the
    backend to emit events addressed to Socket.IO clients.

    The actual backends must be implemented by subclasses, this class only
    provides a pub/sub generic framework for asyncio applications.

    :param channel: The channel name on which the server sends and receives
                    notifications.
    """
    name = 'asyncpubsub'

    def __init__(self, channel='socketio', write_only=False, logger=None):
        super().__init__()
        self.channel = channel
        self.write_only = write_only
        self.host_id = uuid.uuid4().hex
        self.logger = logger

    def initialize(self):
        super().initialize()
        if not self.write_only:
            self.thread = self.server.start_background_task(self._thread)
        self._get_logger().info(self.name + ' backend initialized.')

    async def emit(self, event, data, namespace=None, room=None, skip_sid=None,
                   callback=None, to=None, **kwargs):
        """Emit a message to a single client, a room, or all the clients
        connected to the namespace.

        This method takes care or propagating the message to all the servers
        that are connected through the message queue.

        The parameters are the same as in :meth:`.Server.emit`.

        Note: this method is a coroutine.
        """
        room = to or room
        if kwargs.get('ignore_queue'):
            return await super().emit(
                event, data, namespace=namespace, room=room, skip_sid=skip_sid,
                callback=callback)
        namespace = namespace or '/'
        if callback is not None:
            if self.server is None:
                raise RuntimeError('Callbacks can only be issued from the '
                                   'context of a server.')
            if room is None:
                raise ValueError('Cannot use callback without a room set.')
            id = self._generate_ack_id(room, callback)
            callback = (room, namespace, id)
        else:
            callback = None
        message = {'method': 'emit', 'event': event, 'data': data,
                   'namespace': namespace, 'room': room,
                   'skip_sid': skip_sid, 'callback': callback,
                   'host_id': self.host_id}
        await self._handle_emit(message)  # handle in this host
        await self._publish(message)  # notify other hosts

    async def can_disconnect(self, sid, namespace):
        if self.is_connected(sid, namespace):
            # client is in this server, so we can disconnect directly
            return await super().can_disconnect(sid, namespace)
        else:
            # client is in another server, so we post request to the queue
            await self._publish({'method': 'disconnect', 'sid': sid,
                                 'namespace': namespace or '/',
                                 'host_id': self.host_id})

    async def disconnect(self, sid, namespace, **kwargs):
        if kwargs.get('ignore_queue'):
            return await super().disconnect(
                sid, namespace=namespace)
        message = {'method': 'disconnect', 'sid': sid,
                   'namespace': namespace or '/', 'host_id': self.host_id}
        await self._handle_disconnect(message)  # handle in this host
        await self._publish(message)  # notify other hosts

    async def enter_room(self, sid, namespace, room, eio_sid=None):
        if self.is_connected(sid, namespace):
            # client is in this server, so we can disconnect directly
            return await super().enter_room(sid, namespace, room,
                                            eio_sid=eio_sid)
        else:
            message = {'method': 'enter_room', 'sid': sid, 'room': room,
                       'namespace': namespace or '/', 'host_id': self.host_id}
            await self._publish(message)  # notify other hosts

    async def leave_room(self, sid, namespace, room):
        if self.is_connected(sid, namespace):
            # client is in this server, so we can disconnect directly
            return await super().leave_room(sid, namespace, room)
        else:
            message = {'method': 'leave_room', 'sid': sid, 'room': room,
                       'namespace': namespace or '/', 'host_id': self.host_id}
            await self._publish(message)  # notify other hosts

    async def close_room(self, room, namespace=None):
        message = {'method': 'close_room', 'room': room,
                   'namespace': namespace or '/', 'host_id': self.host_id}
        await self._handle_close_room(message)  # handle in this host
        await self._publish(message)  # notify other hosts

    async def _publish(self, data):
        """Publish a message on the Socket.IO channel.

        This method needs to be implemented by the different subclasses that
        support pub/sub backends.
        """
        raise NotImplementedError('This method must be implemented in a '
                                  'subclass.')  # pragma: no cover

    async def _listen(self):
        """Return the next message published on the Socket.IO channel,
        blocking until a message is available.

        This method needs to be implemented by the different subclasses that
        support pub/sub backends.
        """
        raise NotImplementedError('This method must be implemented in a '
                                  'subclass.')  # pragma: no cover

    async def _handle_emit(self, message):
        # Events with callbacks are very tricky to handle across hosts
        # Here in the receiving end we set up a local callback that preserves
        # the callback host and id from the sender
        remote_callback = message.get('callback')
        remote_host_id = message.get('host_id')
        if remote_callback is not None and len(remote_callback) == 3:
            callback = partial(self._return_callback, remote_host_id,
                               *remote_callback)
        else:
            callback = None
        await super().emit(message['event'], message['data'],
                           namespace=message.get('namespace'),
                           room=message.get('room'),
                           skip_sid=message.get('skip_sid'),
                           callback=callback)

    async def _handle_callback(self, message):
        if self.host_id == message.get('host_id'):
            try:
                sid = message['sid']
                id = message['id']
                args = message['args']
            except KeyError:
                return
            await self.trigger_callback(sid, id, args)

    async def _return_callback(self, host_id, sid, namespace, callback_id,
                               *args):
        # When an event callback is received, the callback is returned back
        # the sender, which is identified by the host_id
        if host_id == self.host_id:
            await self.trigger_callback(sid, callback_id, args)
        else:
            await self._publish({'method': 'callback', 'host_id': host_id,
                                 'sid': sid, 'namespace': namespace,
                                 'id': callback_id, 'args': args})

    async def _handle_disconnect(self, message):
        await self.server.disconnect(sid=message.get('sid'),
                                     namespace=message.get('namespace'),
                                     ignore_queue=True)

    async def _handle_enter_room(self, message):
        sid = message.get('sid')
        namespace = message.get('namespace')
        if self.is_connected(sid, namespace):
            await super().enter_room(sid, namespace, message.get('room'))

    async def _handle_leave_room(self, message):
        sid = message.get('sid')
        namespace = message.get('namespace')
        if self.is_connected(sid, namespace):
            await super().leave_room(sid, namespace, message.get('room'))

    async def _handle_close_room(self, message):
        await super().close_room(room=message.get('room'),
                                 namespace=message.get('namespace'))

    async def _thread(self):
        while True:
            try:
                async for message in self._listen():  # pragma: no branch
                    data = None
                    if isinstance(message, dict):
                        data = message
                    else:
                        if isinstance(message, bytes):  # pragma: no cover
                            try:
                                data = pickle.loads(message)
                            except:
                                pass
                        if data is None:
                            try:
                                data = json.loads(message)
                            except:
                                pass
                    if data and 'method' in data:
                        self._get_logger().debug('pubsub message: {}'.format(
                            data['method']))
                        try:
                            if data['method'] == 'callback':
                                await self._handle_callback(data)
                            elif data.get('host_id') != self.host_id:
                                if data['method'] == 'emit':
                                    await self._handle_emit(data)
                                elif data['method'] == 'disconnect':
                                    await self._handle_disconnect(data)
                                elif data['method'] == 'enter_room':
                                    await self._handle_enter_room(data)
                                elif data['method'] == 'leave_room':
                                    await self._handle_leave_room(data)
                                elif data['method'] == 'close_room':
                                    await self._handle_close_room(data)
                        except asyncio.CancelledError:
                            raise  # let the outer try/except handle it
                        except Exception:
                            self.server.logger.exception(
                                'Handler error in pubsub listening thread')
                self.server.logger.error('pubsub listen() exited unexpectedly')
                break  # loop should never exit except in unit tests!
            except asyncio.CancelledError:  # pragma: no cover
                break
            except Exception:  # pragma: no cover
                self.server.logger.exception('Unexpected Error in pubsub '
                                             'listening thread')