1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
|
import asyncio
from functools import partial
import uuid
from engineio import json
import pickle
from .async_manager import AsyncManager
class AsyncPubSubManager(AsyncManager):
    """Manage a client list attached to a pub/sub backend under asyncio.

    This is a base class that enables multiple servers to share the list of
    clients, with the servers communicating events through a pub/sub backend.
    The use of a pub/sub backend also allows any client connected to the
    backend to emit events addressed to Socket.IO clients.

    The actual backends must be implemented by subclasses, this class only
    provides a pub/sub generic framework for asyncio applications.

    :param channel: The channel name on which the server sends and receives
                    notifications.
    :param write_only: When ``True``, :meth:`initialize` does not start the
                       background task that listens for messages, so this
                       instance only publishes.
    :param logger: Optional logger object stored as ``self.logger``.
    """
    name = 'asyncpubsub'

    def __init__(self, channel='socketio', write_only=False, logger=None):
        super().__init__()
        self.channel = channel
        self.write_only = write_only
        # Unique identifier of this process; it is attached to every
        # published message so that a host can recognize its own messages.
        self.host_id = uuid.uuid4().hex
        self.logger = logger

    def initialize(self):
        """Initialize the manager and, unless it is write-only, start the
        background task that listens for messages from other hosts.
        """
        super().initialize()
        if not self.write_only:
            self.thread = self.server.start_background_task(self._thread)
        self._get_logger().info(self.name + ' backend initialized.')

    async def emit(self, event, data, namespace=None, room=None, skip_sid=None,
                   callback=None, to=None, **kwargs):
        """Emit a message to a single client, a room, or all the clients
        connected to the namespace.

        This method takes care of propagating the message to all the servers
        that are connected through the message queue.

        The parameters are the same as in :meth:`.Server.emit`. In addition,
        passing ``ignore_queue=True`` delivers the event through the parent
        class only, without publishing it on the queue.

        :raises RuntimeError: if a callback is given and this manager is not
                              attached to a server.
        :raises ValueError: if a callback is given without a room.

        Note: this method is a coroutine.
        """
        room = to or room
        if kwargs.get('ignore_queue'):
            return await super().emit(
                event, data, namespace=namespace, room=room, skip_sid=skip_sid,
                callback=callback)
        namespace = namespace or '/'
        if callback is not None:
            if self.server is None:
                raise RuntimeError('Callbacks can only be issued from the '
                                   'context of a server.')
            if room is None:
                raise ValueError('Cannot use callback without a room set.')
            # The callable itself cannot travel through the queue, so it is
            # replaced by a (room, namespace, ack id) reference.
            ack_id = self._generate_ack_id(room, callback)
            callback = (room, namespace, ack_id)
        else:
            callback = None
        message = {'method': 'emit', 'event': event, 'data': data,
                   'namespace': namespace, 'room': room,
                   'skip_sid': skip_sid, 'callback': callback,
                   'host_id': self.host_id}
        await self._handle_emit(message)  # handle in this host
        await self._publish(message)  # notify other hosts

    async def can_disconnect(self, sid, namespace):
        """Check whether a client can be disconnected.

        If the client is connected to this host the parent implementation
        is used and its result returned. Otherwise a ``disconnect`` request
        is published on the queue and ``None`` is returned.

        Note: this method is a coroutine.
        """
        if self.is_connected(sid, namespace):
            # client is in this server, so we can disconnect directly
            return await super().can_disconnect(sid, namespace)
        else:
            # client is in another server, so we post request to the queue
            await self._publish({'method': 'disconnect', 'sid': sid,
                                 'namespace': namespace or '/',
                                 'host_id': self.host_id})

    async def disconnect(self, sid, namespace, **kwargs):
        """Disconnect a client, in this host and in all the other hosts.

        Passing ``ignore_queue=True`` delegates to the parent class only,
        without publishing the request on the queue.

        Note: this method is a coroutine.
        """
        if kwargs.get('ignore_queue'):
            return await super().disconnect(
                sid, namespace=namespace)
        message = {'method': 'disconnect', 'sid': sid,
                   'namespace': namespace or '/', 'host_id': self.host_id}
        await self._handle_disconnect(message)  # handle in this host
        await self._publish(message)  # notify other hosts

    async def enter_room(self, sid, namespace, room, eio_sid=None):
        """Add a client to a room.

        If the client is not connected to this host, the request is
        published on the queue for the other hosts to process.

        Note: this method is a coroutine.
        """
        if self.is_connected(sid, namespace):
            # client is in this server, so we can add it to the room directly
            return await super().enter_room(sid, namespace, room,
                                            eio_sid=eio_sid)
        else:
            message = {'method': 'enter_room', 'sid': sid, 'room': room,
                       'namespace': namespace or '/', 'host_id': self.host_id}
            await self._publish(message)  # notify other hosts

    async def leave_room(self, sid, namespace, room):
        """Remove a client from a room.

        If the client is not connected to this host, the request is
        published on the queue for the other hosts to process.

        Note: this method is a coroutine.
        """
        if self.is_connected(sid, namespace):
            # client is in this server, so we can remove it from the room
            # directly
            return await super().leave_room(sid, namespace, room)
        else:
            message = {'method': 'leave_room', 'sid': sid, 'room': room,
                       'namespace': namespace or '/', 'host_id': self.host_id}
            await self._publish(message)  # notify other hosts

    async def close_room(self, room, namespace=None):
        """Close a room in this host and in all the other hosts.

        Note: this method is a coroutine.
        """
        message = {'method': 'close_room', 'room': room,
                   'namespace': namespace or '/', 'host_id': self.host_id}
        await self._handle_close_room(message)  # handle in this host
        await self._publish(message)  # notify other hosts

    async def _publish(self, data):
        """Publish a message on the Socket.IO channel.

        This method needs to be implemented by the different subclasses that
        support pub/sub backends.
        """
        raise NotImplementedError('This method must be implemented in a '
                                  'subclass.')  # pragma: no cover

    async def _listen(self):
        """Return the next message published on the Socket.IO channel,
        blocking until a message is available.

        This method needs to be implemented by the different subclasses that
        support pub/sub backends. The listening task consumes it with
        ``async for``.
        """
        raise NotImplementedError('This method must be implemented in a '
                                  'subclass.')  # pragma: no cover

    async def _handle_emit(self, message):
        """Deliver an ``emit`` message to the clients known to this host."""
        # Events with callbacks are very tricky to handle across hosts
        # Here in the receiving end we set up a local callback that preserves
        # the callback host and id from the sender
        remote_callback = message.get('callback')
        remote_host_id = message.get('host_id')
        if remote_callback is not None and len(remote_callback) == 3:
            callback = partial(self._return_callback, remote_host_id,
                               *remote_callback)
        else:
            callback = None
        await super().emit(message['event'], message['data'],
                           namespace=message.get('namespace'),
                           room=message.get('room'),
                           skip_sid=message.get('skip_sid'),
                           callback=callback)

    async def _handle_callback(self, message):
        """Trigger a callback addressed to this host.

        Messages for other hosts, and messages missing any of the ``sid``,
        ``id`` or ``args`` keys, are ignored.
        """
        if self.host_id == message.get('host_id'):
            try:
                sid = message['sid']
                callback_id = message['id']
                args = message['args']
            except KeyError:
                return
            await self.trigger_callback(sid, callback_id, args)

    async def _return_callback(self, host_id, sid, namespace, callback_id,
                               *args):
        """Route the result of an event callback to the originating host."""
        # When an event callback is received, the callback is returned back
        # to the sender, which is identified by the host_id
        if host_id == self.host_id:
            await self.trigger_callback(sid, callback_id, args)
        else:
            await self._publish({'method': 'callback', 'host_id': host_id,
                                 'sid': sid, 'namespace': namespace,
                                 'id': callback_id, 'args': args})

    async def _handle_disconnect(self, message):
        """Process a ``disconnect`` message through the attached server."""
        # ignore_queue=True prevents the request from being published again
        await self.server.disconnect(sid=message.get('sid'),
                                     namespace=message.get('namespace'),
                                     ignore_queue=True)

    async def _handle_enter_room(self, message):
        """Process an ``enter_room`` message if the client is in this host."""
        sid = message.get('sid')
        namespace = message.get('namespace')
        if self.is_connected(sid, namespace):
            await super().enter_room(sid, namespace, message.get('room'))

    async def _handle_leave_room(self, message):
        """Process a ``leave_room`` message if the client is in this host."""
        sid = message.get('sid')
        namespace = message.get('namespace')
        if self.is_connected(sid, namespace):
            await super().leave_room(sid, namespace, message.get('room'))

    async def _handle_close_room(self, message):
        """Process a ``close_room`` message in this host."""
        await super().close_room(room=message.get('room'),
                                 namespace=message.get('namespace'))

    def _decode_message(self, message):
        """Convert a raw message received from the backend into an object.

        Dictionaries are returned unchanged. Byte strings are first tried as
        pickle payloads; if that does not produce a value, the message is
        parsed as JSON. ``None`` is returned when nothing could be decoded.
        """
        if isinstance(message, dict):
            return message
        data = None
        if isinstance(message, bytes):  # pragma: no cover
            # SECURITY NOTE: unpickling can execute arbitrary code. This is
            # only acceptable if every publisher on the channel is trusted.
            try:
                data = pickle.loads(message)
            except Exception:
                # best effort: not a pickle payload, fall back to JSON
                pass
        if data is None:
            try:
                data = json.loads(message)
            except Exception:
                # best effort: undecodable messages are dropped
                pass
        return data

    async def _thread(self):
        """Background task that receives messages from the backend and
        dispatches them to the matching handler.

        Callback messages are always handed to :meth:`_handle_callback`; all
        other methods are only processed when they come from another host.
        Errors raised by a handler are logged and the loop continues. An
        unexpected error while listening is logged and listening restarts.
        """
        while True:
            try:
                async for message in self._listen():  # pragma: no branch
                    data = self._decode_message(message)
                    # Only dictionaries are valid messages. Other decoded
                    # values (e.g. a JSON string or list) are discarded so
                    # that indexing below cannot fail and stop the listener.
                    if isinstance(data, dict) and 'method' in data:
                        self._get_logger().debug('pubsub message: {}'.format(
                            data['method']))
                        try:
                            if data['method'] == 'callback':
                                await self._handle_callback(data)
                            elif data.get('host_id') != self.host_id:
                                if data['method'] == 'emit':
                                    await self._handle_emit(data)
                                elif data['method'] == 'disconnect':
                                    await self._handle_disconnect(data)
                                elif data['method'] == 'enter_room':
                                    await self._handle_enter_room(data)
                                elif data['method'] == 'leave_room':
                                    await self._handle_leave_room(data)
                                elif data['method'] == 'close_room':
                                    await self._handle_close_room(data)
                        except asyncio.CancelledError:
                            raise  # let the outer try/except handle it
                        except Exception:
                            self.server.logger.exception(
                                'Handler error in pubsub listening thread')
                self.server.logger.error('pubsub listen() exited unexpectedly')
                break  # loop should never exit except in unit tests!
            except asyncio.CancelledError:  # pragma: no cover
                break
            except Exception:  # pragma: no cover
                self.server.logger.exception('Unexpected Error in pubsub '
                                             'listening thread')
|