1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
#!/usr/bin/env python
import asyncio
import json
import aioredis
import django
django.setup()
from django.contrib.contenttypes.models import ContentType
from sesame.utils import get_user
from websockets.asyncio.server import broadcast, serve
from websockets.frames import CloseCode
CONNECTIONS = {}
def get_content_types(user):
"""Return the set of IDs of content types visible by user."""
# This does only three database queries because Django caches
# all permissions on the first call to user.has_perm(...).
return {
ct.id
for ct in ContentType.objects.all()
if user.has_perm(f"{ct.app_label}.view_{ct.model}")
or user.has_perm(f"{ct.app_label}.change_{ct.model}")
}
async def handler(websocket):
"""Authenticate user and register connection in CONNECTIONS."""
sesame = await websocket.recv()
user = await asyncio.to_thread(get_user, sesame)
if user is None:
await websocket.close(CloseCode.INTERNAL_ERROR, "authentication failed")
return
ct_ids = await asyncio.to_thread(get_content_types, user)
CONNECTIONS[websocket] = {"content_type_ids": ct_ids}
try:
await websocket.wait_closed()
finally:
del CONNECTIONS[websocket]
async def process_events():
"""Listen to events in Redis and process them."""
redis = aioredis.from_url("redis://127.0.0.1:6379/1")
pubsub = redis.pubsub()
await pubsub.subscribe("events")
async for message in pubsub.listen():
if message["type"] != "message":
continue
payload = message["data"].decode()
# Broadcast event to all users who have permissions to see it.
event = json.loads(payload)
recipients = (
websocket
for websocket, connection in CONNECTIONS.items()
if event["content_type_id"] in connection["content_type_ids"]
)
broadcast(recipients, payload)
async def main():
async with serve(handler, "localhost", 8888):
await process_events() # runs forever
if __name__ == "__main__":
asyncio.run(main())
|