1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
#!/usr/bin/env python3
"""Example of aiohttp.web.Application.on_startup signal handler"""
import asyncio
import aioredis
from aiohttp import web
async def websocket_handler(request):
    """Serve a single WebSocket client.

    The connection is added to the ``request.app['websockets']`` list while
    it is open and taken out again when the receive loop ends, whether it
    ends normally or through an exception.

    Returns the ``WebSocketResponse`` that handled the connection.
    """
    socket = web.WebSocketResponse()
    await socket.prepare(request)

    request.app['websockets'].append(socket)
    try:
        # Incoming messages are only printed; the handler then pauses for
        # one second before reading the next one.
        async for incoming in socket:
            print(incoming)
            await asyncio.sleep(1)
    finally:
        request.app['websockets'].remove(socket)

    return socket
async def on_shutdown(app):
    """Close every WebSocket still registered in ``app['websockets']``.

    Each socket is closed in turn with the message ``'Server shutdown'``.
    """
    # Iterate over a snapshot: awaiting close() yields to the event loop, and
    # websocket_handler's ``finally`` removes the socket from the shared list
    # in the meantime. Looping over the live list would skip entries.
    for ws in list(app['websockets']):
        # 1001 ("going away") is the RFC 6455 status for an endpoint that is
        # shutting down. The former value 999 lies in the 0-999 range that
        # the RFC says is not used.
        await ws.close(code=1001, message='Server shutdown')
async def listen_to_redis(app):
    """Relay messages from the Redis ``news`` channel to all WebSockets.

    Connects to Redis on ``localhost:6379``, subscribes to ``news`` and sends
    every received message, prefixed with the channel name, to each socket in
    ``app['websockets']``. The coroutine is meant to run as a background
    task; cancelling it stops the loop, and the cancellation is absorbed here
    so that awaiting the task afterwards completes without an exception.

    Any other error (for example a failed connection) propagates to whoever
    awaits the task, after the cleanup below has run.
    """
    # Bind both names up front so the ``finally`` clause can tell how far the
    # setup got. Without this, a failure or cancellation during connect or
    # subscribe would raise UnboundLocalError there and hide the real error.
    sub = None
    ch = None
    try:
        sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
        ch, *_ = await sub.subscribe('news')
        async for msg in ch.iter(encoding='utf-8'):
            text = '{}: {}'.format(ch.name, msg)
            # Forward message to all connected websockets. A snapshot is
            # used because handlers may add or remove sockets while this
            # coroutine is suspended in send_str().
            for ws in list(app['websockets']):
                await ws.send_str(text)
            print("message in {}: {}".format(ch.name, msg))
    except asyncio.CancelledError:
        # Cancellation is the expected way to stop the listener.
        pass
    finally:
        print('Cancel Redis listener: close connection...')
        if sub is not None:
            if ch is not None:
                await sub.unsubscribe(ch.name)
            await sub.quit()
            print('Redis connection closed.')
async def start_background_tasks(app):
    """Schedule the Redis listener on the application's event loop.

    The resulting task is stored under ``app['redis_listener']`` so it can
    be looked up again later.
    """
    listener_task = app.loop.create_task(listen_to_redis(app))
    app['redis_listener'] = listener_task
async def cleanup_background_tasks(app):
    """Cancel the task stored in ``app['redis_listener']`` and wait for it.

    Waiting gives the task the chance to run its own cleanup before this
    coroutine returns. Exceptions other than the cancellation itself (for
    instance an error the task had already failed with) still propagate.
    """
    print('cleanup background tasks...')
    listener = app['redis_listener']
    listener.cancel()
    try:
        await listener
    except asyncio.CancelledError:
        # Expected outcome of the cancel() above. A task cancelled before its
        # first step, or one that does not absorb the cancellation itself,
        # finishes as "cancelled" and awaiting it re-raises here.
        pass
def init():
    """Create and configure the example application.

    Sets up an empty ``'websockets'`` list on the application, routes
    ``GET /news`` to ``websocket_handler`` and registers the startup,
    cleanup and shutdown callbacks.

    Returns the configured ``web.Application``.
    """
    application = web.Application()
    application['websockets'] = []
    application.router.add_get('/news', websocket_handler)

    # (signal, callback) pairs, registered in this order.
    hooks = (
        (application.on_startup, start_background_tasks),
        (application.on_cleanup, cleanup_background_tasks),
        (application.on_shutdown, on_shutdown),
    )
    for signal, callback in hooks:
        signal.append(callback)

    return application
# Build the application and hand it to aiohttp's runner.
# NOTE(review): this call is not wrapped in an ``if __name__ == "__main__"``
# guard, so it also executes when the module is imported.
web.run_app(init())
|