#!./uwsgi --http-socket :9090 --asyncio 100 --module tests.websockets_chat_asyncio --greenlet
import asyncio
import json
import time

import asyncio_redis
import greenlet
import uwsgi
class GreenFuture(asyncio.Future):
    """An asyncio future tied to the greenlet that created it.

    The creating greenlet is remembered in ``self.greenlet`` and is switched
    back into once the future completes.
    """

    def __init__(self):
        super().__init__()
        # The greenlet that is running right now is the one to wake up later.
        self.greenlet = greenlet.getcurrent()
        self.add_done_callback(GreenFuture._resume_owner)

    @staticmethod
    def _resume_owner(fut):
        """Done callback: switch back into the greenlet that owns *fut*."""
        fut.greenlet.switch()

    def result(self):
        """Return the future's result, yielding to the parent greenlet until done.

        While the future is still pending, control is handed to the parent of
        the owning greenlet (presumably the one driving the event loop --
        confirm against the uWSGI asyncio plugin). Every time control comes
        back the completion state is checked again, so spurious wake-ups are
        harmless.
        """
        while not self.done():
            self.greenlet.parent.switch()
        return super().result()
async def redis_open(f):
    """Open a Redis connection and hand it to the greenlet waiting on *f*.

    Written as a native coroutine: the ``@asyncio.coroutine`` decorator used
    previously was removed in Python 3.11, and ``asyncio.Task`` accepts
    native coroutines just the same.

    :param f: future-like object exposing ``set_result()`` and a ``greenlet``
        attribute (a ``GreenFuture`` in this file).
    """
    connection = await asyncio_redis.Connection.create(host='localhost', port=6379)
    f.set_result(connection)
    # Resume the waiting greenlet right away so it can pick up the result.
    f.greenlet.switch()
async def redis_subscribe(f):
    """Subscribe to the ``foobar`` channel and hand the subscriber to *f*.

    A dedicated Redis connection is opened for the subscription. Written as
    a native coroutine: the ``@asyncio.coroutine`` decorator used previously
    was removed in Python 3.11.

    :param f: future-like object exposing ``set_result()`` and a ``greenlet``
        attribute (a ``GreenFuture`` in this file).
    """
    connection = await asyncio_redis.Connection.create(host='localhost', port=6379)
    subscriber = await connection.start_subscribe()
    await subscriber.subscribe(['foobar'])
    f.set_result(subscriber)
    # Resume the waiting greenlet right away so it can pick up the result.
    f.greenlet.switch()
def ws_recv_msg(g):
    """Mark greenlet *g* as having websocket work pending and resume it.

    Used as an event-loop callback: the flag is raised *before* switching so
    that the resumed greenlet already sees ``has_ws_msg`` set when it runs.

    :param g: the greenlet to flag and switch into.
    """
    g.has_ws_msg = True
    g.switch()
async def redis_wait(subscriber, f):
    """Wait for the next published message and deliver its value through *f*.

    Written as a native coroutine: the ``@asyncio.coroutine`` decorator used
    previously was removed in Python 3.11.

    :param subscriber: object whose ``next_published()`` awaitable resolves to
        a reply carrying the payload in ``.value``.
    :param f: future-like object exposing ``set_result()`` and a ``greenlet``
        attribute (a ``GreenFuture`` in this file).
    """
    reply = await subscriber.next_published()
    f.set_result(reply.value)
    # Resume the waiting greenlet right away so it can forward the message.
    f.greenlet.switch()
async def redis_publish(connection, msg):
    """Publish *msg* on the ``foobar`` channel.

    Written as a native coroutine: the ``@asyncio.coroutine`` decorator used
    previously was removed in Python 3.11.

    :param connection: object exposing an awaitable ``publish(channel, text)``.
    :param msg: UTF-8 encoded bytes; decoded to ``str`` before publishing.
    :raises UnicodeDecodeError: if *msg* is not valid UTF-8.
    """
    await connection.publish('foobar', msg.decode('utf-8'))
# Chat page served at "/". The single %s receives the websocket URL as a
# complete JavaScript string literal (quotes included).
_INDEX_PAGE = """
<html>
<head>
<script language="Javascript">
var s = new WebSocket(%s);
s.onopen = function() {
alert("connected !!!");
s.send("ciao");
};
s.onmessage = function(e) {
var bb = document.getElementById('blackboard')
var html = bb.innerHTML;
bb.innerHTML = html + '<br/>' + e.data;
};
s.onerror = function(e) {
alert(e);
}
s.onclose = function(e) {
alert("connection closed");
}
function invia() {
var value = document.getElementById('testo').value;
s.send(value);
}
</script>
</head>
<body>
<h1>WebSocket</h1>
<input type="text" id="testo"/>
<input type="button" value="invia" onClick="invia();"/>
<div id="blackboard" style="width:640px;height:480px;background-color:black;color:white;border: solid 2px red;overflow:auto">
</div>
</body>
</html>
"""


def _js_string(text):
    """Return *text* as a JavaScript string literal safe inside ``<script>``."""
    # json.dumps adds the quotes and escapes quotes, backslashes, control and
    # non-ASCII characters. "<" is escaped as well so the value can never
    # close the surrounding <script> element.
    return json.dumps(text).replace('<', '\\u003c')


def _serve_chat():
    """Run the websocket <-> Redis relay for the current request.

    Only leaves by exception: the loop at the bottom never terminates on its
    own. Whatever ends it, the fd reader and the timer registered on the
    event loop are removed again so nothing stays bound to a finished request.
    """
    uwsgi.websocket_handshake()
    print("websockets...")

    # a future for waiting for the redis subscription
    f = GreenFuture()
    asyncio.Task(redis_subscribe(f))
    # the result() method will switch greenlets if needed
    subscriber = f.result()

    # open another redis connection for publishing messages
    f0 = GreenFuture()
    # Bound to a local so the task stays referenced for the whole session.
    open_task = asyncio.Task(redis_open(f0))
    connection = f0.result()

    myself = greenlet.getcurrent()
    myself.has_ws_msg = False

    loop = asyncio.get_event_loop()
    fd = uwsgi.connection_fd()
    # start monitoring websocket events
    loop.add_reader(fd, ws_recv_msg, myself)
    # wake up once after 4 seconds to manage ping/pong
    # NOTE(review): call_later fires a single time; if periodic ping/pong
    # handling is intended the timer has to be re-armed -- confirm.
    timer = loop.call_later(4, ws_recv_msg, myself)

    try:
        # add a coroutine for redis messages
        f = GreenFuture()
        asyncio.Task(redis_wait(subscriber, f))
        # give control back until something wakes this greenlet up
        myself.parent.switch()

        while True:
            # any redis message in the queue ?
            if f.done():
                msg = f.result()
                uwsgi.websocket_send("[%s] %s" % (time.time(), msg))
                # restart the coroutine for the next message
                f = GreenFuture()
                asyncio.Task(redis_wait(subscriber, f))
            if myself.has_ws_msg:
                myself.has_ws_msg = False
                msg = uwsgi.websocket_recv_nb()
                if msg:
                    asyncio.Task(redis_publish(connection, msg))
            # switch again
            myself.parent.switch()
    finally:
        # Without this the loop keeps a reader for an fd this request no
        # longer owns, and the timer could still fire for a dead greenlet.
        timer.cancel()
        loop.remove_reader(fd)


def application(env, sr):
    """WSGI entry point of the websocket chat example.

    * ``/`` -- returns the HTML chat page as bytes.
    * ``/favicon.ico`` -- returns an empty body.
    * ``/foobar/`` -- performs the websocket handshake and relays messages
      between the websocket and the ``foobar`` Redis channel.

    Any other path falls through and returns ``None``.

    :param env: WSGI environ; ``PATH_INFO`` and ``wsgi.url_scheme`` are always
        read, ``HTTP_HOST`` is read for the index page.
    :param sr: WSGI ``start_response`` callable.
    """
    ws_scheme = 'ws'
    if 'HTTPS' in env or env['wsgi.url_scheme'] == 'https':
        ws_scheme = 'wss'

    path = env['PATH_INFO']
    if path == '/':
        sr('200 OK', [('Content-Type', 'text/html')])
        # The Host header is client-controlled, so it is embedded as an
        # escaped JavaScript literal instead of being pasted in raw.
        ws_url = "%s://%s/foobar/" % (ws_scheme, env['HTTP_HOST'])
        return (_INDEX_PAGE % _js_string(ws_url)).encode()
    elif path == '/favicon.ico':
        # NOTE(review): sr is never called on this path -- confirm that the
        # server tolerates a response without start_response.
        return b""
    elif path == '/foobar/':
        return _serve_chat()
|